当前位置: 首页 > 知识库问答 >
问题:

如何正确使用Kafka消费者“seek”来返回所有分区的未提交偏移量?

谷永贞
2023-03-14

对于Java Kafka消费者seek()函数,它要求传入topicpartionoffest。但是,我认为这个seek方法将为我的使用者获取订阅的TopicPartitions的集合。

这是我试图处理的例子。

使用者A订阅主题“test-topic”分区1和2。当调用poll()时,我从每个分区读取消息。我处理了一些消息,但我的应用程序得到了一个异常。我不调用commitsync()。现在,我要倒回到在上次poll()中检索到的偏移量,并尝试重新处理它们。那么我该怎么做呢?我是否需要检查每个主题分区最后提交的偏移量,并为每个分区调用seek()?是否多次调用seek()只接受最后一次调用的seek()?正如我所说的,我希望确保我的使用者返回所有分区,这样我就不会丢失任何分配的分区上的任何数据。

共有1个答案

盖弘毅
2023-03-14

我处理了一些消息,但我的应用程序得到了一个异常。我不调用commitSync()

如果不调用commitsync(),则不会提交消息。如果假设异常杀死了您的程序,那么在重新启动之后,使用者通常从最后提交的偏移量中读取它。

您可能还想检查auto.offset.reset并将其设置为aresty

检查您的消息是否自动提交,因为您正在执行commitsync()您不需要自动提交,即enable.auto.commit可以设置为false(在Confluent Kafka中默认为true)

如果您的程序没有被异常终止,您总是有消耗的记录。您可以重试处理每个记录,然后提交。

ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord record: records)
{
   tryProcess(record, 3);
}
consumer.commitSync();

void tryProcess(ConsumerRecord record, int maxRetries) {
    if(maxRetries < 1) {
        log.warn("max retries exhausted for record");
        return;
    }
    try {
         process(record);
    } catch(Exception ex){ 
        tryProcess(record, --maxRetries);
    }
}

您还可以尝试重试处理记录批,而不是每个记录,如tryprocess(records,3),其中记录对应于consumerrecords,该批重试3次。我不认为有寻求的必要。

不过,我仍然对seek()api的用法很好奇

例如,seek(),当我们不使用订阅,即consumer.subscribe(),而使用consumer.assign(),当我们只想查看主题中的消息,例如控制台consumer.subscrime.subscrime.subscrime.subscrime.subscrime.subscrime.subscrime.subscrime有时,我们可能需要在某个偏移量后看到一些消息,或者最后n个消息等,而不需要对它们做任何事情,而只是显示。

 类似资料:
  • 随着JavaKafka消费者寻求()它需要我们传入TopicPartion和Offest。但是,我认为这个查找方法会为我的消费者收集订阅的Topic分区。 这是我试图处理的例子。 消费者A订阅了主题“测试主题”分区1和分区2。调用时,我从每个分区读取消息。我处理一些消息,但我的应用程序出现异常。我不调用。现在,我想倒带到上一次中检索到的偏移量,并尝试重新处理它们。那我该怎么做呢?我是否需要检查每个

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。

  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1