我试图阅读上一篇文章中的Kafka主题,但我无法正确阅读:
在属性文件中,我设置了以下内容:
组id=我的团队
客户。id=my_client
启用。汽车提交=真
自动。抵消重置=最早的
隔离。级别=读取已提交
然后代码:
consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());
我可以看到偏移量是如何从0打印出来的,即使我在主题中有一些项目。
在日志文件中,我可以看到以下内容(缩写):
[Consumer clientId=my_client,groupId=my_group]在第1代完成了组的分配:{my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=分配(分区=[mytopic-0])]
[消费者clientId=my_client, group Id=my_group]成功加入第1代的组[消费者clientId=my_client, group Id=my_group]通知分配方新的Assignment(分区=[mytope-0])
[Consumer clientId=my_client,groupId=my_group]添加新分配的分区:mytopic-0
[Consumer clientId=my_client,groupId=my_group]未找到分区mytopic-0的提交偏移量
[Consumer clientId=我的客户,
知道我做错了什么吗?我用seekToBegining()poll()commitSync()seekToEnd()尝试了一些神奇的方法,但我认为默认情况下应该可以。
汽车抵消重置=我会解决你的问题
https://docs.confluent.io/platform/current/clients/consumer.html#offset-管理层
随着JavaKafka消费者寻求()它需要我们传入TopicPartion和Offest。但是,我认为这个查找方法会为我的消费者收集订阅的Topic分区。 这是我试图处理的例子。 消费者A订阅了主题“测试主题”分区1和分区2。调用时,我从每个分区读取消息。我处理一些消息,但我的应用程序出现异常。我不调用。现在,我想倒带到上一次中检索到的偏移量,并尝试重新处理它们。那我该怎么做呢?我是否需要检查每个
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没