当前位置: 首页 > 知识库问答 >
问题:

Apache ActiveMq Artemis客户端重新连接到群集HA复制/共享数据存储区中的下一个可用代理

逑兴安
2023-03-14
<?xml version="1.0" encoding="UTF-8" standalone="no"?>

<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
   <core xmlns="urn:activemq:core">

      <bindings-directory>./data/bindings</bindings-directory>

      <journal-directory>./data/journal</journal-directory>

      <large-messages-directory>./data/largemessages</large-messages-directory>

      <paging-directory>./data/paging</paging-directory>
      <!-- Connectors -->

      <connectors>
         <connector name="netty-connector">tcp://10.64.60.100:61617</connector><!-- direct ip addres of host myhost1 -->
         <connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- ip 10.64.60.101 <- mocked up ip for security reasons -->
      </connectors>

      <!-- Acceptors -->
      <acceptors>
         <acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
      </acceptors>

      <ha-policy>
       <replication>
          <master/>
       </replication>
    </ha-policy>

      <cluster-connections>
         <cluster-connection name="myhost1-cluster">
            <connector-ref>netty-connector</connector-ref>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>ON_DEMAND</message-load-balancing>
            <max-hops>1</max-hops>
              <static-connectors>
             <connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
            </static-connectors>
         </cluster-connection>
      </cluster-connections>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
             <auto-delete-queues>false</auto-delete-queues>
            <auto-delete-created-queues>false</auto-delete-created-queues>
            <auto-delete-addresses>false</auto-delete-addresses>
        </address-setting>
      </address-settings>
   </core>
</configuration>

使用Java和producer模板将消息推送到直接endpoint,然后路由到队列的客户端生成消息:

java-camel-producer-client

    package com.demo.artemis;

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.spring.SpringCamelContext;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class ToRoute {

         public static void main(String[] args) throws Exception { 
                ApplicationContext appContext = new ClassPathXmlApplicationContext(
                        "camel-context-producer.xml");
                ProducerTemplate template = null;
                CamelContext camelContext = SpringCamelContext.springCamelContext(
                        appContext, false);
                try {            
                    camelContext.start();
                     template = camelContext.createProducerTemplate();
                    String msg = null;
                    int loop =0;
                    int i = 0;
                    while(true) {
                        if (i%10000==0) {
                            i=1;
                            loop=loop+1;
                        }
                        if(loop==2) break;
                        msg ="---> "+(++i);             
                        template.sendBody("direct:toDWQueue", msg);
                    }
                } finally {
                    if (template != null) {
                        template.stop();
                    }
                    camelContext.stop();
                }
         }
    }

向队列发送消息的Camel上下文:camel-producer-client

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">
     <bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
      <constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
    </bean>

    <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
      <property name="maxConnections" value="10" />
      <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
      <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
      <property name="concurrentConsumers" value="5" />
    </bean>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
      <property name="configuration" ref="jmsConfig" />
      <property name="streamMessageTypeEnabled" value="true"/>
    </bean>

        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="myqueue" uri="jms:queue:myExampleQueue" />

      <route>
        <from uri="direct:toMyExample"/>
       <transform>
      <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:myqueue"/>
      </route>
    </camelContext>

  </beans>

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.1.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
        http://camel.apache.org/schema/spring
        http://camel.apache.org/schema/spring/camel-spring.xsd">

        <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
            <property name="environment">
                <props>
                    <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
                                <prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
                                  <prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
                    <prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
                </props>
            </property>
        </bean>
           <bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
            <property name="jndiName" value="ConnectionFactory"/> 
            <property name="jndiTemplate" ref="jndiTemplate"/>
              <property name="cache" value="true"/>     
        </bean> 

        <bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
            <property name="jndiTemplate" ref="jndiTemplate"/>
              <property name="cache" value="true"/>
    <!-- dynamic destination if the destination name  is not found in JNDI -->
             <property name="fallbackToDynamicDestination" value="true"/>
        </bean>

       <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="connectionFactory" ref="jndiFactoryBean"/>
            <property name="destinationResolver" ref="jndiDestinationResolver"/>
                 <property name="concurrentConsumers" value="10" />
        </bean>

        <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
               <property name="configuration" ref="jmsConfig" />
        </bean>

        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
      <endpoint id="inqueue" uri="jms:queue:MyExampleQueue" />
      <endpoint id="outqueue" uri="jms:queue:OutBoundQueue" />

      <route>
        <from uri="ref:inqueue" />
        <convertBodyTo type="java.lang.String" />
        <transform>
          <simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
        </transform>
        <to uri="ref:outqueue" />
      </route>
    </camelContext>
  </beans>

