我有一个使用Spring Cloud Stream和Spring Kafka的应用程序,它处理Avro消息。该应用程序运行良好,但现在我想添加一些错误处理。
目标是:捕获反序列化异常,使用异常详细信息原始Kafka消息自定义上下文信息构建新对象,并将此对象推送到专用Kafka主题。基本上是DLQ,但原始消息将被截获并修饰。
问题是:虽然我可以拦截异常,但我不知道如何从Kafka那里获取原始消息(下面的TODO 1)。我已经查看了ConsumerAwareErrorHandler中返回的数据对象。把手,我在那里看不到。
以下是我的代码:
@EnableBinding(EventStream.class)
@SpringBootApplication
@Slf4j
public class SpringcloudApplication {
public static void main(String[] args) {
SpringApplication.run(SpringcloudApplication.class, args);
}
/* Configure custom exception handler */
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
return (container, destination, group) -> {
container.setErrorHandler(new ConsumerAwareErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
log.info("Got error with data: {}", data);
// TODO 1 - How to get original message?
// TODO 2 - Send to dedicated (DLQ) topic
}
});
};
}
@StreamListener(EventStream.INBOUND)
public void consumeEvent(@Payload Message message) {
log.info("Consuming event --> {}", message.toString());
produceEvent(message);
}
@Autowired private EventStream eventStream;
public Boolean produceEvent(Message message) {
log.info("Producing event --> {}", message.toString());
return eventStream
.producer()
.send(MessageBuilder.withPayload(message)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}
以及属性文件:
spring:
cloud:
stream:
default-binder: kafka
default:
consumer:
useNativeEncoding: true
producer:
useNativeEncoding: true
kafka:
binder:
brokers: localhost:9092
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: "http://localhost:8081"
consumer-properties:
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
schema.registry.url: "http://localhost:8081"
specific.avro.reader: true
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
bindings:
event-consumer:
destination: data_stream_in # incoming topic
contentType: application/**avro
group: data_stream_consumer
event-producer:
destination: data_stream_out
contentType: application/**avro
我正在使用以下版本:
感谢您的帮助!
句柄
方法中的第二个参数是消费者记录
,它是原始的Kafka记录,但如果您希望记录自动发送到DLQ,您可以执行以下操作。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setErrorHandler(errorHandler);
};
}
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
实际上,您正在设置一个SeekToCurrentErrorHandler,它能够将失败的记录发送到DLQ。有关死信发布恢复器(DeadLetterPublishingRecoverer)工作原理的更多详细信息,请参阅Apache Kafka的Spring参考文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-信件;有关详细信息,请参阅文档交互处理程序:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-至当前
您还需要配置和ErrorHandlingDeserializer,
spring.cloud.stream.kafka.binder.configuration.value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.binder.configuration.spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
...
similar for the value class.
有关ErrorHandlingDeserializer
的更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
如果要修改记录并将自定义消息添加到DLQ,可以通过覆盖handle方法,然后访问ConsumerRecord,然后调用super class方法来实现。
我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式
我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以
目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。
如何在批处理模式的情况下处理反序列化异常? 我正在使用Spring boot-2.3.8版本的Spring kafka。 尝试过此选项: 但它抛出了一个异常:由java引起。lang.IllegalStateException:错误处理程序必须是ErrorHandler,而不是org。springframework。Kafka。听众。请参阅OcuCurrentBatchErrorHandler 以
主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示
有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。