当前位置: 首页 > 知识库问答 >
问题:

如果第一个代理关闭,则Kafka消费者无法消费

步博艺
2023-03-14
./kafka-topics.sh --create    --zookeeper localhost:2181  --replication-factor 3     --partitions 13    --topic demo
./kafka-console-producer.sh --topic  demo  --broker-list localhost:9094,localhost:9093,localhost:9092

./kafka-console-consumer.sh --group test --bootstrap-server localhost:9094,localhost:9093,localhost:9092  --topic demo

当所有的经纪人都起来的时候,一切都是好的。但是,如果我先杀死(按开始顺序),代理消息会被发送到代理,但使用者不能接收到任何消息,消息不会丢失。启动该代理后,使用者立即接收消息。

关闭broker实例后的使用者日志:

再次启动丢失的代理后的使用者日志:

谢谢

共有1个答案

裴嘉良
2023-03-14

尝试检查server-*.properties文件中的“offsets.topic.replication.factor”

例如:

############################# Internal Topic Settings       
# The replication factor for the group metadata internal topics    
# For anything other than development testing, a value greater than 1 is  recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3

http://kafka.apache.org/documentation/#brokerconfigs

 类似资料:
  • 我们正在tomcat服务器中部署kafka消费者。消费者是使用Spring-Kafka2.1.7构建的。每个tc容器可以有多个属于同一个使用者组的使用者(使用ConcurrentKafkaListenerContainerFactory)。作为一般模式,在我的使用案例中,使用者以事务性的方式从一个主题读取并生产到另一个主题。tc服务器由通常的启动和关闭shell脚本启动和停止。如果要优雅地关闭co

  • 简而言之,我如何在生产者/消费者苏德上找到相关经纪人的健康状况。

  • 我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请

  • 假设我有一个服务,它通过kafka-rest-proxy来消费消息,并且总是在同一个消费者组上。我们还可以说,它正在消耗一个有一个分区的主题。当服务启动时,它在kafka-rest-proxy中创建一个新的使用者,并使用生成的使用者url,直到服务关闭。当服务重新启动时,它将在kafka-rest-proxy中创建一个新的消费者,并使用新的url(和新的消费者)进行消费。 > 因为kafka每个分

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?