Question:

Can I use the spring-kafka RecoveringBatchErrorHandler to handle deserialization exceptions?

劳法
2023-03-14

I want a batch listener that commits offsets up to (but not including) a failed record, logs the failed record, and then retrieves a new batch starting from the first offset after the failed record.

My current approach handles exceptions thrown inside the listener code by throwing a BatchListenerFailedException, which the RecoveringBatchErrorHandler processes exactly as I want. However, I would like all exceptions to be handled this way; that is, exceptions raised by the listener as well as any exceptions caused by deserialization failures. I am using a BatchMessagingMessageConverter. I understand that I could use an ErrorHandlingDeserializer if the deserialization exception occurred in a Kafka deserializer; however, in my configuration the deserialization exception occurs in the MessagingMessageConverter, which I believe runs after the Kafka client's BytesDeserializer has already deserialized my message successfully. How can I best achieve my goal?
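
To illustrate why the ErrorHandlingDeserializer never fires in this setup, here is a minimal stand-alone sketch (no Kafka involved; both methods are hypothetical stand-ins invented for this example): the byte-level step, like BytesDeserializer, cannot fail, so a deserializer-level error handler would never catch anything; the JSON failure only surfaces in the later converter step.

```java
import java.nio.charset.StandardCharsets;

public class WhereItFailsDemo {

    // stand-in for BytesDeserializer: just hands the bytes through, never throws
    static byte[] deserialize(byte[] raw) {
        return raw;
    }

    // crude stand-in for the converter's JSON parse: throws on non-JSON input
    static String convert(byte[] bytes) {
        String s = new String(bytes, StandardCharsets.UTF_8);
        if (!s.startsWith("{")) {
            throw new IllegalStateException("Failed to convert from JSON: " + s);
        }
        return s;
    }

    public static void main(String[] args) {
        byte[] ok = deserialize("{\"bar\":\"baz\"}".getBytes(StandardCharsets.UTF_8));
        byte[] bad = deserialize("A".getBytes(StandardCharsets.UTF_8)); // succeeds!
        System.out.println(convert(ok));
        try {
            convert(bad); // only now does the failure appear
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```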

Here is my container factory configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
         ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
            new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)
    );
    factory.setBatchErrorHandler(errorHandler);
    BatchMessagingMessageConverter messageConverter = new BatchMessagingMessageConverter(new BytesJsonMessageConverter());
    factory.setMessageConverter(messageConverter);
    factory.setConcurrency(1);
    return factory;
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "pojo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

Here is my listener:

@KafkaListener(id = "myKafkaListener", idIsGroup = false, autoStartup = "true", topics = {"pojo-topic"}, containerFactory = "kafkaListenerContainerFactory")
public void receive(List<Message<Pojo>> messages) {
    System.out.println("received " + messages.size() + " messages");
    int i = 0;
    try {
        //exceptions thrown here are handled as I intend
        for (var mm : messages) {
            var m = mm.getPayload();
            System.out.println("received: " + m + " at offset " + mm.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
            i++;
        }
    } catch (Exception e) {
        throw new BatchListenerFailedException("listener threw exception when processing batch", e, i);
    }
}

Update

Here is the stack trace when I send a plain string ("A" only) instead of a JSON object and deserialization fails:

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2015) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1859) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1725) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2376) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 8 common frames omitted
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:122) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:174) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:322) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:153) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3643) ~[jackson-databind-2.12.4.jar:2.12.4]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:119) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 16 common frames omitted

1 Answer

艾凌龙
2023-03-14

There are two solutions here; the first uses an ErrorHandlingDeserializer delegating to a JsonDeserializer. The second is a workaround using a custom ByteArrayJsonMessageConverter; I have opened an issue to provide a more seamless solution in the batch listener adapter.

Example 1, using the ErrorHandlingDeserializer:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.listener.type=batch

@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo == null
                    && headers.get(i).get(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {

                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT",
            properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                ":org.apache.kafka.common.serialization.StringDeserializer")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    BatchErrorHandler eh(ProducerFactory<String, byte[]> pf) {
        KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}");
            template.send("so69055510", "JUNK");
            template.send("so69055510", "{\"bar\":\"qux\"}");
        };
    }

}

Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]
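
The null-payload-plus-header check in the listener above can be sketched in plain Java. The header key below is the value I believe ErrorHandlingDeserializer uses for value failures (an assumption; in real code use the ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER constant), and the two lists are stand-ins for the converted batch; the index this finds is what goes into BatchListenerFailedException:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class BatchCheckDemo {

    // assumed value of ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
    static final String EXC_HEADER = "springDeserializerExceptionValue";

    // returns the index of the first record whose payload is null AND whose
    // headers carry the deserializer's exception, or -1 if the batch is clean
    static int firstBadIndex(List<Object> payloads, List<Map<String, Object>> headers) {
        for (int i = 0; i < payloads.size(); i++) {
            if (payloads.get(i) == null && headers.get(i).get(EXC_HEADER) != null) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Object> payloads = Arrays.asList("good", null, "alsoGood");
        List<Map<String, Object>> headers = Arrays.asList(
                Map.of(), Map.of(EXC_HEADER, new byte[0]), Map.of());
        System.out.println(firstBadIndex(payloads, headers)); // 1
    }
}
```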

Example 2, using a custom message converter. Note that for this solution you need some way to indicate the deserialization failure in the domain object:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo.getBar().equals("thisIsABadOne")) {
                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    ByteArrayJsonMessageConverter converter() {
        return new ByteArrayJsonMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                try {
                    return super.toMessage(record, acknowledgment, consumer, Foo.class); // <<<<<< type
                }
                catch (ConversionException ex) {
                    return MessageBuilder.withPayload(new Foo("thisIsABadOne"))
                            .build();
                }
            }

        };
    }

    @Bean
    BatchErrorHandler eh(KafkaTemplate<String, byte[]> template) {
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, byte[]> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}".getBytes());
            template.send("so69055510", "JUNK".getBytes());
            template.send("so69055510", "{\"bar\":\"qux\"}".getBytes());
        };
    }

}

Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]
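
The converter fallback in example 2 boils down to a sentinel-object pattern, sketched here without Spring (the Foo class and the "thisIsABadOne" marker mirror the example above; the convert method is an invented stand-in for the overridden toMessage): instead of letting the conversion throw, the converter returns a recognizable poison object that the listener can test for.

```java
public class SentinelDemo {

    static class Foo {
        private final String bar;
        Foo(String bar) { this.bar = bar; }
        String getBar() { return bar; }
        @Override public String toString() { return "Foo [bar=" + bar + "]"; }
    }

    // crude stand-in for the converter: on a parse failure, return the sentinel
    static Foo convert(byte[] payload) {
        String s = new String(payload);
        if (!s.startsWith("{")) {
            return new Foo("thisIsABadOne"); // sentinel instead of an exception
        }
        // real code would run Jackson here; this just keeps the raw string
        return new Foo(s);
    }

    public static void main(String[] args) {
        for (String raw : new String[] {"{\"bar\":\"baz\"}", "JUNK"}) {
            Foo foo = convert(raw.getBytes());
            if (foo.getBar().equals("thisIsABadOne")) {
                System.out.println("bad record, would throw BatchListenerFailedException");
            } else {
                System.out.println(foo);
            }
        }
    }
}
```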