当前位置: 首页 > 知识库问答 >
问题:

Flink在会话窗口中连接两个流

陶弘业
2023-03-14

我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。

实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。

我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。

有没有办法合并Flink中与会话窗口相关的两个流?

共有1个答案

邴俊民
2023-03-14

Flink的DataStream API包括一个会话窗口连接,如下所述。

你必须看看它的语义是否符合你的想法。会话间隔由在该间隔期间没有事件的两个流定义,并且连接是内部连接,因此如果存在仅包含一个流的元素的会话窗口,则不会发出任何输出。

如果这不能满足您的需求,那么我建议使用CoProcessFunction,但不要会话窗口。换句话说,我建议你可以自己实现所有的逻辑。

 类似资料:
  • 我有一个聚合函数,它计算WindowedStream中一系列事件的平均值。 这里的警告是,平均值需要在可能无序(或根本没有)到达的事件对上计算。 换句话说,我需要在计算之前对数据进行排序,因为序列很重要。 我可以用getResult API来实现这一点,但是这个函数在窗口中的每个事件上都被调用,这在性能方面没有意义。我也可以用flink cep来做这件事,但出于同样的原因,我想避免使用它。 理想情

  • 我有一个由两个字段“键控”的记录流,然后分配一个间隔为30秒的会话窗口。我使用附加在记录上的“时间戳”作为事件时间。我正在使用“Assign AscendingTimeStamps”水印。 以下面的记录为例。该流由(用户,place)键控。 Record1:user1,place1,timestamp t1 Record2:user2,place1,timestamp在t1之后30秒 桶1 Rec

  • 例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。

  • Flink中的会话窗口在prod env上没有按预期工作(相同的逻辑在本地env上工作)。这个想法是为特定的用户ID发出“sample_event_two”的计数 尽管集合中存在sample_event_one(通过验证日志消息“已接收sample_event_one”是否存在来确认)并且计数计算正确,但我没有看到任何输出事件被创建。我看到日志消息“未找到 sampleOneEvent 事件,而不

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点