当前位置: 首页 > 知识库问答 >
问题:

Apache ActiveMQ上具有持久订阅的WSO2 ESB 4.8 JMS代理

锺离晗昱
2023-03-14

我有一个持久订阅ActiveMQ的JMS代理,在WSO2 ESB V4.5.1和V4.7中运行良好

我试图在WSO2 ESB V4.8.0(里程碑3或4)中使用它,但没有成功

<proxy xmlns="http://ws.apache.org/ns/synapse" name="TestJMSProxy" transports="jms" startOnLoad="true" trace="disable">
    <target>
        <inSequence>
          <log level="full"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>text/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.SubscriptionDurable">true</parameter>
    <parameter name="transport.jms.Destination">MY_TOPIC</parameter>
    <parameter name="transport.jms.DurableSubscriberName">TestJMSProxy</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
    <parameter name="transport.jms.DurableSubscriberClientID">1</parameter>
</proxy>

/repository/conf/axis2/axis2.xml中的JMS配置:

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myTopicConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
      <parameter name="java.naming.provider.url" locked="false">failover:tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
          <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>

    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
      <parameter name="java.naming.provider.url" locked="false">failover:tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
          <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>

    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
      <parameter name="java.naming.provider.url" locked="false">failover:tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
          <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

-->我一直使用tcp://localhost:61616和nio://localhost:61616来实现sames效果

在ESB控制台中看到的错误:

TID: [0] [ESB] [2013-10-25 12:04:00,077] ERROR {org.apache.axis2.transport.base.threads.NativeWorkerPool} -  Uncaught exception {org.apache.axis2.transport.base.threads.NativeWorkerPool}
org.apache.axis2.transport.jms.AxisJMSException: Error acquiring a JMS connection to : TopicConnectionFactory using JNDI properties : {java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory, serviceType=proxy, transport.jms.ConnectionFactoryType=topic, transport.jms.ConnectionFactory=myTopicConnectionFactory, transport.jms.DestinationType=topic, java.naming.provider.url=failover:tcp://localhost:61616}
    at org.apache.axis2.transport.jms.ServiceTaskManager.handleException(ServiceTaskManager.java:1000)
    at org.apache.axis2.transport.jms.ServiceTaskManager.access$2500(ServiceTaskManager.java:50)
    at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.createConnection(ServiceTaskManager.java:830)
    at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.getConnection(ServiceTaskManager.java:703)
    at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.receiveMessage(ServiceTaskManager.java:493)
    at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:417)
    at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
2013-10-25 12:03:58,905 | WARN  | Failed to add Connection ID:JMROUSSEL7-64437-1382695110115-21:1 | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///127.0.0.1:64803@61616
javax.jms.InvalidClientIDException: Broker: localhost - Client: 1 already connected from tcp://127.0.0.1:64802
    at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:243)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:92)
    at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:90)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:92)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:92)
    at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:97)
    at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:733)
    at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:79)
    at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:149)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Thread.java:662)

您只需在JMS代理中更改这一行,它就可以在WSO2 ESB V4.8中完美地工作: false

在WSO2 ESB 4.8中,从队列或消息存储/消息处理器读取的JMS代理可以很好地工作

共有1个答案

洪开济
2023-03-14

这是一个bug,我创建了这个jira:https://wso2.org/jira/browse/esbjava-2638,它将在ESB的最终4.8.0版本中解决

 类似资料:
  • 我想配置一个持久主题,但我想配置Apache ActiveMQ Artemis将为该主题的非活动持久订阅服务器保留消息的时间。 例如,类似“为非活动的持久订阅服务器保存持久消息长达30秒”的内容。如果订阅服务器在30秒内没有变为活动状态,则当订阅服务器变为活动状态时,消息将不再可用。

  • 我刚刚开始使用activemq,我有一个关于追溯消费者的问题,为了启用此功能,您需要有一个持久的订阅。但是,在主题上启用和不启用追溯的持久订阅有什么区别?活跃的mq文档说。 http://activemq.apache.org/retroactive-consumer.html 追溯性使用者只是一个普通的 JMS Topic 使用者,它指示在订阅开始时,每次尝试都应该用于返回时间并发送使用者可能错

  • 我正在尝试使用JMS在Azure Service Bus上创建非持久主题订阅。此功能最近已用于高级服务总线层。 我知道连接本身不是问题,因为我可以用这个连接工厂在主题中发布。我想知道是否有义务使用服务总线连接工厂或是否有解决方案。

  • 显示相同数据的多个组件 访问数据的单一服务 现在,当通过Observables接收数据时,我有两种选择: a)订阅observable以获取一次数据,并再次订阅以获取更新

  • 我们有一个ActiveMQ代理,它使用JMS、AMQP和MQTT从非常不同的客户端连接到。出于某种原因,我们还没有弄清楚一组特定的MQTT客户端经常(不总是)持久订阅。这是一个测试环境,客户端经常被添加和删除,后者有时通过拔掉插头或重新启动嵌入式设备,因此它们无法正确取消订阅。效果(IIUC)是代理为可能再也见不到的设备堆积“离线持久订阅”(我可以在超文本传输协议下看到这些),永远保留关于这些主题

  • 在我们的业务需求中,我们需要将更新传输到分布在全国各地的数千个客户端。问题是,许多这些客户端使用3G网络连接到我们,因此,发生了许多连接/断开连接...我们需要提供的更新是诸如“企业A不能再兑现”或“企业B能够再次兑现”之类的东西,我们正在考虑使用ActiveMQ持久主题来提供这些更新。我的理解是,一旦客户端连接到持久主题,即使他断开连接,每当他回来时,他都会在脱机时收到发送到该主题的消息。最大的