当前位置: 首页 > 知识库问答 >
问题:

使用Spring Cloud stream kafka绑定消费者和启用DLQ覆盖重试尝试

长孙嘉容
2023-03-14

我正在使用带有kafka绑定的spring cloud streams。消费者使用spring进行配置。云流动Kafka。绑定。功能配置和DLQ已启用。

spring.cloud.stream.kafka.bindings.consumeTest-in-0:
    consumer:
        batch-mode: false
        ackEachRecord: true
        dlqName: test.file.error
        enableDlq: true
        dlqProducerProperties.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
        dlqProducerProperties.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
        maxAttempts: 5
        backOffInitialInterval: 5000
        backOffMaxInterval: 5000

当我使用来自主题的消息并发生异常时,我看到它重试3次,并将消息发送到DLQ。我已将maxAttemts配置为5,但我无法覆盖默认值3。

我使用的是spring kafka(2.7.8)和spring cloud(2020.0.4)。如何覆盖重试尝试和回退间隔参数?

我注意到,如果我在默认使用者级别(如下所示)添加参数,它们会被拾取,但不会在函数绑定级别:

spring.cloud.stream.default.consumer:
    maxAttempts: 5
    defaultRetryable: true
    backOffInitialInterval: 5000
    backOffMaxInterval: 5000
    backOffMultiplier: 1

谢啦

共有1个答案

宦文柏
2023-03-14

把它放在spring.cloud.stream.bindings.consumeTest-in-0.consumer.max尝试=5下面,而不是spring.cloud.stream.kafka.bindings.consumeTest-in-0.consumer.max尝试=5。

查看此处:通用属性

 类似资料:
  • 我是Guice的新手,正在为以下用例寻求帮助: 我开发了一个软件包(PCKG),其中该软件包的入门级依赖于其他类,例如: 在我的绑定模块中,我正在做: 注意我没有为A提供绑定信息,因为我想通过它的消费者类来提供它的绑定。(设计就是这样,所以我的要求是继续讨论主要问题,而不是设计)。 现在,我的消费者阶层正在做这样的事情: 同样在我的消费者包装中: Q1。我在消费者类中做的绑定正确吗?我很困惑,因为

  • 主要内容:1 并发消费重试,1.1 失败重试,1.2 超时重试,2 顺序消费重试,2.1 失败重试,2.2 超时重试,3 broker处理回退请求,3.1 asyncConsumerSendMsgBack处理回退请求,3.2 handleRetryAndDLQ处理重试和死信消息基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者重试消息和死信消息源码。 消费重试:并发消费和顺序消费对于消费失败的消息均会有消息重试机制。 1 并发消费重试

  • 我尝试使用VS2019作为生成器,在Windows 10上使用CMake构建SRT。 通过配置步骤,我有: C编译器标识为MSVC 19.23.28106.4 CXX编译器标识为MSVC 19.23.28106.4检查工作的C编译器:C:/Program Files(x86)/Microsoft Visual Studio/2019/Community/VC/Tools/MSVC/14.23.28

  • 当消费者实例组出现时,会不会对Kafka的性能产生任何影响。重新启动时id已更改。老年人会发生什么。id它是否仍在代理内存中,或者何时将被删除?假设我有1000个消费者实例,并且所有实例都动态分配组。重新启动时的id。 可以为{log.retention.ms'}提供什么列表值。我可以设置为1毫秒吗?

  • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: