当前位置: 首页 > 知识库问答 >
问题:

时间窗口内的Spring Kafka批次

樊俊悟
2023-03-14

Spring Boot environment侦听kafka主题(@KafkaListener/@StreamListener)将侦听器工厂配置为以批处理模式运行:

ConcurrentKafkaListenerContainerFactory # setBatchListener

或通过应用程序。属性:

spring.kafka.listener.type=batch

如何配置框架,以便给定两个数字:N和T,它将尝试为侦听器获取N条记录,但不会等待超过T秒,如下所述:https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html
我看过的一些属性:

  • 最大轮询记录确保您在一个批处理中不会获得超过N个数字
  • fetch-min-size在获取请求中至少获取这个数据量
  • 抓取-最大-等待但不要等待超过必要的时间
  • idleBetweenPolls只是在民意调查之间睡一会儿:)

似乎应该将fetch min size与fetch max wait结合使用,但它们比较的是字节,而不是消息/记录。

很明显,手动实现这一点是可能的,我正在考虑是否可以将Spring配置为手动实现这一点。

共有1个答案

水浩歌
2023-03-14

似乎应该将fetch min size与fetch max wait结合使用,但它们比较的是字节,而不是消息/记录。

这是正确的,不幸的是,Kafka没有提供诸如提取之类的机制。最小记录数。

我不认为Spring会将此功能叠加在kafka客户端之上;最好在Kafka本身中要求一个新功能。

Spring根本不会操作从轮询返回的记录,除了您现在可以指定subBatchPer分区以获取仅包含一个分区的批次,以便在使用一次读/prcess时正确支持僵尸Geofence/写。

 类似资料:
  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我有数据流就像 事件名,事件id,Start_time(时间戳)... 在这里,我想对最后一个带有时间戳的字段<;code>;Start_。 因此,我在flink window中看到的是,所以我猜它需要过去30分钟的事件,但不考虑 我想把数据放在start_ time在最后30分钟内的位置,然后我如何编写转换?我是否需要使用该列使用? 我是Flink的新手。 谢啦

  • 我们使用流,并将每个消息发布到另一个主题,该主题按用户id对记录进行分区(按用户id重新分区原始流)。 然后我们消耗这个重新分区的流,我们将消耗的记录存储在加窗10分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区中,但顺序并不保证,因为最初的主题有10个分区。 我理解Kafka流的窗口模型,当新记录进入时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需

  • 在流处理问题中,我们有3个传感器,每个传感器每8毫秒生成一个时间戳样本(传感器的时间是同步的)。所以我想合并每个时间戳的数据(对于3个传感器,我们应该为每个时间戳输出3个合并的样本数据)。此外,我们有一个160毫秒的时间限制,这样每个数据在生成时间戳后最多应该在160毫秒后输出。所以我决定使用Flink事件时间概念和时间窗口。因为时间戳在每个传感器的样本中都是唯一的,所以我们认为它是数据流的关键。

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