bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group_name>
如有任何帮助,我们将不胜感激。
这对我有用:
bin/kafka-consumer-groups.sh --new-consumer --describe --group ta-services --bootstrap-server localhost:9092
结果是这样的:
GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
ta-services, modsec_logs, 0, 2, 2, 0, instance1-0_/127.0.0.1
ta-services, modsec_logs, 1, 9, 9, 0, instance1-0_/127.0.0.1
ta-services, modsec_logs, 2, 1, 1, 0, instance1-1_/127.0.0.1
ta-services, modsec_logs, 3, 1, 1, 0, instance1-1_/127.0.0.1
请记住,该命令似乎只在您的消费者还活着时才起作用。一旦您停止它-您将不会从命令中得到任何输出。我认为这是这个偏移量检查器在0.9.x vs 0.8.x中的不足之一。我还找不到一种方法来检查0.9中“死亡”消费者的偏移量(通过脚本,而不是手动读取_offsets...主题中的数据)--如果有人知道怎么做,请发帖!
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
我正在探索ChronicleQueue来保存我的一个应用程序中生成的事件。我想在经过一些处理后,将保存的事件按其原始发生顺序发布到不同的系统。我有多个应用程序实例,每个实例都可以运行一个单线程appender来将事件添加到ChronicleQueue。尽管跨实例排序是必要的,但我想理解以下两个问题。 2)还需要一些建议,如果有一种方法来保持跨实例的事件顺序。
我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow
我有以下代码 消费者订阅的主题会不断收到记录。有时,消费者会因处理步骤而崩溃。然后,当使用者重新启动时,我希望它从主题的最新偏移量开始使用(即,忽略在使用者关闭时发布到主题的记录)。我认为方法可以确保这一点。然而,这种方法似乎毫无效果。消费者从其崩溃的偏移量开始消费。 什么是正确的方式使用? 编辑:使用以下配置创建消费者
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个