随着JavaKafka消费者寻求()它需要我们传入TopicPartion和Offest。但是,我认为这个查找方法会为我的消费者收集订阅的Topic分区。
这是我试图处理的例子。
消费者A订阅了主题“测试主题”分区1和分区2。调用poll()
时,我从每个分区读取消息。我处理一些消息,但我的应用程序出现异常。我不调用commitSync()
。现在,我想倒带到上一次poll()
中检索到的偏移量,并尝试重新处理它们。那我该怎么做呢?我是否需要检查每个主题分区最后提交的偏移量,并为每个分区调用seek()
?多次调用seek()
是否只接受调用的最后一个seek()
?正如我所说的,我希望确保我的消费者返回所有分区,这样我就不会丢失任何分配分区上的任何数据。
我处理一些消息,但我的应用程序出现异常。我不调用commitSync()
如果不调用commitSync()
,则不会提交消息。如果假设异常终止了您的程序,则在重新启动后,使用者通常会从最后提交的偏移量读取该异常。
您可能还需要检查自动。抵消重置并将其设置为最早
。
检查您的邮件是否自动提交,因为您正在执行commitSync()
您不需要自动提交,即启用。汽车提交
可以设置为false
(在合流Kafka中默认为true
)
如果您的程序没有因异常而终止,则您始终拥有已使用的记录。您可以重试处理每个记录,然后提交。
ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord record: records)
{
tryProcess(record, 3);
}
consumer.commitSync();
void tryProcess(ConsumerRecord record, int maxRetries) {
if(maxRetries < 1) {
log.warn("max retries exhausted for record");
return;
}
try {
process(record);
} catch(Exception ex){
tryProcess(record, --maxRetries);
}
}
您也可以尝试用重试来处理批次记录,而不是每个记录,如tryProcess(记录,3)
,其中记录对应于消费者记录
,批次重试3次。我认为没有寻求的必要。
不过,我仍然对seek()api的用法感到好奇
例如,当我们不使用订阅,即consumer.subscribe(),而是使用consumer.assign(),我们通常只是想偷看(查看)主题中的消息,例如控制台消费者。有时,我们可能需要在一定的偏移量后看到一些消息,或者最后n条消息等,而不需要实际对它们做任何事情,只是显示。
对于Java Kafka消费者函数,它要求传入和。但是,我认为这个seek方法将为我的使用者获取订阅的TopicPartitions的集合。 这是我试图处理的例子。 使用者A订阅主题“test-topic”分区1和2。当调用时,我从每个分区读取消息。我处理了一些消息,但我的应用程序得到了一个异常。我不调用。现在,我要倒回到在上次中检索到的偏移量,并尝试重新处理它们。那么我该怎么做呢?我是否需要检查
我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。
我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用
null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1