当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中连接两个流并进行操作?

郁高韵
2023-03-14

我有一个数据流


共有1个答案

司空叶五
2023-03-14

在所描述的情况下,最好的想法是简单地使用广播状态模式。元素较少的第二个流将成为广播流,然后具有更多元素的第一个流将用第二个元素的元素进行丰富。所以,你会有这样的东西:

//define broadcast state here

firstStream.keyBy([someKey])
.connect(secondStream.broadcast([mapStateDescriptor])
.process([YourProcessFunction])

然后在您的流程元素的流程函数中,您可以进行扩充以生成预期的元组。

有关广播模式的更多信息,请参见:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

 类似资料:
  • 例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。

  • 在Flink-Job中,我目前有两个流,一个是每分钟从Kafka主题更新的主数据流,另一个流(广播流)用于KeyedBroadcastProcessFunction的process元素函数中,用于对主流数据进行一些计算。 2)主数据可以有两个广播流吗? 3)由于流数据是完全不同的数据,广播,第三个数据流不经常变化,所以连接是不起作用的。它就像一个主数据,在计算中和主数据流一起使用,找不到任何解决方

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • 我有3个不同类型的键控数据流。 我不能使用联合(允许多个数据流),因为类型不同。我希望避免创建包装器,并将所有流转换为相同的类型。

  • 我开始使用flink,看看官方教程之一。 据我所知,这个练习的目标是在时间属性上加入两个流。 任务: 此练习的结果是一个Tuple2记录的数据流,每个记录对应一个不同的rideId。您应该忽略结束事件,只在每次骑乘开始时加入事件,并提供相应的票价数据。 生成的流应打印到标准输出。 问:EnrichmentFunction如何连接这两个流aka。它如何知道参加哪个集市和哪个骑行?我希望它能够缓冲多个

  • 我在flink中有两个nifi流源,我需要对这两个源执行连接。哪种方法更好?它是数据流提供的联接api还是表api(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/streaming.html#streaming-concepts)?