当前位置: 首页 > 知识库问答 >
问题:

使用Kafka向Dlq Spring cloud stream发送消息时出错

罗星洲
2023-03-14
<dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
         </dependency>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        </dependencies>
        <dependencyManagement>
        <dependencies>
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Edgware.SR3</version>
                <type>pom</type>
                <scope>import</scope>
          </dependency>
          </dependencies>
          </dependencyManagement>

@Component
public class QueueConsumer {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);

    /** The processor. */
    @Autowired
    private IMessageProcessor processor;

    /**
     * Consume.
     *
     * @param message the message
     */
    @StreamListener(value = OrderEventSink.ORDER_EVENT)
    public void consume(Message<String> message) {
        try {
            processor.process(message);
        } catch (MessageProcessingFailedException e) {
            LOG.error("Error Code "+ e.getCode().getCode() + " " + e.getCode().getDescription(), e);
            throw e;
        }
    }
}
  1. 我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。
  2. 从消息中提取标题时出现异常,解决此问题的最佳方法是什么?
  3. kafka版本为1.0,kafka客户端为2.11-1.0
     spring.cloud.stream.bindings.orderEvent.destination=orderEvents
     spring.cloud.stream.bindings.orderEvent.content- 
     type=application/json
     spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
     spring.cloud.stream.bindings.orderEvent.consumer.back-off- 
     multiplier=5
     spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial- 
     interval=60000
     spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
     spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
     spring.cloud.stream.bindings.kafka.binder.brokers=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
     spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     enableDlq=true
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqName=dead-queue
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.key.
     serializer=org.apache.kafka.common.serialization.StringSerializer
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.value.
     serializer=org.apache.kafka.common.serialization.StringSerializer

共有1个答案

诸葛立果
2023-03-14

这是kafka活页夹1.3.2版本中的一个bug;它固定在主服务器(1.3.3.build-snapshot)上。

顺便说一句,最好的解决方案是使用Spring Boot 2.0.1和SCSt emlhurst.release(由cloud FINCHLEY引入--目前处于M9里程碑)。

这些版本具有对Kafka1.0的原生支持。

 类似资料:
  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • 我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol

  • null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。

  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。