当前位置: 首页 > 知识库问答 >
问题:

使用Apache Camel ConsumerTemplate轮询来自ActiveMQ Artemis的字节/大型消息

越运锋
2023-03-14

当通过JMS连接到ActiveMQ Artemis时,我正在与一个基于Apache Camel的应用程序中的一个问题作斗争。在其中一个Camel路由的末尾,消息存储在Artemis JMS队列中。在同一应用程序中运行的遗留组件使用ConsumerTemplate定期从那里提取它们。

这对带有纯文本体的骆驼消息很好,但在使用字节数组体时会导致错误:Artemis似乎将任何带有字节体的消息视为“大消息”,这些消息是流式传输的,而不是保存在内存中。通过ConsumerTemplate接收可以工作,但是一旦访问主体或头,就会引发如下异常

org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session. Message: ActiveMQMessage[ID:90c4d1d5-3233-11ea-b0cc-44032c68a56f]:PERSISTENT/ClientLargeMessageImpl[messageID=2974, durable=true, address=mytest,userID=90c4d1d5-3233-11ea-b0cc-44032c68a56f,properties=TypedProperties[firedTime=Wed Jan 08 17:26:03 CET 2020,__AMQ_CID=90b4f34e-3233-11ea-b0cc-44032c68a56f,breadcrumbId=ID-NB045-evolit-co-at-1578500762151-0-1,_AMQ_ROUTING_TYPE=1,_AMQ_LARGE_SIZE=3]]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:172) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:221) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.example.cdi.JmsPoller.someMethod(JmsPoller.java:36) ~[classes/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:188) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197) [camel-core-2.22.1.jar:2.22.1]
        at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) [camel-core-2.22.1.jar:2.22.1]
        at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_171]
        at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_171]
Caused by: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more
Caused by: org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
        at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
        at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
        at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
        ... 21 more

对于不超过Artemis的minlargeMessageSize的消息也会出现此问题,即使在测试程序中只有3个字节。

巧合的是,在用于测试应用程序的独立应用程序中也出现了同样的问题。在那里,我能够通过保持JMS会话和接收器打开,直到完全读取JMS消息正文和消息头来解决这个问题。对于Camel,它在Camel所基于的Springjmstemplate中被抽象出来。

我查阅了Camel JMS组件的用户文档,以找到可能对我有所帮助的配置选项。我尝试了以下方法:

    null
    使用者端的
  • MessageConverterMapJMSMessage:没有影响,它们在会话已经关闭时执行
  • Alwayscopymessage在生产者端:我认为复制可能会阻止流式大消息的使用,没有影响
  • StreamMessageTypeEnabled=false在生产者端:没有影响
  • jmsmessageType=bytes在生产者和消费者端:没有影响
  • transferExchange=true:这似乎解决了我的具体情况,但感觉像是一种变通方法。文档建议谨慎使用该选项。

所以现在,transferExchange似乎是我最好的选择,假设它真的解决了我在所有测试用例中的问题。尽管如此,我还是很乐意对这个问题或不同的解决方案有更好的理解:

  1. 为什么Artemis将小字节数组消息视为大消息?
  2. Camel ConsumerTemplate支持流式大消息吗?

我的版本是Camel 2.22.1和Artemis 2.10.1。

我可以通过修改Camel的发布包中的Camel Examplecamel-example-cdi来重现我的问题,使其具有如下所示的最小类。此外,我还添加了camel-jms和Artemis依赖项,并在本地启动了Artemis,这两者都类似于camel-example-artemis-large-messages示例中所描述的。

public class MyRoutes extends RouteBuilder {

    @Override
    public void configure() {
        setupJmsComponent();

        from("timer:writeTimer?period=6000")
                .log("writing to JMS")
                .setBody(() -> new byte[]{0,1,2})
                .to(JmsPoller.ENDPOINT);

        from("timer:pollTimer?period=3000")
            .to("bean:jmsPoller");
    }

    private void setupJmsComponent() {
        ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConnectionFactory(connectionFactory);
        getContext().addComponent("jms", jmsComponent);
    }

}
@Singleton
@Named("jmsPoller")
public class JmsPoller {
    static final String ENDPOINT = "jms:queue:mytest";

    @Inject
    private ConsumerTemplate consumerTemplate;

    public void someMethod(String body) {
        Exchange exchange = consumerTemplate.receive(ENDPOINT, 1000L);
        System.out.println("Received " + (exchange == null ? null : exchange.getIn().getBody()));
    }

}

共有1个答案

乐正穆冉
2023-03-14

ActiveMQ Artemis不会将任何带有字节正文的消息视为“大”消息。值得注意的是,代理最终将所有消息体视为字节数组,因为它们就是这样。然而,为了被认为是“大”消息必须超过一定的大小。文件指出:

任何大于一定大小的消息都被视为大消息。大型消息将被拆分并以片段形式发送。这是由URL参数MinlargeMessageSize确定的。

注:

因此,我建议您在读取正文之前保持会话打开,或者增加发送消息的应用程序URL上的minlargeMessageSize,这样就不会认为消息是“大的”。后一个选项可能会导致代理上的内存占用更多,因为整个消息正文将同时保存在内存中。

 类似资料:
  • 我是Kafka新手,我正在使用Kafka1.0。 我使用拉取模式读取kafka消息,也就是说,我定期查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。 我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)

  • 问题内容: 我正在尝试与ajax,jquery,php和mysql进行基于长轮询的聊天,但是似乎有些错误(也是我对长轮询的新手)。 index.php: 和poll.php 它在index.php和poll.php中均未显示任何错误,但是当我插入ID大于old_msg_id的数据时,没有任何反应。 问题答案: 将您的poll.php文件中的代码更改为以下内容:

  • 我们使用、和,并使用和进行配置。现在我们需要迁移到函数方法。和可以使用消费者bean和供应商bean进行转换,但是我们不能迁移。因为如果我们使用,函数将被禁用,因此我们无法进行迁移。我们尝试研究,但它似乎只适用于具有有限流的反应性供应商。有什么办法做这件事吗?参考:https://cloud.spring.io/spring-cloud-stream/reference/html/spring-c

  • 我对Kafka很陌生,对它有一些疑问。我已经配置了一个kafka消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。f、 e、。 我想配置不同的kafka监听器来消费不同类型的事件。我认为有两种方法可以做到这一点,比如使用字符串(json)格式的事件,转换成事件对象,在不同类型之间切换,执行业务逻辑,或者配置不同的kafka监听器工厂 因此,第一种方法不是 ,对于第二种方法,我需要创建许多

  • 问题内容: (对不起,还有很长的问题!)我最近一直在尝试将Go(而不是C ++)用于我作为辅助项目正在开发的游戏服务器模拟器,并质疑我是否以明智的Go术语来实现它。如您所料,服务器通过发送符合特定协议规范的原始数据包(TCP)与一个或多个游戏客户端进行通信。相关部分如下所示: 接收标头->解密->接收字节,直到达到标头长度->解密其余数据包->分派给处理程序->解码数据包->根据需要处理->发送响

  • 线程名称:线程组1-1示例开始时间:2019-09-11 18:52:42英国夏令时加载时间:0连接时间:0延迟时间:0大小以字节为单位:0发送字节:0头大小以字节为单位:0主体大小以字节为单位:0示例计数:1错误计数:1数据类型(“text”“bin”“”):文本响应代码:000响应消息:javax.naming.nameNotFoundException:DynamicQueue/MyQueu