当前位置: 首页 > 知识库问答 >
问题:

如何合并多个kafka流,以便对结果流的所有事件执行会话窗口

司寇祺
2023-03-14

我们有多个具有不同业务事件(页面视图、单击、滚动事件等)的输入主题。据我所知,Kafka流都有一个事件时间戳,可用于KStream与其他流或表的连接以对齐时间。

我们要做的是:合并用户id(即按用户id分组)的所有不同事件(源自上述不同主题),并对其应用会话窗口。

这应该通过在包含所有事件的流上使用group pByKey,然后使用聚合/减少(在此指定非活动时间)来实现。这个组合流必须按照事件时间的顺序(或者以上kafka流方法尊重这个事件时间的方式)包含来自不同输入主题的所有事件。

剩下的唯一挑战是创建这个组合/合并流。

当我查看Kafka Streams API时,有一个KStreamBuilder#merge操作,javadoc说:对于来自不同{@link KStream}的记录没有排序保证。。这是否意味着会话窗口将产生不正确的结果?

如果是,除了“合并”还有什么选择?

共有2个答案

柴兴修
2023-03-14

我们要做的是:合并所有不同的事件(源自上面提到的不同主题)的用户标识(即按用户标识分组),并对其应用会话窗口。

据我所知,您需要加入流(并使用groupBy确保它们可以通过用户id正确加入),而不是合并它们。然后,您可以使用会话窗口聚合进行后续操作。

谭嘉容
2023-03-14

我也在考虑加入,但事实上,这似乎取决于每个ID的每个主题是否有一个事件,或者在一个输入主题中是否有多个ID相同的事件。对于第一种情况,加入是一个很好的策略,但对于后一种情况,则不是这样,因为您会得到一些不必要的重复。

stream A: <a,1> <a,2>
stream B: <a,3>
join-output plus session: <a,1-3 + 2-3>

编号3将是重复的。

还要记住,如果您在连接结果或原始数据上应用时间戳,那么连接会稍微修改时间戳,因此您的会话窗口可能会有所不同。

关于merge()和排序。您可以安全地使用merge(),因为会话窗口将基于记录时间戳而不是偏移顺序生成。Kafka流中的所有窗口操作都可以优雅地处理无序数据。

 类似资料:
  • 从flink办公室引入会话窗口 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-窗口。。。会话窗口操作符为每个到达的记录创建一个新窗口,如果窗口之间的距离比定义的间隙更近,则将窗口合并在一起。为了可合并,会话窗口操作符需要合并触发器和合并窗口函数,。。。

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?

  • 我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,

  • 为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?

  • 我正在玩Firebase Storage烘烤成一个照片收集应用程序,收集照片并上传到Firebase Storage后端。我想上传多张照片并等待所有任务完成,收集下载URL并进行下一步。下面是我的代码 我希望收集所有的URI,并使用DownloadURI中存储的结果。当我输入这个时,我意识到这可能不是firebase的问题,而是理解如何等待并行任务完成,执行并继续处理结果。我怎样才能达到这个目标呢