当前位置: 首页 > 知识库问答 >
问题:

在Wildfly 12中配置ActiveMQ Artemis以实现集群中消息的时间关键传递

寿意远
2023-03-14

我不知道如何让Wildfly12中的消息传递子系统在原始节点失败时将队列消息重新传递到不同的节点。我需要一种方法,当第一个尝试的节点没有足够快地确认/提交时,将消息定向到不同的节点。

我有一个3节点的Wildfly12集群,只有一个队列(TestQueue)。我使用单个bean部署应用程序,该bean获取JMS连接并创建与队列中的使用者的会话;下面是构造函数:

public TestMessageListener( ConnectionFactory connectionFactory, Destination destination )
{
    this.context = connectionFactory.createContext( JMSContext.DUPS_OK_ACKNOWLEDGE );
    this.consumer = this.context.createConsumer( destination );
    this.consumer.setMessageListener( this );
    this.context.start();
}

连接工厂和目的地被注入到其他地方:

@Resource( lookup = "java:/ConnectionFactory" ) private ConnectionFactory connectionFactory;
@Resource( lookup = "java:/jms/queue/TestQueue" ) private Destination destination;

回到监听器中,我只记录它收到的内容:

@Override
public void onMessage( Message message )
{
    try
    {
        message.acknowledge();
        String body = new String( message.getBody( byte[].class ), StandardCharsets.UTF_8 );
        LOG.info( body );
    }
    catch ( JMSException e )
    {
        LOG.warning( e.toString() );
    }
}

最后,我在消息传递子系统配置中启用了STOMP:

<socket-binding-groups>
    <socket-binding-group name="full-ha-sockets" default-interface="public">
        <socket-binding name="stomp" port="6164"/>
        ... 
    </socket-binding-group>
</socket-binding-groups>

<subsystem xmlns="urn:jboss:domain:messaging-activemq:3.0">
    <server name="default">
        <remote-acceptor name="stomp-acceptor" socket-binding="stomp">
            <param name="protocols" value="STOMP"/>
        </remote-acceptor>            
        <address-setting name="jms.queue.TestQueue" redistribution-delay="0"/>
        ...
    </server>

我通过stomp连接,并每2秒发送一个测试消息,带有唯一的标识符。3个节点中的每一个依次接收一个,循环往复。然后我从其中一个节点上拔掉网线。

1分钟后(我假设是connection-ttl),我会在其他2个节点上得到关于连接失败的错误消息:

2018-07-11 20:02:18,813 INFO  [TestMessageListener] (Thread-1 (ActiveMQ-client-global-threads)) TEST 435
2018-07-11 20:02:21,448 WARN  [org.apache.activemq.artemis.core.client] (Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$3@3070595f)) AMQ212037: Connection failure has been detected: AMQ119014: Did not receive data from /192.168.1.82:51046 within the 60,000ms connection TTL. The connection will now be closed. [code=CONNECTION_TIMEDOUT]
2018-07-11 20:02:21,449 WARN  [org.apache.activemq.artemis.core.server] (Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$3@3070595f)) AMQ222061: Client connection failed, clearing up resources for session b7be7d58-855c-11e8-91dd-6c626d5557a6
2018-07-11 20:02:21,449 WARN  [org.apache.activemq.artemis.core.server] (Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$3@3070595f)) AMQ222107: Cleared up resources for session b7be7d58-855c-11e8-91dd-6c626d5557a6
2018-07-11 20:02:21,449 WARN  [org.apache.activemq.artemis.core.server] (Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$3@3070595f))     AMQ222061: Client connection failed, clearing up resources for session b7becb79-855c-11e8-91dd-6c626d5557a6
2018-07-11 20:02:21,449 WARN  [org.apache.activemq.artemis.core.server] (Thread-8 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$3@3070595f)) AMQ222107: Cleared up resources for session b7becb79-855c-11e8-91dd-6c626d5557a6

在额外的30秒后,我得到了另一轮关于连接失败的错误消息:

2018-07-11 20:02:49,443 WARN  [org.apache.activemq.artemis.core.client] (Thread-1 (ActiveMQ-client-global-threads)) AMQ212037: Connection failure has been detected: AMQ119011: Did not receive data from server for org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection@5e696d4b[local= /192.168.1.27:39202, remote=/192.168.1.82:8080] [code=CONNECTION_TIMEDOUT]
2018-07-11 20:02:49,444 WARN  [org.apache.activemq.artemis.core.server] (Thread-1 (ActiveMQ-client-global-threads)) AMQ222095: Connection failed with failedOver=false
2018-07-11 20:02:49,446 WARN  [org.apache.activemq.artemis.core.server] (Thread-1 (ActiveMQ-client-global-threads)) AMQ222095: Connection failed with failedOver=false