在使用Camel上下文时,我看到当主服务器不可用时,客户机使用者被重定向到从服务器(我使用JNDI和JMSPoolConnectionFactory)。

java-producer-client使用producer模板将消息发送到经过转换并路由到队列的直接endpoint。当主服务器关闭时,客户端无法进行故障转移并连接到从服务器。(问题:当代理处于HA模式时,这是预期的行为吗)。

在重新连接尝试后,我看到下面的异常,当主停机时。

Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response

另一方面,使用Main启动的camel-context-consumer能够自动故障转移到slave。在控制台中,我确实注意到从主机和处理数据中有消费者计数10。但是当主节点备份时,客户端没有切换到活动主节点。问题:这也是意料之中的吗?

为了创建connectionfactory,使用以下约定。

(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;

问题:即使使用静态连接器,我们是否需要配置broacast和discovery组?

共有1个答案

金飞
2023-03-14

如果在客户端处于阻塞调用中间(例如发送持久消息并等待代理的ack、提交事务等)时发生故障转移,则会出现“解除阻塞永远不会得到响应的阻塞调用”的错误。这将在文档中进一步讨论。

根据您的配置,客户机在主代理返回时不会切换回主代理的事实也是意料之中的。简而言之,您没有正确配置故障备份。你的主人应该:

<ha-policy>
   <replication>
      <master>
         <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

你的奴隶应该:

<ha-policy>
   <replication>
      <slave>
         <allow-failback>true</allow-failback>
      </slave>
   </replication>
</ha-policy>
 类似资料:
  • 我正在尝试使用Apache Camel和Qpid JMS客户端连接到在两个不同节点(VM)中运行的ActiveMQ Artemis主动-主动集群。我正在使用ActiveMQ Artemis 2.17.0。 我正在试图找出我的组织的远程URI配置应该是什么。阿帕奇。qpid。jms。JmsConnectionFactory实例。使用<代码>ampq://host1:5672,ampq://host2

  • 我想在由安全kafka集群的kafka主题支持的Flink SQL表上执行一个查询。我能够以编程方式执行查询,但无法通过Flink SQL客户端执行。我不知道如何通过Flink SQL客户端传递JAAS配置()和其他系统属性。 FlinkSQL以编程方式查询 这很好。 通过SQL客户端Flink SQL查询 运行此命令将导致以下错误。 中没有任何内容,除了以下注释 SQL客户端运行命令 Flink

  • 我在Vert发展。x(基于Netty和Hazelcast),我正在尝试在两个服务器实例之间共享数据(在同一局域网上的不同机器中的每个实例)。 我的问题是我不知道如何配置vert. x服务器以允许它们共享它们的并发内存映射(理论上说这是可能的)。 我已经阅读了Vert. x和Hazelcast的许多文档,但还没有结果。(我不知道如何强制vert. x加载hazelcast xml配置文件)。 提前感

  • 我已经使用KOPS安装了kubernetes集群。 从kops安装kubectl的节点开始,kubectl全部工作完美(假设节点A)。 我正在尝试从另一个安装了kubectl的服务器(节点B)连接到kubernetes集群。我已经将~/.kube从A节点复制到B节点,但当我尝试执行以下基本命令时: 我的配置文件是: 感谢任何帮助

  • 我正在使用POSIX共享内存和未命名信号量实现客户机服务器。服务器可以同时处理多个客户端。该代码适用于单个客户端,但不适用于多个客户端。POSIX操作是用,

  • 我正在使用ActiveMQ客户端库将我的服务器应用程序连接到ActiveMQ。几个不同的消费者和生产者在单个线程中运行。、和之间的关系应该如何? 每个JVM一个连接工厂 每个JVM一个到代理的连接或n个连接,每个使用者一个 n个会话,每个消费者一个(Javadoc似乎强烈建议这样做)