如果我不想使用自动通信模式,sping提供了另一种方法。
spring kafkfa/#提交补偿为我们提供了以下关于提交补偿的信息:
RECORD
-当侦听器在处理记录后返回时提交偏移量BATCH
-处理poll()返回的所有记录后提交偏移量TIME
-当poll()返回的所有记录都已被处理时,只要超过自上次提交以来的确认时间,就提交偏移量COUNT
-当poll()返回的所有记录都已被处理时,只要自上次提交以来已收到ackCount记录,就提交偏移量COUNT\u TIME
-类似于TIME和COUNT,但如果任一条件为真,则执行提交手册
-消息侦听器负责确认();然后,应用与批处理相同的语义MANUAL_IMMEDIATE
-确认后立即提交偏移量。侦听器调用acknowledge()方法。
我有几个问题:
TIME
据我所知,Spring框架中的某个地方存在循环
while(true){
data = consumer.poll();
data.foreach(record->listener.listen(record))
}
民意测验多久进行一次?
时间是提交偏移量的唯一标准吗?假设投票返回了100条记录,当ackTime过期时-只处理了60条记录?
我没有抓住MANUAL_IMMEDIATE
和MANUAL
之间的区别
请为我澄清这些问题。
附笔。
据我所知,Garry Russel foreach的回答如下:
while(true){
data = consumer.poll();
data.foreach(record->new Thread(()->listener.listen(record)).start());
}
这取决于版本;最近的1.3版本有一个更简单的线程模型,由KIP-62提供了便利。
使用该版本,在调用者线程上调用侦听器;直到所有当前记录都被消耗掉,下一次轮询才会发生。除了RECORD
(和MANUAL*
)之外,提交的决定是在所有记录都发送给侦听器之后确定的。
MANUAL_IMMEDIATE
就是这个意思;偏移量在用户处理时立即提交;使用MANUAL
,手动偏移量在所有记录发送给侦听器后提交。
与早期版本相比要复杂一些;可以提取一个或两个额外的批,并且在每次轮询之前执行ack,以便在将第一批中的所有记录发送到侦听器之前提交偏移量。
编辑
在下面回复您的评论...
是的;线程在1.3中更改。在此之前,我们必须继续轮询消费者以避免代理重新平衡分区。与
所以,最坏的情况是容器包含3组记录——侦听器当前正在处理的记录、队列中的记录和我们无法放入队列的记录。任何未完成的偏移提交(手动或其他)都会在每次轮询之前执行。
轮询线程会等待处理器线程吗?
不我们不能这样做,因为这会导致重新平衡——就像我们在消费者线程上调用侦听器一样。
KIP-62实际上在0.10.1.0客户端中得到了修复,但直到1.3版本,我们才更改线程;多亏了KIP-62,这是一个重大的简化,我建议使用该版本。
我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。
我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用
在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费
我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本 我的场景是以下一个: 所以我写了folloqing代码 生产者微服务 spring kafka配置: 在制作人方面所有的工作都很好。我能创造话题和发送信息。 消费者微服务 动态侦听器类 当我在生产者端发送消息时,我可以看到以下日志: 在消费者方面,我没有看到任何信息。我只看到下面的指纹: 谁能告诉我我错在哪
我们在Kubernetes中基于<code>gcr.io/google_containers/Kubernetes-Kafka:1.0-10.2.1</code>docker映像运行一个Kafka集群,使用<code>gcr.io/google_containers/Kubernetes-zookeeper:1.0-3.4.10</code>,使用三个Kafka和zookeer实例。 我们有几个不
我已经花了几个小时试图弄清楚如何将水平条与X轴上的零线偏移,这样当线的宽度大于1时,它就不会重叠。 感谢所有的帮助。 示例在CodePen上(希望它会出现):https://codepen.io/RomanKl/pen/mzmegG }; var CTX = document . getelementbyid(" chart 1 "); var myChart = 新图表(ctx, { 类型: “