当前位置: 首页 > 知识库问答 >
问题:

SeekToCurInterrorHandler:DeadLetterPublishingRecoverer未处理反序列化错误

储思聪
2023-03-14

我正在尝试使用spring-kafka版本2.3.0编写kafka消费者。M2库。为了处理运行时错误,我使用SeekToSumtErrorHandler.class和DeadLetterPublishingRecoverer作为我的恢复器。这仅在我的消费者代码抛出异常时才正常工作,但在无法反序列化消息时失败。

我尝试自己实现ErrorHandler,我很成功,但使用这种方法,我自己最终编写了DLT代码来处理我不想做的错误消息。

下面是我的Kafka财产

spring:
   kafka:
     consumer:
        bootstrap-servers: localhost:9092
        group-id: group_id
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.mypackage
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
      ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
      configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}

共有1个答案

郤仰岳
2023-03-14

这对我来说很好(请注意,Boot将自动配置错误处理程序)。。。

java prettyprint-override">@SpringBootApplication
public class So56728833Application {

    public static void main(String[] args) {
        SpringApplication.run(So56728833Application.class, args);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, String> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3);
        eh.setClassifier( // retry for all except deserialization exceptions
                new BinaryExceptionClassifier(Collections.singletonList(DeserializationException.class), false));
        return eh;
    }

    @KafkaListener(id = "so56728833"
            + "", topics = "so56728833")
    public void listen(Foo in) {
        System.out.println(in);
        if (in.getBar().equals("baz")) {
            throw new IllegalStateException("Test retries");
        }
    }

    @KafkaListener(id = "so56728833dlt", topics = "so56728833.DLT")
    public void listenDLT(Object in) {
        System.out.println("Received from DLT: " + (in instanceof byte[] ? new String((byte[]) in) : in));
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56728833").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name("so56728833.DLT").partitions(1).replicas(1).build();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.example
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.value.default.type: com.example.So56728833Application$Foo
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

logging:
  level:
    org.springframework.kafka: trace

我在这个话题上有三条记录:

"badJSON"
"{\"bar\":\"baz\"}"
"{\"bar\":\"qux\"}"

我看到第一个直接去DLT,第二个在3次尝试后去那里。

 类似资料:
  • 问题内容: 我的问题很简单:我有以下简单的类: 我正在尝试处理以下JSON: 显然,这里存在一个问题(“ blah”无法解析为int) 以前,Jackson抛出类似org.codehaus.jackson.map.JsonMappingException的内容:无法从字符串值’blah’构造java.lang.Integer的实例:不是有效的Integer值 我同意这一点,但是我想在某个地方注册一

  • 问题内容: 我的问题很简单:我有以下简单的类: 我正在尝试处理以下JSON: 显然,这里存在一个问题(“ blah”无法解析为int) 以前,Jackson抛出类似org.codehaus.jackson.map.JsonMappingException的内容:无法从字符串值’blah’构造java.lang.Integer的实例:不是有效的Integer值 我同意这一点,但是我想在某个地方注册一

  • 问题内容: 我有一个像这样的课程: 我想序列化它们的列表: 当我进行序列化时,动物属性的类型信息将丢失。有没有一种方法可以某种方式安装解析器侦听器,以便在遇到列表中的每个元素时提供正确的类进行反序列化?这就是手动提供描述类类型的字符串的想法。 谢谢 问题答案: Gson项目代码库中的RuntimeTypeAdapter据说可以很好地用于多态序列化和反序列化。我认为我尚未尝试使用它。有关更多信息,请

  • 我在Spring Kafka设置中使用Avro和模式注册表。 我想以某种方式处理,它可能在反序列化过程中引发。 我找到了以下两个资源: https://github.com/spring-projects/spring-kafka/issues/164 有人知道如何在Spring Boot中捕获吗? 我使用的是Spring Boot 2.0.2 编辑:我找到了解决办法。

  • 问题内容: 我试图反序列化以DateTime作为修饰符的类: 但是,当我尝试tro反序列化时,却遇到以下异常: 我用它来反序列化: 还有我的jsonData的示例: 问题答案: 期望使用无参数构造函数。的最新版本没有这样的构造函数。 如果您已固定格式,即。应该只是一个时间戳,那么你可以简单地注册与。它将在内部用于字段。您可以摆脱注释。 您需要添加库。

  • 我在尝试反序列化xml时收到以下错误。这会产生错误: XmlSerializer serializer=新XmlSerializer(typeof(PrivateOptionsAPIResponse)) var result=序列化程序。反序列化(streamReader); 例外情况: 系统捕获到InvalidOperationException消息=XML文档(0,0)中有错误 InnerEx