当前位置: 首页 > 知识库问答 >
问题:

Flink:数据流到表

韦睿
2023-03-14

不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。

问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?

共有1个答案

毛峻
2023-03-14

如果您有一个Datastream对象,那么您可以简单地使用StreamTableEnvironment将给定的Datastream注册为表。

这将看起来或多或少如下所示:

val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])

那么您应该能够查询从您的数据流创建的动态表。

 类似资料:
  • 尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中

  • 如果是,请把我放在轨道上实现。

  • 下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。

  • 我正在构建一个有以下要求的应用程序,我刚刚开始使用Flink。 null null 谢谢并感激你的帮助。

  • 如果每个事件间隔为1秒,并且有2秒的滞后,那么我希望示例输入和输出如下所示。 输入:1,2,3,4,5,6,7... 输出:NA,NA,1,2,3,4,5...

  • 需要一些建议,我已经使用scala创建了一个flink作业来消费来自Kafka的消息。但是消息是用base64编码压缩的。我已经试过这个代码了 代码由于它不是有效的Json格式而失败。 然后我尝试使用SimpleStringSchema(),就像下面的代码一样 Kafka的信息完美地消耗了,但是输出如下 如何将此数据解码为有效的JSON? 此致敬意