当前位置: 首页 > 知识库问答 >
问题:

Camel正在使用来自队列的消息,但未能重新创建它

易镜
2023-03-14
<!--Receive data from oracle queue -->
<route id="ReadFromQueue" autoStartup="true">
    <from uri="oracleQueue:queue:SCAQUSER.SCD2TC?jmsMessageType=Text" />
    <log message="Picked up message ${body}" />
    <to uri="file:/e:/Tradechannel/in" />
</route>
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  Timeout: 1 seconds.  Iterations: 1 Interval: 120 Last interval: 1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  wait time: 1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  payload type:SYS.AQ$_JMS_TEXT_MESSAGEpayload type code:2002
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.getCompliant:  Session :false
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData:  textLen: 288
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsUtil.getTextData:  exit
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  text_message retrieved
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  set AQ enqueue time as 1576249545000
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  msg_id: 999832415E09038AE0535C35EB751E09 corrid: null priority: 1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  msg_delay(secs): 0
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  exptime(secs): -1
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  attempts: 0
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  exception queue: null
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.getCompliant:  Session :false
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  recv_time: 1576746524523
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked:  oracle.jms.useEmulatedXA is on
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.inGlobalTrans:  entry, reCheck=true
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.checkForGlobalTxn:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] EmulatedXAHandler.inGlobalTrans:  exit
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.inGlobalTransRechecked:  exit
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsSession.restartConsumers:  entry
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.doCommit:  acknowledged one message received by committing the database connection
Camel (camel-1) thread #4 - JmsConsumer[SCAQUSER.SCD2TC] [Thu Dec 19 10:08:44 CET 2019] AQjmsConsumer.jdbcDequeue:  exit

消息已从队列中删除,但在我的应用程序日志中,我得到一个错误······

2019-12-19 10:08:44:554 o.a.c.c.jms.EndpointMessageListener WARN  - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - JMSXGroupSeq]
org.apache.camel.RuntimeCamelException: JMSXGroupSeq
    at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:225)
    at org.apache.camel.component.jms.JmsMessage.populateInitialHeaders(JmsMessage.java:235)
    at org.apache.camel.impl.DefaultMessage.createHeaders(DefaultMessage.java:258)
    at org.apache.camel.component.jms.JmsMessage.ensureInitialHeaders(JmsMessage.java:220)
    at org.apache.camel.component.jms.JmsMessage.getHeader(JmsMessage.java:170)
    at org.apache.camel.impl.DefaultMessage.getHeader(DefaultMessage.java:94)
    at org.apache.camel.impl.DefaultUnitOfWork.(DefaultUnitOfWork.java:115)
    at org.apache.camel.impl.DefaultUnitOfWork.(DefaultUnitOfWork.java:75)
    at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34)
    at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:695)
    at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:663)
    at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:634)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:149)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: javax.jms.MessageFormatException: JMS-117: Conversion failed - invalid property type
    at oracle.jms.AQjmsError.throwMsgFormatEx(AQjmsError.java:452)
    at oracle.jms.AQjmsMessage.getObjectProperty(AQjmsMessage.java:1584)
    at org.apache.camel.component.jms.JmsMessageHelper.getProperty(JmsMessageHelper.java:118)
    at org.apache.camel.component.jms.JmsBinding.extractHeadersFromJms(JmsBinding.java:214)
    ... 25 common frames omitted

如果我正确理解stacktrace,那么oracle.jms api缺少一个标头JMSXGroupSeq,这并不是不可能的,因为对方没有使用oracle jms api创建消息。他和我都无法查看实际的队列(它在我们共同的客户端),所以我无法检查真正到位的头。

使用自定义headerFilterStrategy没有任何帮助,因为错误就在我到达headerFilterStrategy之前抛出。参见下面从org.apache.camel.component.jms.JMSBinding方法中提取。错误是从jmsMessageHelper.getProperty()引发的

Object value = JmsMessageHelper.getProperty(jmsMessage, name);
if (headerFilterStrategy != null 
 && headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
 continue;
 }

第二个问题...为什么Oracle确认消息时好像一切都很好,而实际上消息从未在接收者端成功重新创建?

共有1个答案

管弘
2023-03-14

由于错误消息“转换失败”,我怀疑JMS属性JMSXGroupSeq的类型无效。

由于Camel只是尝试在jmsMessageHelper.getProperty中读取这个值,我想这个属性中一定有一些非常奇怪的值。

您是否尝试过使用JMS工具箱之类的工具来窥视这类消息的内部?如果您看到属性值,您可能会建议生产者如何修复它。

但是,您当然可以使用其他东西来使用消息。它不尝试自动读取所有头,但至少做JMS的低级工作。也许Spring JMS会是一个候选者。Camel JMS在引擎盖下使用Spring JMS。

对于jmsxgroupseq属性,JMS文档表示

JMSXGroupID和JMSXGroupEQ是客户端想要对消息进行分组时应该使用的标准属性。所有提供程序都必须支持它们。除非特别指出,JMSX属性的值和语义是未定义的。

 类似资料:
  • 我是这个消息队列的新手,刚刚开始学习一些基本的东西。 因此,对于我们的Spring Boot应用程序,我们遵循了contoller talks to service&service talks to repository这样的体系结构,所以在这里,我必须创建一个控制器,它将接受类DTO作为json,并将这些信息发布到apache Camel中指定的消息队列。我在跟踪这个链接!对于我的参考,工作良好

  • 我是activeMQ的新手,在将消息从驻留在另一台服务器上的消息生成器推送到activeMQ定义的队列时遇到问题。 我在activeMQ上使用camel routes创建的应用程序中有几个队列。我尝试从另一台服务器上的应用程序对这些队列执行远程JNDI查找。我使用了来自http://activemq.apache.org/jndi-support.html页面的activemq文档片段。 我可以连

  • 我有一个使用Spring和RabbitMQ的项目设置。目前,我的应用程序可能会收到一条amqp消息,在另一个异步进程完成之前无法处理该消息(遗留和完全分离,我无法控制)。因此,结果是我可能不得不等待处理消息一段时间。其结果是变压器出现异常。 当消息被NACK回rabbitMQ时,它会将其放回队列的头部,并立即重新拉入队列。如果我收到的无法处理的消息等于并发侦听器的数量,我的工作流就会被锁定。它转动

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • Iam试图用camel IE实现重播机制,我将不得不检索所有已经保存的消息,并转发到适当的camel路由进行重新处理。这将由quartz调度器触发。 我通过使用下面的代码实现了同样的效果。 1)一旦quartz调度器被触发,fwd到处理器,处理器将查询db并将消息作为列表,并在camel exchange属性中设置相同的属性作为列表。2)使用camel,其中LoopProcessor将在excha

  • 我们的环境由3个jboss服务器组成(门户、jms、协调)。 协调服务器托管骆驼路由,该路由具有消耗自队列(SLAQueue)的路由 JMS服务器托管了我们的所有队列 最近,我们发现了一个错误,即托管在JMS服务器上的TaskQueue中的一些消息没有传递到门户服务器上的MDB。由于某些原因,它们被卡住了,当我们重新启动JMS服务器时,卡住的消息被传递 为了进行调查,我们在“org.apache.