当前位置: 首页 > 知识库问答 >
问题:

春云流兔子活页夹:向DLQ发送批量消息出错

冉永宁
2023-03-14

我正在尝试用rabbit活页夹配置一个Spring-Cloud-Stream应用程序

spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
spring version : 2.5.5

下面是我的配置:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          listen-in-0:
            consumer:
              queueNameGroupOnly: true
              enable-batching: true
              batch-size: 100
              receive-timeout: 500
              transacted: true
          listen-out-0:
            producer:
              queueNameGroupOnly: true
              transacted: true
              batchingEnabled: false
              enable-batching: false
      bindings:
        listen-in-0:
          destination: test.request
          group: test.request
          consumer:
            batch-mode: true
            requiredGroups: test.request
            maxAttempts: 1
        listen-out-0:
          destination: test.response
          group:  test.response
          producer:
            requiredGroups:  test.response

我的消费者java代码:

@Bean
    public Function<Message<List<Request>>, List<Message<Response>>> listen() {
      ...
    }
}

当没有错误发生时,所有工作正常。但当我模拟一个异常时,我得到了以下异常:

2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy []    - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer []    - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
    ... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1500(AmqpInboundChannelAdapter.java:69)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
    ... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
    at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder$2.handleMessage(RabbitMessageChannelBinder.java:663)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    ... 21 common frames omitted

当兔子绑定器尝试将消息发送回 DLQ 时,会引发此错误。

事实上,错误消息有效负载包含一个列表

是否有解决方法,或者是否在库的配置或使用中出错?

谢谢你。

编辑1:

好吧,所以我不能用spring-cloud-stream-rabbit-binder批量消息监听器的DLQ。

我必须自己处理它,那么我如何确保组成我的listene的所有函数之间的事务:

但是要自己处理异常,我必须在组成我的监听器的所有函数中处理异常。我有一个由多个函数组成的监听器,配置如下:

spring:
  cloud:
    function:
      definition: heartbeat;listen|process|send

要在心跳函数中处理事务,没关系,但是如何确保组成侦听|进程|发送的所有函数之间的事务?

实际上,当我在< code>process函数中使用< code>StreamBridge发送消息时,我也遇到了同样的问题。如果发送功能失败,我不希望在“处理”功能中发送的消息被提交。

这是我的java代码处理函数:


@Configuration
public class WorkerProcessor {

    @Bean
    public Function<List<Request>, List<Request>> process(Service service) {
        return (requests) -> service.run(requests);
    }
}
@Component
@Transactional
public class Service {
    StreamBridge bridge;
    IWorker worker;
    public Service(StreamBridge bridge, IWorker worker) {
        this.bridge = bridge;
        this.worker = worker;
    }
    @Transactional
    public List<Request> run(List<Request> requests) {
        requests.forEach(request -> {
            worker.process(request, bridge);
        });
        return requests;
    }
}

共有1个答案

姚昊焱
2023-03-14

批处理侦听器当前不支持在活页夹中重新发布死信。将republishToDlq设置为false。

很难说是否应该支持它(即,即使成功处理了一些记录,也会发送整个批处理),但它不应该抛出<code>ClassCastException</code>。

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348

对于批处理侦听器,最好在侦听器本身中处理异常。

 类似资料:
  • 版本:Spring Boot: 1.4.2 .发布春云Deps:布里克斯顿。SR7 这是我的申请。处理器应用程序的属性。 当我启动此应用程序时,将按预期创建事件交换,并将其绑定到名为:events exchange的队列。eventconsumers组(也可以)。但routingKey始终为“#”。我已经尝试了从各种文档中找到的所有选项。我在这里遗漏了什么吗? 我希望这个应用程序只订阅某些消息(我

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff

  • 我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0

  • 我们正在使用spring云配置服务器。Spring配置客户端使用Spring控制总线(RabbitMQ)获取更新。 看起来每个配置客户端实例都会创建一个连接到'spring.cloud.bus'交换的队列。 对有多少应用程序实例可以连接到“spring.cloud.bus”交换有任何可伸缩性限制? 我想RabbitMQ可以扩展来处理这个问题。 寻找这方面的指导方针。 许多感谢,

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!