当前位置: 首页 > 知识库问答 >
问题:

如何捕捉Kafka-Spring中的反序列化错误?

陶柏
2023-03-14

我正在建立一个使用Kafka消息的应用程序

为了捕获反序列化异常,我学习了Spring-docs关于反序列化错误处理的知识。我已经尝试了failedDeserializationFunction方法。

这是我的使用者配置类

@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

这是双功能提供程序

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody{

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

当我只发送一条关于主题的损坏消息时,我得到了这个错误(在循环中):

经常这样。

共有1个答案

谢涵亮
2023-03-14

ErrorHandlingDeserializer

当反序列化器反序列化消息失败时,Spring没有办法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,2.2版引入了ErrorHandlingDeserializer。此反序列化程序委托给一个真正的反序列化程序(键或值)。如果委托未能反序列化记录内容,ErrorHandlingDeserializer将返回一个DeserializationException,其中包含原因和原始字节。使用记录级MessageListener时,如果键或值包含DeserializationException,则使用失败的ConsumerRecord调用容器的ErrorHandler。在使用BatchMessageListener时,失败的记录将与批处理中的剩余记录一起传递给应用程序,因此应用程序侦听器负责检查特定记录中的键或值是否是DeserializationException。

因此,根据您的代码,您正在使用记录级MessageListener,然后只需将ErrorHandler添加到Container

处理异常

如果您的错误处理程序实现了此接口,例如,您可以相应地调整偏移量。例如,要重置偏移量以重播失败的消息,可以执行如下操作;但是请注意,这些都是简单的实现,您可能需要在错误处理程序中进行更多的检查。

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
    this.listen3Exception = e;
    MessageHeaders headers = m.getHeaders();
    c.seek(new org.apache.kafka.common.TopicPartition(
            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
            headers.get(KafkaHeaders.OFFSET, Long.class));
    return null;
   };
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory()  {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");


        }
    });


    return factory;
}
 类似资料:
  • 我使用的是Spring Kafka 1.1.2-Spring Boot 1.5.0 RC版本,并且配置了一个自定义值serialiser/Deserialiser类,扩展/。这些类确实使用Jackson ObjectMapper,它可以通过构造函数提供。 是否可以从Spring上下文中注入ObjectMapper?我已经配置了一个ObjectMapper,我希望在序列化/反序列化程序中重用它。

  • 我想检查在页面之间导航时是否收到错误消息。当错误消息出现时,测试将被终止并进行报告。为此,我使用了以下与站点上的错误消息相关的元素。

  • 我需要序列化/反序列化特定枚举: 我有个例外: 我如何通过GSON序列化/反序列化它?

  • 我的数据服务如下所示: 如果我得到一个HTTP错误(即404),我会从core得到一条令人讨厌的控制台消息:error error:Uncaught(promise):[object]。es5。在我的情况下,我该如何处理?

  • 当我只是从集群中获取文档时,我无法在spring data elasticsearch中反序列化OffsetDateTime。有一些方法可以解决这个问题,但大多数都适用于以前版本的Spring boot。在spring data elasticsearch的当前快照版本中,jackson映射器也被删除,并替换为一些映射器类。 有人可以给我一个提示,如何正确解决这个问题与当前版本的启动弹性6。x/7