我正在建立一个使用Kafka消息的应用程序。
为了捕获反序列化异常,我学习了Spring-docs关于反序列化错误处理的知识。我已经尝试了failedDeserializationFunction方法。
这是我的使用者配置类
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
/* Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
return consumerProps;
}
@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(NTCMessageBody.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
这是双功能提供程序
public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {
@Override
public NTCMessageBody apply(byte[] t, Headers u) {
return new NTCBadMessageBody(t);
}
}
public class NTCBadMessageBody extends NTCMessageBody{
private final byte[] failedDecode;
public NTCBadMessageBody(byte[] failedDecode) {
this.failedDecode = failedDecode;
}
public byte[] getFailedDecode() {
return this.failedDecode;
}
}
当我只发送一条关于主题的损坏消息时,我得到了这个错误(在循环中):
经常这样。
ErrorHandlingDeserializer
当反序列化器反序列化消息失败时,Spring没有办法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,2.2版引入了ErrorHandlingDeserializer。此反序列化程序委托给一个真正的反序列化程序(键或值)。如果委托未能反序列化记录内容,ErrorHandlingDeserializer将返回一个DeserializationException,其中包含原因和原始字节。使用记录级MessageListener时,如果键或值包含DeserializationException,则使用失败的ConsumerRecord调用容器的ErrorHandler。在使用BatchMessageListener时,失败的记录将与批处理中的剩余记录一起传递给应用程序,因此应用程序侦听器负责检查特定记录中的键或值是否是DeserializationException。
因此,根据您的代码,您正在使用记录级MessageListener
,然后只需将ErrorHandler
添加到Container
处理异常
如果您的错误处理程序实现了此接口,例如,您可以相应地调整偏移量。例如,要重置偏移量以重播失败的消息,可以执行如下操作;但是请注意,这些都是简单的实现,您可能需要在错误处理程序中进行更多的检查。
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
});
return factory;
}
我使用的是Spring Kafka 1.1.2-Spring Boot 1.5.0 RC版本,并且配置了一个自定义值serialiser/Deserialiser类,扩展/。这些类确实使用Jackson ObjectMapper,它可以通过构造函数提供。 是否可以从Spring上下文中注入ObjectMapper?我已经配置了一个ObjectMapper,我希望在序列化/反序列化程序中重用它。
我想检查在页面之间导航时是否收到错误消息。当错误消息出现时,测试将被终止并进行报告。为此,我使用了以下与站点上的错误消息相关的元素。
我需要序列化/反序列化特定枚举: 我有个例外: 我如何通过GSON序列化/反序列化它?
我的数据服务如下所示: 如果我得到一个HTTP错误(即404),我会从core得到一条令人讨厌的控制台消息:error error:Uncaught(promise):[object]。es5。在我的情况下,我该如何处理?
当我只是从集群中获取文档时,我无法在spring data elasticsearch中反序列化OffsetDateTime。有一些方法可以解决这个问题,但大多数都适用于以前版本的Spring boot。在spring data elasticsearch的当前快照版本中,jackson映射器也被删除,并替换为一些映射器类。 有人可以给我一个提示,如何正确解决这个问题与当前版本的启动弹性6。x/7