当前位置: 首页 > 知识库问答 >
问题:

春云Kafka流:出版到DLQ是失败的与Avro

弘和同
2023-03-14

在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。

主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1 o.s.i.h.日志处理程序:250-org.springframework.messaging。MessageHandlingException:消息处理程序[org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder中发生错误$1@49abe531]; 嵌套异常为java.lang.RuntimeException:失败,failedMessage=GenericMessage

这是应用程序.yml

spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: TOPIC
          group: groupID
      kafka:
        binder:
          brokers:
            - xxx:9092
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: PLAIN
          jaas:
            loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
            options:
              username: username
              password: pwd
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer            
        bindings:
          process-in-0:
            consumer:
              configuration:
                basic.auth.credentials.source: USER_INFO
                schema.registry.url: registryUrl
                schema.registry.basic.auth.user.info: user:pwd
                security.protocol: SASL_SSL
                sasl.mechanism: PLAIN
              max-attempts: 1
              dlqProducerProperties:
                configuration:
                  basic.auth.credentials.source: USER_INFO
                  schema.registry.url: registryUrl
                  schema.registry.basic.auth.user.info: user:pwd
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              deserializationExceptionHandler: sendToDlq
              ackEachRecord: true
              enableDlq: true
              dlqName: TOPIC_DLT
              autoCommitOnError: true
              autoCommitOffhtml" target="_blank">set: true

我使用以下依赖关系:

Spring云依赖 - 2021.0.1 Spring
启动启动器父级 - 2.6.3
Spring云流绑定器Kafka卡模式注册表客户端 - 5.3.0 卡

夫卡序列化程序 - 5.3.0

我不知道我到底错过了什么。

共有1个答案

林德惠
2023-03-14

在经历了大量的留档后,我发现为了让Spring完成发布DLQ的工作,我们需要为原始主题和DLT主题拥有相同数量的分区。如果做不到,那么我们需要将dlq分区设置为1或手动提供Dlq分区函数bean。通过提供dlq分区:1,所有消息都将转到分区0。

 类似资料:
  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动

  • 应用程序A写入用户对象(json)下面的Kafka主题: 应用程序 B 正在尝试使用此用户对象(用户.java驻留在应用程序 B 项目中)与以下 application.yml(多个绑定器): 下面是我的Spring Cloud Stream处理器类的外观: 但是,它没有将Message有效负载转换为“用户”类,而是不断抛出错误,未找到 父类“User事件”。 有什么想法吗?

  • 我是Spring云流的新手。我正在使用兔子MQ粘合剂。问题是当我在主类中使用单个 bean 启动应用程序时,它就成功启动了。但是,如果我在主类中注册了 1 个以上的 bean,则应用程序无法从以下日志启动。 application.yml文件