当前位置: 首页 > 知识库问答 >
问题:

使用Spring Cloud Stream处理Avro反序列化异常

谭志用
2023-03-14

我有一个使用Spring Cloud Stream和Spring Kafka的应用程序,它处理Avro消息。该应用程序运行良好,但现在我想添加一些错误处理。

目标是:捕获反序列化异常,使用异常详细信息原始Kafka消息自定义上下文信息构建新对象,并将此对象推送到专用Kafka主题。基本上是DLQ,但原始消息将被截获并修饰。

问题是:虽然我可以拦截异常,但我不知道如何从Kafka那里获取原始消息(下面的TODO 1)。我已经查看了ConsumerAwareErrorHandler中返回的数据对象。把手,我在那里看不到。

以下是我的代码:

@EnableBinding(EventStream.class)
@SpringBootApplication
@Slf4j
public class SpringcloudApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringcloudApplication.class, args);
    }

    /* Configure custom exception handler */
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
        return (container, destination, group) -> {
            container.setErrorHandler(new ConsumerAwareErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                    log.info("Got error with data: {}", data);
                    // TODO 1 - How to get original message?
                    // TODO 2 - Send to dedicated (DLQ) topic
                }
            });
        };
    }

    @StreamListener(EventStream.INBOUND)
    public void consumeEvent(@Payload Message message) {
        log.info("Consuming event --> {}", message.toString());
        produceEvent(message);
    }

    @Autowired private EventStream eventStream;
    public Boolean produceEvent(Message message) {
        log.info("Producing event --> {}", message.toString());
        return eventStream
                .producer()
                .send(MessageBuilder.withPayload(message)
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    .build());
    }

}

以及属性文件

spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        consumer:
          useNativeEncoding: true
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: "http://localhost:8081"
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: "http://localhost:8081"
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      bindings:
        event-consumer:
          destination: data_stream_in # incoming topic
          contentType: application/**avro
          group: data_stream_consumer
        event-producer:
          destination: data_stream_out
          contentType: application/**avro

我正在使用以下版本:

  • Spring防尘套2.3.2。释放

感谢您的帮助!

共有1个答案

谭景明
2023-03-14

句柄方法中的第二个参数是消费者记录,它是原始的Kafka记录,但如果您希望记录自动发送到DLQ,您可以执行以下操作。

@Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }



    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }


    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }

实际上,您正在设置一个SeekToCurrentErrorHandler,它能够将失败的记录发送到DLQ。有关死信发布恢复器(DeadLetterPublishingRecoverer)工作原理的更多详细信息,请参阅Apache Kafka的Spring参考文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-信件;有关详细信息,请参阅文档交互处理程序:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-至当前

您还需要配置和ErrorHandlingDeserializer,

spring.cloud.stream.kafka.binder.configuration.value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.binder.configuration.spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
...
similar for the value class. 

有关ErrorHandlingDeserializer的更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

如果要修改记录并将自定义消息添加到DLQ,可以通过覆盖handle方法,然后访问ConsumerRecord,然后调用super class方法来实现。

 类似资料:
  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 如何在批处理模式的情况下处理反序列化异常? 我正在使用Spring boot-2.3.8版本的Spring kafka。 尝试过此选项: 但它抛出了一个异常:由java引起。lang.IllegalStateException:错误处理程序必须是ErrorHandler,而不是org。springframework。Kafka。听众。请参阅OcuCurrentBatchErrorHandler 以

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。