我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。
我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。
现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
现在,当我用以下语句加载所有事件时,一切都运行良好。
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
问题是在一天的早些时候,这将加载100万个事件,但稍后将加载1000万个事件,所以我必须迭代超过1000万个事件,而我们是在批处理模式下工作,我想我可以进一步优化这一点,只加载事件最后一个小时,所以对于相同的KTable配置,我尝试使用以下语句。
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但令我惊讶的是,它没有返回任何数据。
有人能解释一下为什么这没有返回任何结果吗,我想我误解了时间窗口的概念。
然后我做了一些进一步的测试,并将我的KTable配置更改为followed。
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但我不确定我走的是正确的路...
如果我将使用下面的语句来进行最新的KTable配置,那么这会从当前的一天向我传递1000万个事件吗?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
在窗口存储上使用交互式查询时,时间范围将应用于窗口开始时间戳。因此,如果您有一个1天的窗口,并从[现在-1小时,现在)
查询具有窗口开始时间戳的数据,您将找不到任何匹配的窗口,因为在此时间范围内没有窗口开始。
为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?
我们使用流,并将每个消息发布到另一个主题,该主题按用户id对记录进行分区(按用户id重新分区原始流)。 然后我们消耗这个重新分区的流,我们将消耗的记录存储在加窗10分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区中,但顺序并不保证,因为最初的主题有10个分区。 我理解Kafka流的窗口模型,当新记录进入时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需
你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点
Spring Boot environment侦听kafka主题(@KafkaListener/@StreamListener)将侦听器工厂配置为以批处理模式运行: 或通过应用程序。属性: 如何配置框架,以便给定两个数字:N和T,它将尝试为侦听器获取N条记录,但不会等待超过T秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Sour
我们正在使用Kafka流的会话窗口来聚合相关事件的到达。除了聚合之外,我们还使用API指定窗口的保留时间。流信息: 会话窗口(非活动时间)为1分钟,传递到的保留时间为2分钟。我们使用定制的来映射事件的时间。 示例: 事件:e1;事件时间:上午10:00:00;到达时间:下午2点(同一天) 事件:e2;事件时间:上午10:00:30;到达时间:下午2:10(同一天) 第二个事件的到达时间是e1到达后
我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。