当前位置: 首页 > 知识库问答 >
问题:

Spring云流kafka活页夹重试

湛光华
2023-03-14

我有一个使用kafka活页夹的spring cloud stream应用程序,它可以消费和发送消息。在应用程序中,我使用重试策略配置自定义错误处理程序,并将不可重试的异常添加到处理程序中。配置示例:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
    SeekToCurrentErrorHandler customErrorHandler
) {
    return (((container, destinationName, group) -> {
        container.setErrorHandler(customErrorHandler);
    }));
}

@Bean
public SeekToCurrentErrorHandler customErrorHandler() {
    var errorHandler = new SeekToCurrentErrorHandler(
        (consumerRecord, e) -> log.error("Got exception skip record record: {}", consumerRecord, e),
        new FixedBackOff(1000L, 10)
    );
    errorHandler.addNotRetryableException(App.MyCustomException.class);
    return errorHandler;
}

但是我看到,如果异常抛出,比应用程序重试处理消息3次。预期行为-如果App. MyCustomException.class抛出,将不会重复消费消息。如何为Spring云流kafka绑定应用程序配置重试策略?

这里的代码示例:github

运行复制问题的测试。

共有1个答案

桓智敏
2023-03-14

您提供的自定义是针对容器级错误处理程序的。Binder有不同的重试机制。您可以将以下内容添加到您的配置中,以确保发生异常时不会重试记录。

spring.cloud.stream:
    bindings:
      processor-in-0:
        ...
        consumer:
          retryableExceptions:
            ru.vichukano.kafka.binder.retry.App.MyCustomException: false

当我尝试这样做时,我没有看到消息被重新传递。

以下是对此的一些解释。

 类似资料:
  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我正在尝试通过SCSt频道构建并获取KTable。但这并不奏效。输入KTable没有数据,但如果我尝试查看KSTream聚合(toStream()),我可以看到一些数据。我明白了,KTable是不可查询的,它没有可查询的名称。 类别: 绑定: application.yml:

  • 我们如何使用Spring-Cloud-stream-binder-kinesis建立两个AWS kinesis连接? 第一个连接:Spring应用程序和AWS kinesis流在同一个AWS账户中。 第二个连接:其他AWS运动流位于不同的AWS帐户中。 从spring应用程序到不同AWS帐户中的两个不同运动流是否可能有两个不同的连接?如果是,我们如何实施?

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?