当前位置: 首页 > 知识库问答 >
问题:

对于spring-云-流-绑定-Kafka,是否有任何方法可以在最大尝试次数内使用Acknowdge.nack重试?

姬衡
2023-03-14

我正在尝试使用消费批次在Kafka,我发现文件说重试是不支持的,如下所示。

使用批处理模式时不支持在绑定器内重试,因此maxAttempts将被重写为1。您可以配置SeekToCurrentBatchErrorHandler(使用ListenerContainerCustomizer)来实现类似于在绑定器中重试的功能。您还可以使用手动AckMode并调用ACKOWLEDGMENT.NACK(索引,睡眠)来提交部分批处理的偏移量,并重新传递剩余的记录。有关这些技术的更多信息,请参阅Apache Kafka spring文档

如果我使用Acknowdge.nack(索引,睡眠),它会在错误发生时无限重试。是否有任何方法可以在maxAttempts时间内使用Acknowdge.nack重试?

代码就像

@StreamListener(Sink.INPUT)
    public void propagate(@Payload List<PayLoad> payloads, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
        
        try {
            propagate(payloads);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            acknowledgment.nack(0, 50);
            
        }

    }

共有1个答案

慕容兴贤
2023-03-14

没有;你必须自己记录重试次数。

但是,从版本2.5开始,您现在可以使用RecoveringBatchErrorHandler,其中您抛出一个特定的异常来告诉处理程序哪个记录失败了,它会提交该记录之前的记录的偏移量,并对失败的记录应用重试逻辑。

参见https://docs.spring.io/spring-kafka/reference/html/#recovery-batch-eh

 类似资料:
  • 问题内容: 我有一个python脚本正在查询共享Linux主机上的MySQL服务器。出于某种原因,对MySQL的查询通常会返回“服务器已消失”错误: 如果此后立即再次尝试查询,通常会成功。因此,我想知道python中是否有一种明智的方法来尝试执行查询,如果失败,则可以重试固定次数的尝试。可能我想让它尝试5次再完全放弃。 这是我的代码类型: 显然,我可以通过在except子句中进行另一次尝试来做到这

  • 问题内容: 在Java中,我们不能覆盖final方法,但是有可能重载吗? 问题答案: 是的,重载最终方法是完全合法的。 例如:

  • 我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例: 但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重

  • 一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。 我在这里制作了一个存储库来演示这一点。 基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.

  • 我正在使用带有kafka绑定的spring cloud streams。消费者使用spring进行配置。云流动Kafka。绑定。功能配置和DLQ已启用。 当我使用来自主题的消息并发生异常时,我看到它重试3次,并将消息发送到DLQ。我已将maxAttemts配置为5,但我无法覆盖默认值3。 我使用的是spring kafka(2.7.8)和spring cloud(2020.0.4)。如何覆盖重试尝

  • H全部, 如果有人有任何经验的kafka-spark流对处理各种数据,请给我一个简短的细节,如果这是一个可行的解决方案,并比有两个不同的管道更好。 提前道谢!