<!--Receive data from oracle queue -->
<route id="ReadFromQueue" autoStartup="true">
<from uri="oracleQueue:queue:SCAQUSER.SCD2TC?jmsMessageType=Text" />
<log message="Picked up message ${body}" />
<to uri="file:/e:/Tradechannel/in" />
</route>
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: Timeout: 1 seconds. Iterations: 1 Interval: 120 Last interval: 1 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: wait time: 1 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: payload type:SYS.AQ$_JMS_TEXT_MESSAGEpayload type code:2002 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.getCompliant: Session :false Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData: entry Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData: textLen: 288 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData: exit Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: text_message retrieved Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: set AQ enqueue time as 1576249545000 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: msg_id: 999832415E09038AE0535C35EB751E09 corrid: null priority: 1 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: msg_delay(secs): 0 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: exptime(secs): -1 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: attempts: 0 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: exception queue: null Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.getCompliant: Session :false Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: recv_time: 1576746524523 Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked: entry Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked: oracle.jms.useEmulatedXA is on Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.inGlobalTrans: entry, reCheck=true Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.checkForGlobalTxn: entry Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.inGlobalTrans: exit Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked: exit Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.restartConsumers: entry Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.doCommit: acknowledged one message received by committing the database connection Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue: exit
该消息已从队列中删除,但在我的应用程序日志中,我得到一个错误······
2019-12-19 10:08:44:554 o.a.c.c.jms.EndpointMessageListener WARN - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - JMSXGroupSeq] org.apache.camel.RuntimeCamelException: JMSXGroupSeq at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:225) at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:235) at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:258) at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:220) at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:170) at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:94) at org.apache.camel.impl.DefaultUnitOfWork.(DefaultUnitOfWork.java:115) at org.apache.camel.impl.DefaultUnitOfWork.(DefaultUnitOfWork.java:75) at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34) at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695) at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663) at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634) at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113) at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: javax.jms.MessageFormatException: JMS-117: Conversion failed - invalid property type at oracle.jms.AQjmsError.throwMsgFormatEx(AQjmsError.java:452) at oracle.jms.AQjmsMessage.getObjectProperty(AQjmsMessage.java:1584) at org.apache.camel.component.jms.JmsMessageHelper.getProperty(JmsMessageHelper.java:118) at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:214) ... 25 common frames omitted
如果我正确理解stacktrace,那么oracle.jms api缺少一个标头JMSXGroupSeq,这并不是不可能的,因为对方没有使用oracle jms api创建消息。他和我都无法查看实际的队列(它在我们共同的客户端),所以我无法检查真正到位的头。
使用自定义headerFilterStrategy没有任何帮助,因为错误就在我到达headerFilterStrategy之前抛出。参见下面从org.apache.camel.component.jms.JMSBinding方法中提取。错误是从jmsMessageHelper.getProperty()引发的
Object value = JmsMessageHelper.getProperty(jmsMessage, name);
if (headerFilterStrategy != null
&& headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
continue;
}
第二个问题...为什么Oracle确认消息时好像一切都很好,而实际上消息从未在接收者端成功重新创建?
由于错误消息“转换失败”,我怀疑JMS属性JMSXGroupSeq
的类型无效。
由于Camel只是尝试在jmsMessageHelper.getProperty
中读取这个值,我想这个属性中一定有一些非常奇怪的值。
您是否尝试过使用JMS工具箱之类的工具来窥视这类消息的内部?如果您看到属性值,您可能会建议生产者如何修复它。
但是,您当然可以使用其他东西来使用消息。它不尝试自动读取所有头,但至少做JMS的低级工作。也许Spring JMS会是一个候选者。Camel JMS在引擎盖下使用Spring JMS。
对于jmsxgroupseq
属性,JMS文档表示
JMSXGroupID和JMSXGroupEQ是客户端想要对消息进行分组时应该使用的标准属性。所有提供程序都必须支持它们。除非特别指出,JMSX属性的值和语义是未定义的。
我是这个消息队列的新手,刚刚开始学习一些基本的东西。 因此,对于我们的Spring Boot应用程序,我们遵循了contoller talks to service&service talks to repository这样的体系结构,所以在这里,我必须创建一个控制器,它将接受类DTO作为json,并将这些信息发布到apache Camel中指定的消息队列。我在跟踪这个链接!对于我的参考,工作良好
我是activeMQ的新手,在将消息从驻留在另一台服务器上的消息生成器推送到activeMQ定义的队列时遇到问题。 我在activeMQ上使用camel routes创建的应用程序中有几个队列。我尝试从另一台服务器上的应用程序对这些队列执行远程JNDI查找。我使用了来自http://activemq.apache.org/jndi-support.html页面的activemq文档片段。 我可以连
我有一个使用Spring和RabbitMQ的项目设置。目前,我的应用程序可能会收到一条amqp消息,在另一个异步进程完成之前无法处理该消息(遗留和完全分离,我无法控制)。因此,结果是我可能不得不等待处理消息一段时间。其结果是变压器出现异常。 当消息被NACK回rabbitMQ时,它会将其放回队列的头部,并立即重新拉入队列。如果我收到的无法处理的消息等于并发侦听器的数量,我的工作流就会被锁定。它转动
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较
Iam试图用camel IE实现重播机制,我将不得不检索所有已经保存的消息,并转发到适当的camel路由进行重新处理。这将由quartz调度器触发。 我通过使用下面的代码实现了同样的效果。 1)一旦quartz调度器被触发,fwd到处理器,处理器将查询db并将消息作为列表,并在camel exchange属性中设置相同的属性作为列表。2)使用camel,其中LoopProcessor将在excha
我们的环境由3个jboss服务器组成(门户、jms、协调)。 协调服务器托管骆驼路由,该路由具有消耗自队列(SLAQueue)的路由 JMS服务器托管了我们的所有队列 最近,我们发现了一个错误,即托管在JMS服务器上的TaskQueue中的一些消息没有传递到门户服务器上的MDB。由于某些原因,它们被卡住了,当我们重新启动JMS服务器时,卡住的消息被传递 为了进行调查,我们在“org.apache.