当前位置: 首页 > 知识库问答 >
问题:

从Kafka到JMS的骆驼路线不工作(JMS->Kafka->JMS)

伏子辰
2023-03-14
from("jms:topic:test.source.topic?asyncConsumer=true")
            .log("Message: ${body}")
            .to("kafka:testing?brokers=192.168.0.100:9092");

from("kafka:testing?brokers=192.168.0.100:9092")
            .log("Message received from Kafka : ${body}")
            .log("    on the topic ${headers[kafka.TOPIC]}")
            .log("    on the partition ${headers[kafka.PARTITION]}")
            .log("    with the offset ${headers[kafka.OFFSET]}")
            .log("    with the key ${headers[kafka.KEY]}")
            // manually set JMSDeliveryMode (1 - NON_PERSISTENT, 2 - PERSISTENT)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    exchange.getIn().setHeader("JMSDeliveryMode", "1");
               }
            })
            .to("jms:topic:test.sink.topic");
ERROR 28642 --- [aConsumer[testing]] o.a.c.p.e.DefaultErrorHandler: Failed delivery for (MessageId: ID-PCID on ExchangeId: ID-PCID). Exhausted after delivery attempt: 1 caught: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:525) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:438) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:392) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:155) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:168) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:395) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:286) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-3.4.0.jar:3.4.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40) ~[camel-support-3.4.0.jar:3.4.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:346) ~[camel-kafka-3.4.0.jar:3.4.0]
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:222) ~[camel-kafka-3.4.0.jar:3.4.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: javax.jms.JMSException: AMQ139015: Illegal deliveryMode value: 0
    at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setJMSDeliveryMode(ActiveMQMessage.java:450) ~[artemis-jms-client-2.12.0.jar:2.12.0]
    at org.apache.camel.component.jms.JmsMessageHelper.setJMSDeliveryMode(JmsMessageHelper.java:435) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsBinding.appendJmsProperty(JmsBinding.java:393) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsBinding.appendJmsProperties(JmsBinding.java:371) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:346) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:325) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:561) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.lambda$send$0(JmsConfiguration.java:527) ~[camel-jms-3.4.0.jar:3.4.0]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504) ~[spring-jms-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 19 common frames omitted

当从JMS向Kafka发送消息时,路由运行良好。

共有1个答案

傅自明
2023-03-14

以下是相关的错误消息:

AMQ139015: Illegal deliveryMode value: 0

此错误消息表示有东西调用了JMS方法javax.JMS.message#setJMSDeliveryMode,其值为00的值无效。有效的交付模式值由javax.jms.DeliveryMode定义:

  • 1(对于非持久性)
  • 2(用于持久化)。
 类似资料:
  • 我正在使用Apache Camel使用来自kafka主题的消息,然后处理该消息,同时处理如果发生异常,我将该消息重定向到另一个kafka主题,并以单独的路由处理该消息。所以我有一个如下所示的路线。 上面的代码实际上是以相同的kafka(kafka1)发送错误消息。 我通过在进程中设置解决了这一问题。这是预期的行为吗?它为什么忽略kafka2而使用kafka1? > 使用的 版本的camel-2.1

  • 我对骆驼很陌生,如果这很明显,请原谅。 我们正在尝试设置一条骆驼路线(在 talend esb 中),它执行以下操作: < li >通过JMS接收消息 < li >数据库更新 < li >通过JMS使用请求/回复将消息发送到另一个系统 < li >使用回复中的信息进行另一次数据库更新 这一切都在一条路线上。我发现该路由不再接受 1 中的任何消息。当它正在等待 3 中的回复时。 我曾尝试在JMS组件

  • 我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里

  • 我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-

  • 我最近注意到Camel现在有自己的Kafka组件,所以我决定给它一个旋转。 我决定尝试一个很好的简单文件->kafka主题如下...

  • 我有一个驼峰endpoint,基本上是Kafka消费者从一个主题中读取信息并将其发送到数据库。它工作得很好,但是,我很难对它进行单元测试,因为我无法模拟Kafkaendpoint。有谁能帮我在骆驼路线上嘲笑Kafka的消费者吗?