当前位置: 首页 > 知识库问答 >
问题:

Spring云流kafka使用者错误处理和重试问题

臧梓
2023-03-14

我需要在Spring云流kafka活页夹错误处理场景的帮助。我的应用程序有一个java 8消费者,其绑定在application.yaml中指定。

@Bean
public Consumer<Message<Transaction>> doProcess() {

    return message -> {
        Transaction transaction = message.getPayload();
       
        if(true) {
            throw new RuntimeException("exception!! !!:)");
        }
       Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, 
       Acknowledgment.class);
       if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
       }
  }
}

application.yaml:

spring.application.name: appname
spring.cloud.stream:
  function.definition: doProcess
  kafka:
    default.consumer:
      startOffset: latest
      useNativeDecoding: true
    bindings:
      input.consumer.autoCommitOffset: false

bindings:
  doProcess-in-0:
    destination: kafka.input.topic.name
    group: appGroup
    content-type: application/*+avro
    consumer:
      autoCommitOffset: false.

现在,我正在处理错误,有两个问题:

>

  • 我正在尝试手动包装消息的消费,而不是使用自动提交偏移设置为真。因此,当我将自动提交偏移设置设为假并测试错误场景时,会面临奇怪的行为,即每当抛出异常时,消息都会重试n次,即使在服务重新启动后也会发生失败消息的重试/重新传递(如果在n次重试完成之前重新启动)。一旦n次重试完成,即使在服务重新启动后也不会选择消息。那么这是否意味着,消费者在n次重试/重新传递消息后提交偏移量,这不应该是这种情况,因为自动提交偏移是错误的。

    注意:我尚未配置任何 dlq。

    我们需要编写自定义异常处理程序,在那里我们可以捕获异常(应用程序代码和框架中的错误)并通过AWS环境中的电子邮件向用户组发送通知。但是,我们无法找到任何可以捕获这两种类型异常的错误处理程序。比如扩展SeekTo货币错误处理程序或任何其他可以在错误事件上调用的侦听器。

    编辑:

    根据 Gary 提供的解决方案,我们可以使用以下 bean 来配置自定义错误处理程序:

    @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer> MQLCC() {
            System.out.println(String.format("DEBUG: Bean %s has bean created.", "MQLCC"));
            return new ListenerContainerCustomizerCustom ();
        }
    
        private static class ListenerContainerCustomizerCustom implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
            @Override
            public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
                System.out.println(String.format("HELLO from container %s, destination: %s, group: %s", container, destinationName, group));
            }
    
        }
    
  • 共有1个答案

    蒋星雨
    2023-03-14

    侦听器容器中的默认错误处理程序将重试10次,然后记录错误并丢弃记录;对于不同的行为,您需要配置自定义错误处理程序和恢复策略。使用ListenerContainerCustomizerbean来配置容器。

    请参阅https://docs . spring . io/spring-Kafka/docs/current/reference/html/# default-eh和https://docs . spring . io/spring-Kafka/docs/current/reference/html/# dead-letters

    (3.2及更高版本)

    https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-至当前和https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#dead-字母

    对于早期版本。

     类似资料:
    • 我设置了一个Spring云流Kafka制作人和消费者,有3个Kafka经纪人在运行。我已经设置了min.insync。将副本复制到4,以查看生产者错误处理的工作方式。消息通道。send(发送) 以上是我的生产者配置。虽然retries设置为3,但生产者仍会多次重试。虽然sync设置为true,但发送呼叫会立即发出。虽然定义了错误通道和目标,并且将errorChannelEnabled设置为true

    • 大家好,我们使用的是Spring kafka 1.3.3,我们的应用程序是消耗-进程-发布管道。 如果在生产阶段流水线出现任何故障,我们如何处理重试并寻求返回。例如:应用程序正在消耗消息,处理它们并以异步方式发布到另一个主题中。但如果在发布中有任何错误

    • 我们在Spring Cloud Stream上有一个应用程序,它与Project React集成在一起。我们通过在消息标题中设置spring.cloud.stream.sendto.destination来动态设置目标主题并发布消息。我们正在寻找处理错误的情况下,如Kafka服务器断断续续或主题不可用而发布。我们已经实现了@ServiceActivator来处理所有错误。动态设置主题时,Servi

    • 我已经使用Spring云流启动了一个小型微服务。 我只有两个流绑定,如下所示: 我用Serenity开发了组件测试,我将通道注入到我想要发送测试消息的地方: 哪里: 只是定义为字符串常量: 组件测试模块导入依赖项: 我发送的信息如下: 快乐流工作正常。但是,我想在侦听器无法处理消息时测试错误流。 这是一个监听器的例子: 从try/catch引发异常时,错误由服务激活器处理: 在没有Spring-C

    • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

    • 我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个