我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。)
我使用Spring for Apache Kafka,而我的消费者实际上是一个用@kafkalistener
注释的方法。
这个问题是断断续续的,我很难重新创建它。
有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
看看kafka-consumer-groups工具,它可以用来检查消费者的偏移量和滞后(在运行此命令时,consumer必须处于活动状态)。
./kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --new-consumer --describe --group console-consumer-55936
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
console-consumer-55936 test 0 6 6 0 consumer-1_/192.168.0.83
console-consumer-55936 test 1 1 1 0 consumer-1_/192.168.0.83
console-consumer-55936 test 2 1 1 0 consumer-1_/192.168.0.83
console-consumer-55936 test 3 1 1 0 consumer-1_/192.168.0.83
console-consumer-55936 test 4 2 2 0 consumer-1_/192.168.0.83
console-consumer-55936 test 5 1 1 0 consumer-1_/192.168.0.83
console-consumer-55936 test 6 1 1 0 consumer-1_/192.168.0.83
console-consumer-55936 test 7 2 2 0 consumer-1_/192.168.0.83
console-consumer-55936 test 8 1 1 0 consumer-1_/192.168.0.83
这应该允许您跟踪任何东西是否实际被消费。
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移
谢了。
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。