当前位置: 首页 > 知识库问答 >
问题:

如何使用ErrorHandlingDeserializer与Spring云流绑定器kafka将错误记录发送到默认的DLQ主题?

苏晓博
2023-03-14

我想使用ErrorHandlingDeserializer来处理反序列化错误,并使用spring cloud stream binder kafka将错误记录发送到kafka binder的默认DLQ主题。

spring:
  application:
    name: my-service
  cloud:
    ## ====================================================== Vault Setting Omitted ====================================================
    vault:
    ## ====================================================== Vault Setting Omitted ====================================================
    schemaRegistryClient:
      enabled: true
      endpoint: my-schema-registry.com
      basic:
        auth:
          # user: This property is resolved from Vault.
          # password: This property is resolved from Vault.
    stream:
      kafka:
        heartbeat:
          interval:
            ms: 1000
        binder:
          brokers: my-kafka-broker.com:9094
          ## =============================================== SerDes ====================================================
          consumer-properties:
            allow.auto.create.topics: false
            specific.avro.reader: true
            schema.registry.url: ${spring.cloud.schemaRegistryClient.endpoint}
            auto.register.schemas: false
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: ${spring.cloud.schemaRegistryClient.basic.auth.user}:${spring.cloud.schemaRegistryClient.basic.auth.password}
          producer-properties:
            allow.auto.create.topics: false
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.reflection: true
            schema.registry.url: ${spring.cloud.schemaRegistryClient.endpoint}
            auto.register.schemas: false
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: ${spring.cloud.schemaRegistryClient.basic.auth.user}:${spring.cloud.schemaRegistryClient.basic.auth.password}
          ## =========================================== Kafka Security Omitted ================================================
          configuration:
          ## =========================================== Kafka Security Omitted ================================================
        bindings:
          my-in-0:
            consumer:
              enableDlq: true
              autoCommitOnError: true
              configuration:
                key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.key.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
                spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                  value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      default:
        binder: kafka
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      function:
        definition: myFunction
      bindings:
        my-in-0:
          binder: kafka
          destination: my-in-topic
          content-type: application/*+avro
          group: myGroup
        my-out-0:
          binder: kafka
          destination: my-out-topic
          content-type: application/*+avro
          group: myGroup

共有1个答案

屠瑞
2023-03-14

禁用绑定中的dlq设置,并使用配置了死信发布恢复器的SeekTocurErrorHandler配置侦听器容器(使用ListenerContainerCustomizer bean)。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-信件

 类似资料:
  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。

  • 我正在努力实现以下目标: 使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。 我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程

  • 我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1