当前位置: 首页 > 知识库问答 >
问题:

Flink表到DataStream:如何访问列名?

穆丁雨
2023-03-14

我想使用Flink SQL将一个Kafka主题转换成一个表,然后将其转换回DataStream。

以下是source_ddl:

CREATE TABLE kafka_source (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'format' = 'json'
)
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)

有办法做到这一点吗?谢了。

共有1个答案

顾文昌
2023-03-14

从Flink1.12开始,可以通过table.getschema.getfieldnames访问它。从1.13版本开始,可以通过row.getFieldNames访问它。

 类似资料:
  • 我试图编写一个流作业,它将数据流下沉到postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink,这些文章建议使用JDBCoutputFormat。 所以我的问题是:我错过了什么吗?我应该将插入的行提交到某个地方吗? 向你致意,伊格内修斯

  • 我对阿帕奇Flink是新手。我想创建一个DataStream,并向它提供来自另一个系统的值。 我找到了如何添加“SourceFunctions”的示例,在该函数中,我必须等待来自源代码的值,并通过调用CTX.Collect将这些值发布到Flink,然后再次等待,这就是轮询。 这能做到吗?否则,我必须在SourceFunction中执行连接和回调,然后用sleep做一个循环,但我不想这样做... 我

  • 我想加入一个大表,不可能包含在TM内存和流(kakfa)中。我在测试中成功加入了这两个表,将table-api与datastream api混合在一起。我做了以下操作: 它正在工作,但我从未见过这种类型的实现。可以吗?缺点是什么?

  • 问题内容: 我在python中有一个由列表列表建模的2D数组,我想提取该列。我进行了快速研究,发现了一种使用numpy数组的方法。问题是 我不想使用numpy, 所以不想将列表列表转换为numpy数组,然后使用[:,1]语法。我尝试在正常的列表列表上使用它,但显示错误,因此是不可能的。我正在为列表列表请求类似的东西,而不必遍历每个元素(在numpy数组中,使用[:,1]语法访问列比在数组元素上进行

  • 问题内容: 我有两个原始流,我正在加入这些流,然后我要计算已加入的事件总数是多少,尚未加入的事件有多少。我通过使用如下所示的地图来做到这一点 问题1: 这是计算流中事件数量的适当方法吗? 问题2: 我注意到一种有线行为,有些人可能不相信。问题是,当我在IntelliJ IDE中运行Flink程序时,它显示了的正确值,但是当我将该程序提交为时。因此,我获得了将程序作为文件运行而不是实际计数时的初始值