当前位置: 首页 > 知识库问答 >
问题:

Qpid客户端Apache Artemis 2.14.0高可用性不工作

袁开宇
2023-03-14

使用Qpid客户端连接Apache Artemis代理以实现高可用性。

代理实例在两个节点中运行,并在broker.xml中列出复制配置

Brokers实例在node1(主)和node2(从)上启动,并且运行时没有任何问题。

驼峰qpid jms客户端使用URL配置为故障切换:(amqp://localhost:5672,amqp://localhost:5673),在执行camel客户端时,上下文启动时没有出现任何问题,并注意到Broker UI控制台中的连接被列为AMQP协议。[配置详情如下]

使用以下配置,一切正常。

为了测试高可用性,我停止了node1上的代理实例,并期望Qpid Camel客户端自动发现node2代理并处理消息。但它没有按预期连接。

但是当我使用带有URL(包括tcp方案连接)的aretmis-jms camel客户端时,我能够成功验证高可用性,当在node1中运行的代理因某种原因停止时,客户端会自动发现node2上的代理。此外,当node1启动备份客户端时,会自动连接到node1。

Qpid客户端无法发现备份代理。以下配置中的任何问题

Master:有关详细配置,请参阅链接

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

从机:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>

客户端使用camel

<bean id="jmsampqConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
       <property name="remoteURI" value="failover:(ampq://localhost:5672,ampq://localhost:5673)" />
       <property name="username" value="user"/>
       <property name="password" value="pass"/>
   </bean>

  <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="5" />
    <property name="connectionFactory" ref="jmsamqpConnectionFactory" />
  </bean>

  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
    <property name="concurrentConsumers" value="5" />
  </bean>

   <bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
    <property name="configuration" ref="jmsConfig" />
  </bean>
  
   <bean id="CustomBean1" class="org.specific.process.class" /> 
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="jms:queue:enterprise1.queue" />
      <convertBodyTo type="java.lang.String" />
      <bean ref="CustomBean1" method="processCamelExchangeData" />
    </route>
    </camelContext>

Maven依赖

   <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp-starter</artifactId>
      <version>2.23.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.54.0</version>
     </dependency> 
     <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>artemis-jms-client</artifactId>
      <version>2.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.messaginghub</groupId>
      <artifactId>pooled-jms</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp</artifactId>
      <version>2.23.0</version> 
    </dependency>

日志:

2020-09-17 20:15:18,684: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:18,690: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@3962ec84
2020-09-17 20:15:18,692: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@147e0734
2020-09-17 20:15:18,693: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@2bdab835
2020-09-17 20:15:18,694: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@7b8aebd0
2020-09-17 20:15:18,695: main DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@55222ee9
2020-09-17 20:15:18,743: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://localhost:5672 in-progress
2020-09-17 20:15:19,102: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (SaslMechanismFinder.java:106) - Best match for SASL auth was: SASL-PLAIN
2020-09-17 20:15:19,119: FailoverProvider: async work thread DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConnectionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1, configuredURI = failover:(amqp://localhost:5672,amqp://localhost:5673), connectedURI = null } (1)
2020-09-17 20:15:19,162: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 } is now open:
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:884) - Processing alternates uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:899) - Replacing uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost]
2020-09-17 20:15:19,164: AmqpProvider :(1):[amqp://localhost:5672] DEBUG (FailoverProvider.java:913) - Processing alternates done new uris:URI Pool { [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost] }
2020-09-17 20:15:19,165: AmqpProvider :(1):[amqp://localhost:5672] INFO (JmsConnection.java:1339) - Connection ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1 connected to server: amqp://localhost:5672
2020-09-17 20:15:19,168: main DEBUG (JmsConsumer.java:106) - Started listener container org.apache.camel.component.jms.DefaultJmsMessageListenerContainer@5e9bf744 on destination enterprise1.queue
2020-09-17 20:15:19,168: main INFO (DefaultCamelContext.java:4013) - Route: route1 started and consuming from: jms://queue:enterprise1.queue
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3989) - Route: route2 >>> EventDrivenConsumerRoute[jmsTopic://topic:enterprise2.topic -> Pipeline[[Channel[convertBodyTo[java.lang.String]], Channel[org.apache.camel.component.bean.BeanProcessor@69d103f0]]]]
2020-09-17 20:15:19,169: main DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1001) on route: route2
2020-09-17 20:15:19,216: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1 } (2)
2020-09-17 20:15:19,216: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2 } (3)
2020-09-17 20:15:19,216: Camel (camel) thread #6 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:3 } (4)
2020-09-17 20:15:19,217: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4 } (5)
2020-09-17 20:15:19,238: Camel (camel) thread #8 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsSessionInfo { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:5 } (6)
2020-09-17 20:15:19,361: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: create -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1, destination = enterprise1.queue } (7)
2020-09-17 20:15:19,362: main DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=consumers,name=JmsConsumer(0x1132baa3)
2020-09-17 20:15:19,362: main DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[jmsTopic://topic:enterprise2.topic]
2020-09-17 20:15:19,364: main DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-09-17 20:15:19,365: main DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
...
2020-09-17 20:15:22,486: Camel (camel) thread #7 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: start -> JmsConsumerInfo: { ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:4:1, destination = enterprise1.queue } (40)
2020-09-17 20:15:22,490: Camel (camel) thread #5 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:2:1 (41)
2020-09-17 20:15:22,491: Camel (camel) thread #4 - JmsConsumer[enterprise1.queue] DEBUG (FailoverProvider.java:1159) - Executing Failover Task: message pull -> ID:0229e4fb-1885-4a10-8b55-04a7a0a450a5:1:1:1 (42)
...

共有1个答案

任元青
2023-03-14

客户端正在从其连接的Artemis服务器接收已知代理URI的更新,该服务器似乎只知道运行端口为61617的服务器。这将导致客户端使用从代理返回的新集替换URI,该新集显示在日志中:

Replacing uris:URI Pool { [amqp://localhost:5673, amqp://localhost:5672] } with new set: [amqp://localhost:5672, amqp://localhost:61617?amqp.vhost=localhost]

因此,如果要重新连接的正确服务器是端口5673上的原始备用服务器,那么它将永远不会尝试该服务器,因为第一个代理告诉它,只有已知的代理才会发送备用服务器。通过将下面的故障切换URI选项设置为“添加”或“忽略”,可以更改客户端行为,使其不替换已知主机的原始配置。

故障转移。amqpOpenServerListAction控制当远程对等方的连接打开框架向客户端提供故障转移主机列表时,故障转移传输的行为。此选项接受三个值之一;替换、添加或忽略(默认值为REPLACE)。如果配置了REPLACE,则除当前服务器的故障切换URI以外的所有故障切换URI都将替换为远程对等方提供的URI。如果配置了ADD,则会将远程设备提供的URI添加到现有的故障切换URI集中,并进行重复数据消除。如果配置了IGNORE,则会删除来自远程的任何更新,并且不会更改正在使用的故障切换URI集。

有关更多信息,请参阅Qpid JMS客户端配置页面。

 类似资料:
  • 我想使用Apache Qpid订阅Java消息服务(JMS)发布订阅服务。然而,我不想使用Java,而是想使用C。我的客户告诉我这是可能的(甚至说是微不足道的)。它们是否正确?有人能给我举个例子吗?我所看到的每一处都表明,要使用JMS,我必须使用Java。这里的要点是,该服务是第三方服务(因此我不能将其更改为使用AMQP或JMS以外的任何其他协议)。

  • 跟随wiki页面https://cwiki.apache.org/qpid/amqp-java-jms-messaging-client.html到https://cwiki.apache.org/qpid/connection-url-format.html. Simple me需要qpid-amqp-1-0-client-jms-0.20-sources。jar接受URI格式,但它似乎忽视了一

  • 我使用如下URI配置了一个JmsConnectionFactory: 故障切换:(amqps://11.22.33.44?amqp.idleTimeout=120000 请注意jms。预回迁策略。all=10参数,根据官方文件 ... 控制远程对等方可以发送到客户端并保存在每个使用者实例的预回迁缓冲区中的消息数量。 所以我应该不会在客户端中看到超过10条消息,对吧?好吧,那不起作用。 我最终使用反

  • 我正在尝试使用Apache Camel和Qpid JMS客户端连接到在两个不同节点(VM)中运行的ActiveMQ Artemis主动-主动集群。我正在使用ActiveMQ Artemis 2.17.0。 我正在试图找出我的组织的远程URI配置应该是什么。阿帕奇。qpid。jms。JmsConnectionFactory实例。使用<代码>ampq://host1:5672,ampq://host2

  • 我有一个wsdl: 我想提交信息以获得回应。我创建了client.php如下: 但它在浏览器中显示错误: SoapFault对象([消息:受保护]= 我错在哪里?对此,可能的解决方案是什么? 编辑: 我已经创建了一个php文件:client。php 但它产生了这个错误: 调用错误:响应不是文本/xml类型:应用程序/wsdl xmlHTTP/1.1 200确定日期:星期二,9月17日2013 15

  • 这是我的第一个问题。我必须使用ES rest高级客户端。我的ES服务器是6.8。x、 所以我写了我的构建。gradle文件。 但我的项目依赖性如下所示。 Gradle:org.elasticsearch.client:elasticsearch ch-rest-客户端:7.6.2 Gradle:org.elasticsearch.client:elasticsearch ch-rest-高级别-客