当前位置: 首页 > 知识库问答 >
问题:

SpringKafka抵消promise方式

方坚壁
2023-03-14

如果我不想使用自动通信模式,sping提供了另一种方法。

spring kafkfa/#提交补偿为我们提供了以下关于提交补偿的信息:

RECORD-当侦听器在处理记录后返回时提交偏移量
BATCH-处理poll()返回的所有记录后提交偏移量
TIME-当poll()返回的所有记录都已被处理时,只要超过自上次提交以来的确认时间,就提交偏移量
COUNT-当poll()返回的所有记录都已被处理时,只要自上次提交以来已收到ackCount记录,就提交偏移量
COUNT\u TIME-类似于TIME和COUNT,但如果任一条件为真,则执行提交
手册-消息侦听器负责确认();然后,应用与批处理相同的语义
MANUAL_IMMEDIATE-确认后立即提交偏移量。侦听器调用acknowledge()方法。

我有几个问题:

TIME据我所知,Spring框架中的某个地方存在循环

while(true){
   data = consumer.poll();
   data.foreach(record->listener.listen(record))
}

民意测验多久进行一次?

时间是提交偏移量的唯一标准吗?假设投票返回了100条记录,当ackTime过期时-只处理了60条记录?

我没有抓住MANUAL_IMMEDIATEMANUAL之间的区别

请为我澄清这些问题。

附笔。

据我所知,Garry Russel foreach的回答如下:

while(true){
   data = consumer.poll();
   data.foreach(record->new Thread(()->listener.listen(record)).start());
}

共有1个答案

濮佑运
2023-03-14

这取决于版本;最近的1.3版本有一个更简单的线程模型,由KIP-62提供了便利。

使用该版本,在调用者线程上调用侦听器;直到所有当前记录都被消耗掉,下一次轮询才会发生。除了RECORD(和MANUAL*)之外,提交的决定是在所有记录都发送给侦听器之后确定的。

MANUAL_IMMEDIATE就是这个意思;偏移量在用户处理时立即提交;使用MANUAL,手动偏移量在所有记录发送给侦听器后提交。

与早期版本相比要复杂一些;可以提取一个或两个额外的批,并且在每次轮询之前执行ack,以便在将第一批中的所有记录发送到侦听器之前提交偏移量。

编辑

在下面回复您的评论...

是的;线程在1.3中更改。在此之前,我们必须继续轮询消费者以避免代理重新平衡分区。与

所以,最坏的情况是容器包含3组记录——侦听器当前正在处理的记录、队列中的记录和我们无法放入队列的记录。任何未完成的偏移提交(手动或其他)都会在每次轮询之前执行。

轮询线程会等待处理器线程吗?

不我们不能这样做,因为这会导致重新平衡——就像我们在消费者线程上调用侦听器一样。

KIP-62实际上在0.10.1.0客户端中得到了修复,但直到1.3版本,我们才更改线程;多亏了KIP-62,这是一个重大的简化,我建议使用该版本。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本 我的场景是以下一个: 所以我写了folloqing代码 生产者微服务 spring kafka配置: 在制作人方面所有的工作都很好。我能创造话题和发送信息。 消费者微服务 动态侦听器类 当我在生产者端发送消息时,我可以看到以下日志: 在消费者方面,我没有看到任何信息。我只看到下面的指纹: 谁能告诉我我错在哪

  • 我们在Kubernetes中基于<code>gcr.io/google_containers/Kubernetes-Kafka:1.0-10.2.1</code>docker映像运行一个Kafka集群,使用<code>gcr.io/google_containers/Kubernetes-zookeeper:1.0-3.4.10</code>,使用三个Kafka和zookeer实例。 我们有几个不

  • 我已经花了几个小时试图弄清楚如何将水平条与X轴上的零线偏移,这样当线的宽度大于1时,它就不会重叠。 感谢所有的帮助。 示例在CodePen上(希望它会出现):https://codepen.io/RomanKl/pen/mzmegG }; var CTX = document . getelementbyid(" chart 1 "); var myChart = 新图表(ctx, { 类型: “