<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="spring-amqp:KipcastDirect:KipcastQueue:KipcastRouting?type=direct&autodelete=true&durable=true" />
<log message="Message received!!! "/>
<to uri="spring-amqp:KipcastDirect2:TestQueue:KipcastRouting2?type=direct&autodelete=false&durable=true" />
</route>
</camelContext>
<rabbit:connection-factory id="amqpConnectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" message-converter="messageConverter" exchange="KipcastBean" />
<rabbit:admin connection-factory="amqpConnectionFactory"/>
<bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="10.211.55.20"/>
<property name="port" value="5672"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="virtualHost" value="/"/>
</bean>
<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.211.55.20");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("KipcastDirect", "direct",
true, /* durable */
true, /* autodelete */
null); /* */
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties.Builder bob = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties minBasic = bob.build();
minBasic = bob.priority(0).messageId("Test").build();
minBasic = bob.priority(0).deliveryMode(1).build();
while (true) {
channel.basicPublish("KipcastDirect", "KipcastRouting", minBasic, messageBodyBytes);
System.out.println(" [x] Sent ");
}
[ SimpleAsyncTaskExecutor-1] SpringAMQPConsumer WARN Caused by: [org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException - Listener threw exception]
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:]
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37]
at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at $Proxy46.invokeListener(Unknown Source)[:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:]
at java.lang.Thread.run(Thread.java:680)[:1.6.0_37]
[ SimpleAsyncTaskExecutor-1] erationsInterceptorFactoryBean WARN Message dropped on recovery: (Body:'Hello, world!'; ID:Test; Content:text/plain; Headers:{}; Exchange:KipcastDirect; RoutingKey:KipcastRouting; Reply:null; DeliveryMode:NON_PERSISTENT; DeliveryTag:2)
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:590)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:529)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:472)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:103)[spring-rabbit-1.0.0.RELEASE.jar:]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_37]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_37]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_37]
at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_37]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor$MethodInvocationRetryCallback.doWithRetry(StatefulRetryOperationsInterceptor.java:173)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:239)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:186)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor.invoke(StatefulRetryOperationsInterceptor.java:145)[spring-retry-1.0.0.RELEASE.jar:]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)[spring-aop-3.0.7.RELEASE.jar:3.0.7.RELEASE]
at $Proxy46.invokeListener(Unknown Source)[:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:560)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:452)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:436)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:420)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$200(SimpleMessageListenerContainer.java:56)[spring-rabbit-1.0.0.RELEASE.jar:]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:505)[spring-rabbit-1.0.0.RELEASE.jar:]
at java.lang.Thread.run(Thread.java:680)[:1.6.0_37]
那有什么不好?我必须生成消息ID的原因是什么?
问题与messageConverter bean有关:
<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>
已被替换为
<bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
<bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
<bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
<property name="converters">
<map>
<entry key="application/json" value-ref="jsonMessageConverter"/>
<entry key="application/xml" value-ref="textMessageConverter"/>
</map>
</property>
<property name="fallbackConverter" ref="textMessageConverter"/>
</bean>
这样就可以解决问题,并正确地路由消息。
我一直试图使用2.12.1-snapshot中的RabbitMQComponent版本让camel进行路由。这样做,我可以很容易地消费,但在路由到另一个队列时会遇到ad问题。 在这篇文章中,我已经验证了指定的交换机是否配置了适当的路由密钥。我注意到,我能够大量消费,但不能生产到out.queue。 以下是对处理消息的RabbitMQProducer的唯一引用。 我花了很多时间研究了RabbitMQ
在学习RabbitMq中,有几种情况我不太知道怎么回事: 情况一 假如我定义了一个队列test,他没有显示绑定交换机,那么会自动绑定到默认交换机,那么这个时候绑定键bindkey是不是队列名字test??现在生产者发了一个消息,假如这个消息没有定义路由键routingKey,那么这个时候消息会被路由到队列test吗? 情况二 假如我定义了一个队列test1和test2,他们也是没有显示绑定交换机,
我已经实现了如下所示的示例Spring Dynamic Destination 在rabbitmq中,它动态地创建一个交换,但没有提供绑定或路由密钥的选项。我的要求是用路由密钥向这个动态创建的exchange发送消息。我需要如何实现这一点来设置路由密钥?
我正在使用ActiveMQ Artemis 2.17.0,并且面临路由问题。 我实现了一个插件,它记录了before消息路由,我看到一些消息从路由到。 没有转移设置,主题和队列由生产者和消费者动态创建。有一个将目标映射到虚拟主题的设置 和都是有效的主题,但它们不应该被链接。 什么能解释这种行为?
我正在考虑将Socket.io集成到一个express应用程序中。 js有一个非常好的特性,可以通过socket.io消息调用快速路由。 不过,帆在其他方面比我需要的要多一点。我正在寻找一种方法,使socket.io请求转发到快速路由,而不必使用整个sails框架。我想这是一个很常见的需求,所以我很惊讶我没有找到一个npm模块来做这件事,但是找了很长时间,我什么也没有找到。Express.io会这
当试图在一条路由中使用两个以上nettyendpoint时,我会遇到以下异常。 IllegalStateException:I/O线程中的await*()会导致死锁或性能突然下降。改用addListener(),或者从另一个线程调用await*()。