Camel RabbitMQ的新手。用Apache Camel写了一个简单的RabbitMQ消费者。
onException(StateException.class).log(LoggingLevel.WARN,"WarnMessage = Error on ID and Status ${body}.")
.asyncDelayedRedelivery().redeliveryDelay(5000).maximumRedeliveries(1)
.setHeader(RabbitMQConstants.REQUEUE, constant(true))
.handled(true)
.setFaultBody(constant(true));
from(STATE__Q_URL).delay(200).log(LoggingLevel.DEBUG,"Incoming Body is --> ${body}")
/* .wireTap("direct:logtofile")*/.to("invokeManager")
.log(LoggingLevel.INFO, "Message = Completed for ${body}");
invokeManager
当前在从队列中弹出一个值后进行一个简单的rest调用。它可以处理大约100条消息,并开始抛出此错误
java.net.SocketException: Connection reset
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method) ~[na:1.8.0_144]
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) ~[na:1.8.0_144]
at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[na:1.8.0_144]
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_144]
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[na:1.8.0_144]
at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[na:1.8.0_144]
at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:559) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:127) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:396) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:378) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.AMQChannel.quiescingRpc(AMQChannel.java:313) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:601) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534) ~[amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.StrictExceptionHandler.handleChannelKiller(StrictExceptionHandler.java:68) [amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.StrictExceptionHandler.handleConsumerException(StrictExceptionHandler.java:58) [amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:154) [amqp-client-4.2.1.jar!/:4.2.1]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [amqp-client-4.2.1.jar!/:4.2.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
2018-02-25 18:06:17.551 WARN 4155 --- [0.42.31.42:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
我不确定我做错了什么,任何帮助都是感激的。
@Robbo_UK是的,我发现问题是消费者的缓冲区大小。您需要设置几个字段才能解决这个问题
+"&prefetchEnabled=true"
+"&prefetchCount=100"
+"&threadPoolSize="100"
+"&channelPoolMaxSize="100"
我从代码中随机复制了这些值。这会解决你的问题。
因此,这将限制推送到使用者的消息数量,并防止缓冲区溢出。
我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码
我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利
我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里
我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?
我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?
我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c