当前位置: 首页 > 知识库问答 >
问题:

camel jms组件如何使用队列中的消息

裴哲
2023-03-14

我们的环境由3个jboss服务器组成(门户、jms、协调)。

  1. 协调服务器托管骆驼路由,该路由具有消耗自队列(SLAQueue)的路由
  2. JMS服务器托管了我们的所有队列
  3. 最近,我们发现了一个错误,即托管在JMS服务器上的TaskQueue中的一些消息没有传递到门户服务器上的MDB。由于某些原因,它们被卡住了,当我们重新启动JMS服务器时,卡住的消息被传递

为了进行调查,我们在“org.apache.activemq.artemis”上启用了TRACE级别的日志记录。我们注意到我们的骆驼jms组件和jms服务器之间有很多交流。下面列出了一个聊天的例子,类似这样的日志每一秒钟就会被写入一次。

问题:

    < camel jms组件用于从队列中获取消息的机制是什么?JMS组件是否每秒轮询一次队列(拉)?还是JMS组件在消息到达队列时得到通知? < li >该机制与J2EE消息驱动beans不同吗?当消息到达队列时,MDB会得到通知。 < li >根据下面的聊天内容,我认为这是投票。如果是轮询,可以配置轮询窗口吗?我尝试了接收超时选项,但没有成功。

**骆驼JMS组件和JMS服务器之间的日志中的示例喋喋不休:**

2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) createdConsumer ActiveMQSession->ClientSessionImpl [name=eebaf2ec-47b3-11ec-ad21-0242ac110003, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@413c962e, metaData=(jms-session=,)]@72d15942 consumer=org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) addConsumer(org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl] (Thread-206 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002-1886035738)) RemotingConnectionID=eeb5e9d6-47b3-11ec-ad21-0242ac110003 handling packet PACKET(SessionConsumerFlowCreditMessage)[type=70, channelID=12, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionConsumerFlowCreditMessage, consumerID=5086, credits=1048576]
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) unlock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection] (Thread-206 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002-1886035738)) InVMConnection [serverID=0, id=eeb5e9d6-47b3-11ec-ad21-0242ac110003]::packet sent done
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) unlock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) lock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) tryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) getUseTryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) getUseTryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) lock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) receive org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b timeout=120000
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) checkState()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5e7ed34b{consumerContext=ActiveMQConsumerContext{id=5086}, queueName=jms.queue.SLAQueue}:: receive(120000)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5e7ed34b{consumerContext=ActiveMQConsumerContext{id=5086}, queueName=jms.queue.SLAQueue}::receive(120000, false)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerSessionPacketHandler::handlePacket,PACKET(SessionConsumerFlowCreditMessage)[type=70, channelID=12, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionConsumerFlowCreditMessage, consumerID=5086, credits=1048576]
2021-11-17 16:39:24,612 DEBUG [org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerConsumerImpl [id=5086, filter=null, binding=LocalQueueBinding [address=jms.queue.SLAQueue, queue=QueueImpl[name=jms.queue.SLAQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002], temp=false]@3b0e1e9f, filter=null, name=jms.queue.SLAQueue, clusterName=jms.queue.SLAQueueb5ba8675-1a62-11ec-b397-0242ac110002]]::FlowControl::Received 1048576 credits, previous value = 0 currentValue = 1048576
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerConsumerImpl [id=5086, filter=null, binding=LocalQueueBinding [address=jms.queue.SLAQueue, queue=QueueImpl[name=jms.queue.SLAQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002], temp=false]@3b0e1e9f, filter=null, name=jms.queue.SLAQueue, clusterName=jms.queue.SLAQueueb5ba8675-1a62-11ec-b397-0242ac110002]]::calling promptDelivery from receiving credits
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerSessionPacketHandler::scheduling response::null
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerSessionPacketHandler::regular response sent::null

共有1个答案

经国安
2023-03-14

是的,JMS使用者正在轮询。默认情况下,Camel在引擎盖下使用Spring的DefaultMessageListenerContainer。

Spring-Listener本身直接使用JMS API。

Spring使用1秒的默认超时来拉取消息。我不知道你是否真的可以用Camel的< code>receiveTimeout选项来改变这一点。

 类似资料:
  • 我需要使用驼峰路由浏览来自活动mq的消息,而不使用这些消息。 JMS队列中的消息将被读取(仅浏览而不使用)并移动到数据库中,同时确保原始队列保持完整。 公共静态无效检查ReceivedOrders(){ }

  • (要求Workerman版本>=3.3.6) STOMP是一个通讯协议。它是支持大多数消息队列如RabbitMQ、Apollo等。 安装: composer require react/stomp 示例: <?php require_once __DIR__ . '/vendor/autoload.php'; use WorkermanWorker; $worker = new Worker('t

  • (要求Workerman版本>=3.3.6) 安装: composer require react/zmq 示例: <?php require_once __DIR__ . '/vendor/autoload.php'; use WorkermanWorker; $worker = new Worker('text://0.0.0.0:6161'); $worker->onWorkerStart

  • 我刚才看到了三个方法的文档,当我们在工作线程中工作时,它们可以用来在UI线程中执行一段代码。方法有: > public final void runOnUIThread(Runnable action)-在UI线程上运行指定的操作。如果当前线程是UI线程,则立即执行该操作。如果当前线程不是UI线程,则将操作发布到UI线程的事件队列中 public boolean post(Runnable act

  • 我使用了初始的前置和后置条目为0,因为最初在队列中,任何先进入的条目也将成为最后一个条目。然而,我的老师说我应该保持后面的条目为'-1',当插入一个元素到队列中时,首先增加后面的条目索引,然后添加,反对我的代码先添加后增加。我在网上查了一下,直到现在我也不知道我怎么错了。 如果我错了或者我的老师错了,请给我提供信息?