我们使用流,并将每个消息发布到另一个主题,该主题按用户id对记录进行分区(按用户id重新分区原始流)。
然后我们消耗这个重新分区的流,我们将消耗的记录存储在加窗10分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区中,但顺序并不保证,因为最初的主题有10个分区。
我理解Kafka流的窗口模型,当新记录进入时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需要能够对缓冲的事件进行排序,并按此顺序将它们发送到另一个主题。
注意事项:
>
我们需要能够使用处理时间(而不是事件时间)刷新/处理窗口内的记录。我们不能等待下一次点击来提前时间,因为它可能永远不会发生。
我们需要从商店中删除所有的记录,一旦窗口被排序和刷新。
您可以在这里看到我对类似问题的回答:https://stackoverflow.com/a/44345374/7897191
由于您的消息键已经是唯一的,您可以忽略我关于去重复的评论。
现在KIP-138(挂钟标点语义)已经在1.1.0中发布了,您应该可以毫无问题地实现概述的算法了。它使用处理器API。我不知道只有DSL才能做到这一点。
我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问
具有Kafka Streams应用,其通过例如1天的流连接来执行开窗(使用原始事件时间,而不是挂钟时间)。 如果启动此拓扑,并从头开始重新处理数据(如在 lambda 样式的体系结构中),此窗口是否会将旧数据保留在那里?da 例如:如果今天是2022-01-09,而我收到来自2021-03-01的数据,那么这个旧数据会进入表格,还是会从一开始就被拒绝? 在这种情况下,可以采取什么策略来重新处理这些
输入: 结果:
来自火花流背景-掌握Kafka流。 我有一个简单的Spark流媒体应用程序, 并返回该分钟内每个用户的最新事件 示例事件类似于 我感兴趣的是这将如何在Kafka流中工作,因为似乎每个事件都有一个输出--当我的用例是减少流量时。 从我到目前为止的阅读来看,这似乎不是直接的,您将不得不使用处理器API。 理想情况下,我希望使用DSL而不是处理器API,因为我刚刚开始研究Kafka流,但似乎我必须使用处
Spring Boot environment侦听kafka主题(@KafkaListener/@StreamListener)将侦听器工厂配置为以批处理模式运行: 或通过应用程序。属性: 如何配置框架,以便给定两个数字:N和T,它将尝试为侦听器获取N条记录,但不会等待超过T秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Sour
为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?