Question:

RabbitMQ and Camel: route breaks because of "Message dropped on recovery"

奚曦哲
2023-03-14
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route> 
        <from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&amp;autodelete=true&amp;durable=true" />
        <log message="Message received!!! "/> 
        <to   uri="spring-amqp:KipcastDirect2:TestQueue:KipcastRouting2?type=direct&amp;autodelete=false&amp;durable=true" />
    </route>
</camelContext>

<rabbit:connection-factory id="amqpConnectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" />
<rabbit:admin connection-factory="amqpConnectionFactory"/>

<bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="10.211.55.20"/>
    <property name="port" value="5672"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="virtualHost" value="/"/>
</bean>

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>
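import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Connect to the same broker that the Camel route consumes from.
ConnectionFactory factory = new ConnectionFactory();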
factory.setHost("10.211.55.20");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Declare the exchange that the Camel consumer endpoint listens on.
channel.exchangeDeclare("KipcastDirect", "direct",
       true,    /* durable */
       true,    /* autoDelete */
       null);   /* arguments */

byte[] messageBodyBytes = "Hello, world!".getBytes();

// Build the properties once; the builder keeps every value that was set on it.
AMQP.BasicProperties minBasic = new AMQP.BasicProperties.Builder()
        .priority(0)
        .messageId("Test")
        .deliveryMode(1)   /* 1 = non-persistent */
        .build();

while (true) {

    channel.basicPublish("KipcastDirect", "KipcastRouting", minBasic, messageBodyBytes);
    System.out.println(" [x] Sent ");

}
[     SimpleAsyncTaskExecutor-1] SpringAMQPConsumer             WARN  Caused by: [org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException - Listener threw exception]
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:]
    at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37]
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at $Proxy46.invokeListener(Unknown Source)[:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:]
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_37]
[     SimpleAsyncTaskExecutor-1] erationsInterceptorFactoryBean WARN  Message dropped on recovery: (Body:'Hello, world!'; ID:Test; Content:text/plain; Headers:{}; Exchange:KipcastDirect; RoutingKey:KipcastRouting; Reply:null; DeliveryMode:NON_PERSISTENT; DeliveryTag:2)
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_37]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_37]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37]
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at $Proxy46.invokeListener(Unknown Source)[:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:]
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_37]

What is wrong here? And why do I have to generate a message ID?

1 answer

邓俊材
2023-03-14

The problem was related to the messageConverter bean:

<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>

which was replaced with:

<bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
<bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
<bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
    <property name="converters">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter"/>
            <entry key="application/xml" value-ref="textMessageConverter"/>
        </map>
    </property>
    <property name="fallbackConverter" ref="textMessageConverter"/>
</bean>
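
To illustrate what this configuration asks for, here is a minimal, self-contained sketch of content-type dispatch with a fallback. It is not the ContentTypeConverterFactory implementation; the class name ContentTypeDispatchSketch and its methods are hypothetical and exist only for this example. The log in the question shows the message arriving with Content:text/plain, which matches neither map entry, so in a setup like this the fallback converter would handle it.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only: picks a converter by content type and
// falls back to a default converter when no entry matches.
final class ContentTypeDispatchSketch {
    private final Map<String, Function<byte[], String>> converters = new HashMap<>();
    private final Function<byte[], String> fallback;

    ContentTypeDispatchSketch(Function<byte[], String> fallback) {
        this.fallback = fallback;
    }

    // Registers a converter for one exact content type, e.g. "application/json".
    void register(String contentType, Function<byte[], String> converter) {
        converters.put(contentType, converter);
    }

    // Uses the registered converter if there is one, otherwise the fallback.
    String convert(String contentType, byte[] body) {
        Function<byte[], String> converter = converters.get(contentType);
        return (converter != null ? converter : fallback).apply(body);
    }

    // Plain-text converter, comparable in spirit to the textMessageConverter bean.
    static String asText(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

With ContentTypeDispatchSketch::asText as the fallback, a text/plain body such as "Hello, world!" is returned as a plain string instead of being handed to a converter that expects a different format.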

This fixes the problem, and messages are now routed correctly.
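
On the question of why a message ID is needed: the stack trace goes through StatefulRetryOperationsInterceptor, and a stateful retry has to recognize a redelivered message as the same message it saw before. The sketch below is a simplified model of that idea, assuming the message ID is the key used for tracking; the class StatefulRetrySketch and the maxAttempts parameter are hypothetical and not part of Spring AMQP or Spring Retry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: counts failed deliveries per message ID and
// reports when a message has used up its attempts and would be dropped.
final class StatefulRetrySketch {
    private final Map<String, Integer> failuresById = new HashMap<>();
    private final int maxAttempts;

    StatefulRetrySketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Records one failed delivery. Returns true when the attempts are
    // exhausted, which corresponds to "Message dropped on recovery".
    boolean recordFailure(String messageId) {
        if (messageId == null) {
            // Without an ID, a redelivery cannot be matched to earlier failures.
            throw new IllegalArgumentException("message ID is required for stateful retry");
        }
        int failures = failuresById.merge(messageId, 1, Integer::sum);
        if (failures >= maxAttempts) {
            failuresById.remove(messageId); // forget the state once the message is dropped
            return true;
        }
        return false;
    }
}
```

In this model, every message published with the same ID (such as "Test" in the producer above) shares one failure counter, which is one reason to give each message its own ID.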
