当前位置: 首页 > 知识库问答 >
问题:

春云流Kafka流DLQ

孔征
2023-03-14

我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。

在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制:

spring:
  cloud:
    stream:
      function:
        definition: processor      
      bindings:         
        processor-in-0:
          destination: input-topic
          consumer:
            dlqName: input-topic-dlq
        processor-out-0:
          destination: output-topic       
      kafka:
        streams:
            binder:
              deserialization-exception-handler: sendToDlq
            configuration:
              metrics.recording.level: DEBUG
            brokers:
              - localhost:9092

我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,则将创建该主题。

如果我尝试从DLQ主题消费,我会收到如下错误:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

此时,当我查询Zookeeper ls/brokers/topics时,我看到创建的Topic。

现在,我尝试将非JSON消息发布到输入主题(我的默认反序列化器是JSON)。

但我看不到任何消息中的ininting-Topite-dlq主题创建。

奇怪的是,我可以在默认的“error.input topic dlq.appId”主题中看到消息。

我做错了什么吗?

共有1个答案

柳经纶
2023-03-14

我设法弄清楚了。Spring Cloud Stream Kafka Streams Binder的当前文档中似乎有一个拼写错误。

绑定的目标应该在spring.cloud.streams上。绑定级别类似于您已经拥有的,但特定于实现的使用者属性应该在spring.cloud.streams.kafka.streams上。绑定级别。

因此,您的配置应如下所示:

spring:
  cloud:
    stream:
      function:
        definition: processor      
      bindings:         
        processor-in-0:
          destination: input-topic
        processor-out-0:
          destination: output-topic       
      kafka:
        streams:
            binder:
              deserialization-exception-handler: sendToDlq
            bindings:
              processor-in-0:
                consumer:
                  dlqName: input-topic-dlq
            configuration:
              metrics.recording.level: DEBUG
            brokers:
              - localhost:9092
 类似资料:
  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。

  • 我想在我的spring boot项目中使用Kafka Streams实时处理。所以我需要Kafka Streams配置,或者我想使用KStreams或KTable,但我在互联网上找不到示例。 我做了制作人和消费者现在我想流实时。

  • 我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流 在日志中,我得到了这个: 2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:0

  • 现在我正在尝试用kafka创建消息服务功能以使用< code > spring-cloud-stream-bind-Kafka ,但效果不太好。 Spring罩1.4.2 当我使用此错误日志启动项目时失败 我在怀疑我的春靴版本。这么低配的版本。< br >我认为< code > spring-cloud-stream-binder-Kafka 在spring boot 2.0版本下无法使用或者其他

  • 当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。