场景:
session.timeout.ms:10秒
最大poll.interval.ms:5分钟
处理“Poll()”中使用的消息需要6分钟
C(6秒):发送另一个心跳
D(5分钟):发送了另一个心跳(5*60%3=0),但达到了“max.poll.interval.ms”(5分钟
在点“D”,消费者:
继续每3秒发送一次心跳?
如果是“1”点,则
a.在完成6分钟的处理后,考虑到由于在点“d”处重新平衡而改变了它的分区,该消费者将如何提交抵消?
从Kafka版本0.10.1.0开始,使用者心跳在后台线程中发送,这样客户机处理时间可以比会话超时更长,而不会导致使用者被认为死亡。
但是,max.poll.interval.ms
仍然设置使用者调用poll
方法的最长允许时间。
在您的情况下,处理时间为6分钟,这将意味着在点“D”,您的消费者将被视为死亡。
最后,是的,如果您已经知道您的处理时间将超过5分钟的默认时间,则需要增加max.poll.interval.ms
时间。
另一种选择是通过减少配置max.poll.records
来限制poll
期间提取的记录,该配置默认为500,并被描述为:“单个调用poll()中返回的最大记录数”。
我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题
我是Apache Camel的新手,我试图在一个简单的项目中理解和使用轮询消费者EIP,但我感到有点迷茫…谁能帮我解释一下,甚至用一个小的工作例子。 如有任何帮助,我们将不胜感激
我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统。 现在我看到kafka重新平衡单个消费者(不允许并发)的问题。 这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。se
问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消
我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?
我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。