当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ-javax.jms.IllegalStateException:AMQ219019:会话已关闭

壤驷心思
2023-03-14

我有两个运行在不同机器上的ActiveMQ Artemis代理,组成一个简单的集群。我正在使用一个Java应用程序(非常基本)来生成和使用消息,以分析集群的行为。Java代码如下所示:

public void runExample() throws Exception {
    InitialContext initialContext = null;
    Connection connectionA = null;

    try {
        Properties properties = new Properties();
        properties.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
        properties.put("connectionFactory.ConnectionFactory", "udp://231.7.7.7:9876");
        properties.put("queue.queue/anotherExampleQueue", "anotherExampleQueue"); 

        initialContext = new InitialContext(properties);
        Queue queue = (Queue) initialContext.lookup("queue/anotherExampleQueue");
        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

        Thread.sleep(5000);
        connectionA = connectionFactory.createConnection("admin", "admin");
        
        Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        System.out.println("Session A - " + ((ClientSessionInternal)((org.apache.activemq.artemis.jms.client.ActiveMQSession) sessionA).getCoreSession()).getConnection().getRemoteAddress());
        
        MessageProducer producerA = sessionA.createProducer(queue);
        
        final int numMessages = 10;

        for (int i = 0; i < numMessages; i++) {
            TextMessage messageA = sessionA.createTextMessage("A:This is text message " + i);
            producerA.send(messageA);
            System.out.println("Sent message: " + messageA.getText());
        }

        connectionA.start();
        consume(sessionA, queue, numMessages, "A");
        
    } finally {
        if (connectionA != null) {
            connectionA.close();
        }
        if (initialContext != null) {
            initialContext.close();
        }
    }
}

private static void consume(Session session, Queue queue, int numMessages, String node) throws JMSException {
    MessageConsumer consumer = session.createConsumer(queue);

    for (int i = 0; i < numMessages; i++) {
        TextMessage message = (TextMessage) consumer.receive(2000);
        
        if(message!=null)
            System.out.println("Got message: " + message.getText() + " from node " + node);
    }

    System.out.println("receive other message from node " + node + ": " + consumer.receive(2000));
}

同时在connectiona.start()处使用断点调试上述应用程序。如果我停止我的主代理,那么我会看到从代理接管,所有的消息都像预期的那样被移动到从代理。但是,此时,如果我继续使用我的应用程序,它将抛出javax.jms.IllegalStateException:AMQ219019:Session is closed,而不是在从代理中使用消息。当我再次启动主代理并继续调试时,也会发生同样的情况。文档说自动客户端故障转移会自动发生。

<connectors>
   <connector name="clusterConnectorOne">tcp://10.10.170.5:61616</connector>
</connectors>

<discovery-groups>
   <discovery-group name="my-discovery-group">
      <local-bind-address>10.10.170.5</local-bind-address>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

<cluster-connections>
   <cluster-connection name="my-cluster">
      <connector-ref>clusterConnectorOne</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <message-load-balancing>STRICT</message-load-balancing>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>
      
<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <local-bind-address>10.10.170.5</local-bind-address>
      <local-bind-port>5432</local-bind-port>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <broadcast-period>2000</broadcast-period>
      <connector-ref>clusterConnectorOne</connector-ref>
   </broadcast-group>
</broadcast-groups>

<ha-policy>
   <replication>
      <master>
        <check-for-live-server>true</check-for-live-server>
      </master>
   </replication>
</ha-policy>

我不知道这里出了什么问题,有什么建议吗?

共有1个答案

关飞翔
2023-03-14

由于连接工厂使用UDP://231.7.7.7:9876,因此它正在“发现”最终需要使用的连接器。在您的示例中,连接器TCP://10.10.170.5:61616由代理广播,因此客户端将发现并使用该连接器。但是,此连接器没有为HA配置。我需要类似于TCP://10.10.170.5:61616?ha=true;reconnectAttributs=-1,以便在连接丢失时告诉客户端故障转移到备份。在broker.xml中更新连接器配置,故障转移应该可以正常工作。代理附带的许多高可用性示例都演示了这种设置,例如事务-故障转移。

 类似资料:
  • 问题内容: 当我按如下方式调用session.begin事务方法时: 然后我得到以下异常消息 造成此错误的原因是什么? 问题答案: 更新: 我想调用并不能保证该会话实际上是打开的。第一次,您应该使用 代替。该建议实际上与您找到的页面一致。 之前: 根据到目前为止的可用信息,我们可以得出结论,错误的原因是会话未打开;-)

  • 我正在使用ActiveMQ对电子邮件进行排队,消费者读取队列并发送电子邮件。 在启动时,我注册一个生产者,并永远缓存它。 有时,当连接关闭时,生产者无法将消息加入队列。 有人能告诉我处理闭门会议的最佳方式吗?我应该重新注册我的制作人吗?还是有办法重开会话?

  • 我们使用Hibernate(JPA)和Hibernate Envers来保存对象的历史。Web应用程序运行许多线程,其中一些是通过从其他应用程序调用RMI方法创建的,其中一些是由应用程序本身创建的,其中一些是为了处理超文本传输协议请求而创建的(它们生成视图)。 我们还使用视图中的开放会话模式来管理会话,因此我们的web。xml包含: 数据库是使用DAO访问的,它们都有由Spring注入的实体管理器

  • 我们如何优雅地关闭守护进程线程[ActiveMQ会话:ID:PC-63704-1472105244157-1:1:1]? spring-boot activeMQ设置如下所示 对于发送消息,我们只是简单地autowired JmsTemplate并发送消息出去: 对于接收(监听)消息,我们使用Spring DefaultMessageListenerContainer(DMLC) 我们还有其他的设

  • 问题内容: 要获得答案,请向下滚动到此内容的结尾… 基本问题与多次询问相同。我有一个带有两个POJO事件和用户的简单程序-一个用户可以拥有多个事件。 用户: 注意:这是一个示例项目。我 真的 很想在这里使用Lazy抓取。 现在我们需要配置spring和hibernate,并有一个简单的basic-db.xml用于加载: 注意:我玩过CustomScopeConfigurer和SimpleThrea

  • 因此,我尝试使用JSF托管bean上的方法访问HttpSession的属性 但是我得到了一个 为什么啊? 在我的一个会话bean被销毁之前,我需要访问该会话打开的外部服务的连接列表,它们当然存储在会话属性对象上。 我该怎么做?