请注意,我的STOMP客户机连接到一个良好的节点,并继续向队列发送消息,而“Failed”框是断开连接的。

我的问题是:

  • 在90秒内,Artemis继续向未插电的盒子发送消息。
  • 我不知道如何让Artemis尝试在90秒后将旧消息重新传递到另一个节点。
  • 我不明白为什么在60s出现第一轮连接错误后,它继续尝试将消息传递到未插入的框。
  • 设置重分发-延迟没有影响,但我认为由于https://activemq.apache.org/artemis/docs/latest/clusters.html.
  • 它会很有用
<address-setting name="jms.queue.TestQueue" redistribution-delay="0"/>
    null

对于这个特定的队列,我不仅希望重新传递尝试到另一个节点,而且希望消息确认超时来触发此重新传递,如果失败,则需要一个750-1000ms的小连接-TTL。如果我将connection-ttl设置为15000ms,那么所有节点之间的所有连接(即使在整个集群正常的情况下)在15000ms之后都会抛出错误。根据https://activemq.apache.org/artemis/docs/latest/configuration-index.html中的文档,该参数是“TTL for the bridge.this应该大于ping周期。”不清楚“ping period”是什么参数,更不清楚这样的参数如何映射到Wildfly子系统配置。我假设connection-ttl在这里,我将其设置为15000:

<cluster-connection name="my-cluster" address="jms" connector-name="http-connector" connection-ttl="60000" retry-interval-multiplier="1.5" max-retry-interval="60000" discovery-group="dg-group1"/>

我完全可以接收和处理重复的信息;我认为jmscontext.dups_ok_acknowledge和redistribution-delay=“0”的组合至少可以解决它的重新交付部分。

我尝试了jmscontext.transact,并使用了jmscontext.commit()和jmscontext.rollback()。显然,rollback()不适用于故障节点与集群的其他部分断开时,但它是触发重新交付的唯一方法。

共有1个答案

吴弘壮
2023-03-14

我认为这里发生的是-1的默认reconnect-attres用于集群连接,只要集群连接试图重新连接到一个向下节点,那么该节点的消息将保留在特殊的“存储转发”队列中。您应该能够将reconnect-attrements设置为-1以外的其他内容,这样集群连接将放弃重新连接的尝试,此时,用于其他节点的消息将通常可用。

 类似资料:
  • 现在我在复制的缓存上使用SQL select语句。现在这些缓存的写入同步模式是FULL_SYNC。 现在,我们只能在一个DC中工作客户端节点,而不能同时在两个DC中工作。假设我们有两个客户在DC1。 因此,节点总数为6个(在DC1中有2个客户端节点和2个服务器节点,在DC2中有2个服务器节点)。 我们的用例是这样一种方式… 2个客户端应该只查询DC1中的2个服务器节点,而不是DC2中的其他2个服务

  • 我试图在Apache Artemis集群中实现消息排序。连接到集群的生产者/消费者实现了高可用性。因此,在某个时间点,将有两个相同应用程序的实例连接到主题或队列。到目前为止,我可以发现以下两种方法可用于在Red Hat AMQ/Artemis集群中实现排序: 消息组(根据文档,只有当集群中每个节点有一个使用者时才是可靠的) 独占队列(仅在单个节点上保留消息顺序)。 我完全理解使用集群和期望消息排序

  • 问题内容: 在Android中,我看到这样的代码: 这在Java中合法吗?这里的View.OnClickListener到底是什么。 问题答案: 是用于接收点击事件的界面。您需要重写该方法并实现自己的代码来处理它。http://developer.android.com/reference/android/view/View.OnClickListener.html 您可以在这里查看类结构:htt

  • 我需要在不同的机器上配置一个Kafka集群,但它不起作用,当我启动生产者和消费者时,将显示以下错误: 你能帮帮我吗。

  • 我想在每个响应中添加etag属性。我已经在响应中添加了varie-header和cache-control header(最大age=600,public),但是我没有找到任何在响应中添加etag的解决方案。有人能帮帮我吗?