我有 3 个节点(节点 0,节点 1,节点 2)具有复制因子 2 的 Kafka 集群(代理 0、代理 1、代理 2)和 Zookeeper(使用与 Kafka tar 打包的 Zookeeper)在不同的节点(节点 4)上运行。
我在启动 zookeper 然后是剩余节点后启动了代理 0。在代理 0 日志中看到它正在读取__consumer_offsets,并且似乎它们存储在代理 0 上。以下是示例日志:
Kafka版本:kafka_2.10-0.10.2.0
2017-06-30 10:50:47,381] INFO [GroupCoordinator 0]: Loading group metadata for console-consumer-85124 with generation 2 (kafka.coordinator.GroupCoordinator)
[2017-06-30 10:50:47,382] INFO [Group Metadata Manager on Broker 0]: Finished loading offsets from __consumer_offsets-41 in 23 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-06-30 10:50:47,382] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from __consumer_offsets-44 (kafka.coordinator.GroupMetadataManager)
[2017-06-30 10:50:47,387] INFO [Group Metadata Manager on Broker 0]: Finished loading offsets from __consumer_offsets-44 in 5 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-06-30 10:50:47,387] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from __consumer_offsets-47 (kafka.coordinator.GroupMetadataManager)
[2017-06-30 10:50:47,398] INFO [Group Metadata Manager on Broker 0]: Finished loading offsets from __consumer_offsets-47 in 11 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-06-30 10:50:47,398] INFO [Group Metadata Manager on Broker 0]: Loading offsets and group metadata from __consumer_offsets-1 (kafka.coordinator.GroupMetadataManager)
此外,我可以在同一代理 0 日志中看到组协调员消息。
[2017-06-30 14:35:22,874] INFO [GroupCoordinator 0]: Preparing to restabilize group console-consumer-34472 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-06-30 14:35:22,877] INFO [GroupCoordinator 0]: Group console-consumer-34472 with generation 2 is now empty (kafka.coordinator.GroupCoordinator)
[2017-06-30 14:35:25,946] INFO [GroupCoordinator 0]: Preparing to restabilize group console-consumer-6612 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-06-30 14:35:25,946] INFO [GroupCoordinator 0]: Group console-consumer-6612 with generation 2 is now empty (kafka.coordinator.GroupCoordinator)
[2017-06-30 14:35:38,326] INFO [GroupCoordinator 0]: Preparing to restabilize group console-consumer-30165 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-06-30 14:35:38,326] INFO [GroupCoordinator 0]: Group console-consumer-30165 with generation 2 is now empty (kafka.coordinator.GroupCoordinator)
[2017-06-30 14:43:15,656] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 3 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2017-06-30 14:53:15,653] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
在使用 kafka-console-consumer.sh 和 kafka-console-producer.sh 测试集群的容错时,我看到在杀死代理 1 或代理 2 时,消费者仍然可以接收来自生产者的新消息。重新平衡正在正确进行。
但是,杀死代理 0 会导致任意数量的使用者都不会消耗新旧消息。下面是代理 0 被杀死之前和之后的主题状态。
以前
Topic:test-topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 0,2
Topic: test-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
后
Topic:test-topic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2
Topic: test-topic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1
Topic: test-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
以下是在杀死代理 0 后在使用者日志中看到的 WARN 消息
[2017-06-30 14:19:17,155] WARN Auto-commit of offsets {test-topic-2=OffsetAndMetadata{offset=4, metadata=''}, test-topic-0=OffsetAndMetadata{offset=5, metadata=''}, test-topic-1=OffsetAndMetadata{offset=4, metadata=''}} failed for group console-consumer-34472: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2017-06-30 14:19:10,542] WARN Auto-commit of offsets {test-topic-2=OffsetAndMetadata{offset=4, metadata=''}, test-topic-0=OffsetAndMetadata{offset=5, metadata=''}, test-topic-1=OffsetAndMetadata{offset=4, metadata=''}} failed for group console-consumer-30165: Offset commit failed with a retriable exception. You should retry committing offsets. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
代理属性。其余默认属性保持不变。
broker.id=0
delete.topic.enable=true
auto.create.topics.enable=false
listeners=PLAINTEXT://XXX:9092
advertised.listeners=PLAINTEXT://XXX:9092
log.dirs=/tmp/kafka-logs-test1
num.partitions=3
zookeeper.connect=XXX:2181
生产者属性。其余默认属性保持不变。
bootstrap.servers=XXX,XXX,XXX
compression.type=snappy
消费者属性。其余默认属性保持不变。
zookeeper.connect=XXX:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
据我了解,如果节点持有/代理组协调员和__consumer_offsets死亡,那么尽管选出了新的分区领导人,消费者也无法恢复正常运营。
我在帖子中看到类似的东西。这篇文章建议重新启动死的代理节点。但是,尽管有更多的节点,但在生产环境中重新启动代理 0 之前,消息消耗仍存在延迟。
Q1: 如何缓解上述情况?
Q2:有没有办法将组协调器__consumer_offsets更改为另一个节点?
任何建议/帮助不胜感激。
检查__consumer_offsets主题上的复制因子。如果不是 3,那就是你的问题。
运行以下命令 kafka-topics --zookeeper localhost:2181 --describe --topic __consumer_offsets
,看看输出的第一行是说“ReplicationFactor:1”还是“ReplicationFactor:3”。
在执行试验以首先设置一个节点,然后创建本主题时,这是一个常见问题,复制因子为 1。稍后,当您扩展到 3 个节点时,您忘记更改此现有主题的主题级别设置,因此即使您正在生成和使用的主题是容错的,偏移量主题仍然仅停留在代理 0 上。
主要内容:1 并发消费重试,1.1 失败重试,1.2 超时重试,2 顺序消费重试,2.1 失败重试,2.2 超时重试,3 broker处理回退请求,3.1 asyncConsumerSendMsgBack处理回退请求,3.2 handleRetryAndDLQ处理重试和死信消息基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者重试消息和死信消息源码。 消费重试:并发消费和顺序消费对于消费失败的消息均会有消息重试机制。 1 并发消费重试
我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨
消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
我可以在命令行上针对Kafka位置安装发送和接收消息。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我还有一个Kafka消费者的Java代码。代码昨天收到了消息。但是今天早上没有收到任何消息。代码没有更改。我想知道属性配置是否不太正确。这是我的配置: 制片人: 生产记录设置为 消费者: 对于Java代码: 少了什么?
本周早些时候,我在这里获得了一些关于Stackoverflow的帮助,这导致了一个生产者/消费者模式的发展,用于加载处理并将大型数据集导入RavenDb。CPU受限任务的并行化与IO受限任务的并行化 我现在希望限制生产商提前准备的工作单元的数量,以管理内存消耗。我已经使用一个基本信号量实现了节流,但在某个点上实现死锁时遇到了问题。 我无法找出导致死锁的原因。以下是代码摘录: 这是对LoadData
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用