当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka@kafkalistener-重试发送失败的消息并手动提交偏移量

司空实
2023-03-14
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.listener.ack-mode=manual-immediate

logging.level.org.springframework.kafka=debug
@KafkaListener(topics = "testTopic",groupId="group-1")
    public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
        logger.info("OF PayLoad:{}", record);
        logger.info("OF value:"+record.value());
        logger.info("OF Offset:{}",record.offset());
        ack.acknowledge();

    }
    @KafkaListener(topics = "testTopic",groupId="group-2")
    public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
        logger.info("MF PayLoad_2:{}", record);
        logger.info("MF value_2:"+record.value());
        logger.info("MF Offset_2:{}",record.offset());
        //ack.acknowledge();
    }

在这里,consumerTwo没有确认该消息。我想根据可配置的时间重试将所有消息再次发送给消费者。

任何例子或建议的建议将是很大的帮助。

共有1个答案

轩辕源
2023-03-14

看这个答案。

在2.2中,我们添加了向seektocurrenterrorhandler添加恢复器的功能,该恢复器可以执行某些操作。

最后一个注释中的pull请求显示了如果恢复器成功恢复消息,如何提交偏移量。这将在周四的2.2.4版本中播出。

 类似资料:
  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我有一个关于Kafka自动提交机制的问题。我正在使用启用自动提交的Spring-Kafka。作为一个实验,我在系统空闲(主题中没有新消息,没有正在处理的消息)的情况下,将我的消费者与Kafka的连接断开了30秒。重新连接后,我收到了如下几条消息: 第一,我不明白有什么好犯的?系统空闲(所有以前的消息都已提交)。第二,断开时间为30秒,比max.poll.interval.ms的5分钟(300000

  • 我正在使用spring with Kafka来消费来自Kafka主题的数据。我已经将并发配置为10。因此不同的线程轮询代理以获取消息并处理消息。即使在一段时间后(成功处理),我们也会收到相同的消息返回给使用者的不同线程。我们能够在配置的max.poll.interval.ms=1500000内处理接收到的消息。 请找到以下配置的Kafka消费者属性。我已经通过Kafka配置了自动提交。 你能帮我解

  • 我用的是SpringKafka2.2.9的Spring靴2.1.9 如果消息多次失败(在afterRollbackProcessor中定义),消费者将停止轮询记录。但如果消费者重新启动,它会再次轮询相同的消息和进程。 但是我不希望消息再次被重新轮询,最好的阻止方法是什么? 这是我的配置 我怎样才能做到这一点?

  • 我正在使用投票机制编写一个Kafka消费者,在这个机制中,我每次投票都会收到100条消息。在使用消息之后,我一个接一个地手动提交偏移量。在提交偏移量时,有时我会从100条消息中得到一条错误消息。Rest成功提交。 假设偏移量5提交失败,但不包括偏移量5所有成功提交的偏移量。 那么,在这种情况下,提交失败的偏移量会发生什么变化?由于偏移量按顺序移动,我会在下一次轮询中获得提交失败的偏移量吗? 我的初

  • 我有一个自定义的Kafka消费者,我用它向REST API发送一些请求。根据API的响应,我要么提交偏移量,要么跳过没有提交的消息。 现在,当来自REST API的响应没有以开头时,偏移量不会提交,但消息不会被重新使用。如何强制使用者重新使用未提交的偏移信息?