我正在尝试用rabbit活页夹配置一个Spring-Cloud-Stream应用程序
spring cloud stream : 3.1.4
spring cloud stream rabbit binder : 3.1.4
spring version : 2.5.5
下面是我的配置:
spring:
cloud:
stream:
rabbit:
bindings:
listen-in-0:
consumer:
queueNameGroupOnly: true
enable-batching: true
batch-size: 100
receive-timeout: 500
transacted: true
listen-out-0:
producer:
queueNameGroupOnly: true
transacted: true
batchingEnabled: false
enable-batching: false
bindings:
listen-in-0:
destination: test.request
group: test.request
consumer:
batch-mode: true
requiredGroups: test.request
maxAttempts: 1
listen-out-0:
destination: test.response
group: test.response
producer:
requiredGroups: test.response
我的消费者java代码:
@Bean
public Function<Message<List<Request>>, List<Message<Response>>> listen() {
...
}
}
当没有错误发生时,所有工作正常。但当我模拟一个异常时,我得到了以下异常:
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] WARN o.s.a.r.l.ConditionalRejectingErrorHandler$DefaultExceptionStrategy [] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@7891e2d2(byte[18])' MessageProperties [headers={tenant=tenant1, request.id=75c328f6-42a9-4682-952e-46d088a5c09e}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test.request, receivedRoutingKey=#, deliveryTag=1, consumerTag=amq.ctag-S0u7Mj4EwX13Kzj_CgzGVg, consumerQueue=test.request])
2021-10-27 17:08:31,997 [rs-worker] [test.request-1] ERROR o.s.a.r.l.SimpleMessageListenerContainer [] - Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1028)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1017)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1767)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1660)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1558)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
... 7 common frames omitted
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.request.errors'; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:254)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:211)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1500(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$BatchListener.onMessageBatch(AmqpInboundChannelAdapter.java:481)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1652)
... 11 common frames omitted
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.springframework.amqp.core.Message
at org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder$2.handleMessage(RabbitMessageChannelBinder.java:663)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
... 21 common frames omitted
当兔子绑定器尝试将消息发送回 DLQ 时,会引发此错误。
事实上,错误消息有效负载包含一个列表
是否有解决方法,或者是否在库的配置或使用中出错?
谢谢你。
编辑1:
好吧,所以我不能用spring-cloud-stream-rabbit-binder批量消息监听器的DLQ。
我必须自己处理它,那么我如何确保组成我的listene的所有函数之间的事务:
但是要自己处理异常,我必须在组成我的监听器的所有函数中处理异常。我有一个由多个函数组成的监听器,配置如下:
spring:
cloud:
function:
definition: heartbeat;listen|process|send
要在
心跳
函数中处理事务,没关系,但是如何确保组成侦听|进程|发送
的所有函数之间的事务?
实际上,当我在< code>process
函数中使用< code>StreamBridge发送消息时,我也遇到了同样的问题。如果发送功能失败,我不希望在“处理”功能中发送的消息被提交。
这是我的java代码处理函数:
@Configuration
public class WorkerProcessor {
@Bean
public Function<List<Request>, List<Request>> process(Service service) {
return (requests) -> service.run(requests);
}
}
@Component
@Transactional
public class Service {
StreamBridge bridge;
IWorker worker;
public Service(StreamBridge bridge, IWorker worker) {
this.bridge = bridge;
this.worker = worker;
}
@Transactional
public List<Request> run(List<Request> requests) {
requests.forEach(request -> {
worker.process(request, bridge);
});
return requests;
}
}
批处理侦听器当前不支持在活页夹中重新发布死信。将republishToDlq
设置为false。
很难说是否应该支持它(即,即使成功处理了一些记录,也会发送整个批处理),但它不应该抛出<code>ClassCastException</code>。
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/348
对于批处理侦听器,最好在侦听器本身中处理异常。
版本:Spring Boot: 1.4.2 .发布春云Deps:布里克斯顿。SR7 这是我的申请。处理器应用程序的属性。 当我启动此应用程序时,将按预期创建事件交换,并将其绑定到名为:events exchange的队列。eventconsumers组(也可以)。但routingKey始终为“#”。我已经尝试了从各种文档中找到的所有选项。我在这里遗漏了什么吗? 我希望这个应用程序只订阅某些消息(我
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0
我们有一个要求,我们正在消费来自一个主题的消息,然后发生了一些丰富,然后我们将消息发布到另一个主题。以下是事件 使用者 - 使用消息 扩充 - 扩充使用的消息 制作人 - 已发布 向其他主题发送的丰富消息 我正在使用Spring cloud kafka binder,一切正常。突然,我们观察到生产者正在向主题发送重复的消息,然后我们使生产者是幂等的。为了更好地控制,我们将autocommitOff
我们正在使用spring云配置服务器。Spring配置客户端使用Spring控制总线(RabbitMQ)获取更新。 看起来每个配置客户端实例都会创建一个连接到'spring.cloud.bus'交换的队列。 对有多少应用程序实例可以连接到“spring.cloud.bus”交换有任何可伸缩性限制? 我想RabbitMQ可以扩展来处理这个问题。 寻找这方面的指导方针。 许多感谢,
如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!