当前位置: 首页 > 知识库问答 >
问题:

重新使用未为其提交偏移量的消息

饶谦
2023-03-14

我有一个自定义的Kafka消费者,我用它向REST API发送一些请求。根据API的响应,我要么提交偏移量,要么跳过没有提交的消息。

while (true) {

    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try { 
               consumer.commitSync();
            } catch(CommitFailedException ex) {
              ex.printStackTrace(); 
            }
        } else {
              // Do Nothing
        }
    }
}

现在,当来自REST API的响应没有以2开头时,偏移量不会提交,但消息不会被重新使用。如何强制使用者重新使用未提交的偏移信息?

共有1个答案

翟黎明
2023-03-14

如果计划使用seek(),请确保您的数据是幂等的。由于您是有选择地提交偏移量,所以被遗漏的记录可能会在提交(成功处理)记录之前。如果您执行seek()--它将GroupID的指针移动到未提交的偏移量并开始重播,您也将得到那些成功处理的消息。它也有可能成为一个无限循环。

或者,您可以将不成功记录的元数据保存在内存或db中,并从“poll(retention.ms)”开始重播主题,以便重播所有记录,但添加一个筛选器,只处理那些通过API进行的元数据与您先前保存的数据匹配的记录。每小时或几小时进行一次批处理。

 类似资料:
  • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我有Kafka流应用程序。我的应用程序正在成功处理事件。 如何使用重新处理/跳过事件所需的偏移量更改Kafka committed consumer offset。我试过如何更改topic?的起始偏移量?。但我得到了“节点不存在”错误。请帮帮我。

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1