我用的是Kafka:2.11-1.0.1。应用程序包含主题“X”的并发性为5的使用者,分区为5。
重新启动应用程序并在分区分配之前在主题“X”上发布消息时,主题“X”的5个使用者会找到组协调器,并将加入组请求发送给组协调器。预计会收到小组协调员的回复,但未收到回复。
我检查了Kafka服务器日志,但在html" target="_blank">调试日志级别找不到相关日志。
当我运行描述消费者组的命令时,作出如下观察:
新消息发布在主题“X”上,但消费者没有收到。
心跳和会话。时间out设置为默认值。
如果消息在主题“X”及其消费者的分区分配之前发布,则会出现此问题。
我的疑问是:为什么再平衡没有完成,以至于新消费者开始消费新产生的信息?
应用程序在消费者群体中有以下消费者
应用程序重启时发生了什么,以及其中一个主题是否已发布消息
根本原因:
组协调器在应用程序重新启动后没有等待所有使用者初始化,因此发生了第一次不必要的重新平衡,因此consumerA1从分区获取消息并开始处理它。
解决方案:为了避免这种不必要的初始再平衡,kafka提供了一种配置,其中组协调器等待消费者加入新的消费者组。文档
组最初的重新平衡。延迟太太
检查了我的kafkaserver.properties,它被设置为0。默认情况下的尝试,即3秒。避免了初始重新平衡,应用程序重启时GC等待3秒,在此期间所有其他消费者都初始化了。所有消费者都发送了加入组请求,因为所有GC都收到了来自所有消费者的请求。GC没有任何延迟地响应,重新平衡处理并成功完成。
我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
Kafka消费群体的滚动部署是否会导致该群体冻结? 让我们来考虑一下这个场景, 我们开始滚动部署 因此,如果您有一个足够大的集群,并且需要一些时间才能在一台机器上完成部署(通常情况下),这会导致消耗完全冻结吗? 如果是,在生产中进行消费者群体更新的策略是什么
当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了
我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。
如果将与有状态重试一起使用,以便每次重试都从代理轮询消息,则存在消费者组重新平衡的长重试周期可能导致分区被重新分配给另一个消费者的风险。因此,有状态重试周期/尝试将被重置,因为新消费者不知道重试的状态。 举个例子,如果重试最长期限是24小时,但消费者组重新平衡平均每12小时发生一次,则重试永远无法完成,一旦超过保留期限,消息(及其背后的消息)最终将从主题中过期。(假设在此时间内未解决可重试异常的原