当前位置: 首页 > 知识库问答 >
问题:

如何正确地寻找Kafka中最后的偏移量?

艾文斌
2023-03-14

我试图阅读上一篇文章中的Kafka主题,但我无法正确阅读:

在属性文件中,我设置了以下内容:

组id=我的团队
客户。id=my_client
启用。汽车提交=真
自动。抵消重置=最早的
隔离。级别=读取已提交

然后代码:

consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());

我可以看到偏移量是如何从0打印出来的,即使我在主题中有一些项目。

在日志文件中,我可以看到以下内容(缩写):

[Consumer clientId=my_client,groupId=my_group]在第1代完成了组的分配:{my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=分配(分区=[mytopic-0])]

[消费者clientId=my_client, group Id=my_group]成功加入第1代的组[消费者clientId=my_client, group Id=my_group]通知分配方新的Assignment(分区=[mytope-0])

[Consumer clientId=my_client,groupId=my_group]添加新分配的分区:mytopic-0

[Consumer clientId=my_client,groupId=my_group]未找到分区mytopic-0的提交偏移量

[Consumer clientId=我的客户,

知道我做错了什么吗?我用seekToBegining()poll()commitSync()seekToEnd()尝试了一些神奇的方法,但我认为默认情况下应该可以。

共有1个答案

潘驰
2023-03-14

汽车抵消重置=我会解决你的问题

https://docs.confluent.io/platform/current/clients/consumer.html#offset-管理层

 类似资料:
  • 随着JavaKafka消费者寻求()它需要我们传入TopicPartion和Offest。但是,我认为这个查找方法会为我的消费者收集订阅的Topic分区。 这是我试图处理的例子。 消费者A订阅了主题“测试主题”分区1和分区2。调用时,我从每个分区读取消息。我处理一些消息,但我的应用程序出现异常。我不调用。现在,我想倒带到上一次中检索到的偏移量,并尝试重新处理它们。那我该怎么做呢?我是否需要检查每个

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。

  • 问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终

  • 现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?

  • 我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没