当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ、经纪人网络、闲置消费者

龙永福
2023-03-14

我们正在运行活动 MQ 5.6.0。在我们的测试环境中,我们有 3 个代理在静态网络中运行。下面是当前方案。我们有6个消费者随机连接到3个经纪人。一个经纪人有3个消费者,第二个有2个,第三个有1个。当我们向队列堆积消息时,我们看到消息积压在第三个代理上,有 1 个使用者,另外两个代理没有获得任何积压,其余 5 个使用者处于空闲状态。

在下面,您将找到我们所有一个代理(dev.queue01)的配置,其他两个类似于静态主机名的适当更改。

我希望消息会自动分发给其他代理,供空闲的消费者使用。请告诉我,在我对问题的描述中我是否遗漏了什么。提前感谢任何指导。

http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="prd.queue01" dataDirectory="${activemq.data}">

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb"> 
              <pendingSubscriberPolicy>
                <vmCursor />
              </pendingSubscriberPolicy>
            </policyEntry>
            <policyEntry queue=">" producerFlowControl="false" memoryLimit="64mb" optimizedDispatch="true" enableAudit="false" prioritizedMessages="true"> 
              <networkBridgeFilterFactory>
                <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
              </networkBridgeFilterFactory>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>

    <managementContext>
        <managementContext createConnector="true"/>
    </managementContext>

    <persistenceAdapter>
        <amqPersistenceAdapter directory="${activemq.data}/data/amqdb"/>
    </persistenceAdapter>

      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage limit="256 mb"/>
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="750 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="750 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterClientsOnRemove="true" rebalanceClusterClients="true"/>
    </transportConnectors>

    <networkConnectors>
      <networkConnector uri="static:(tcp://dev.queue02:61616,tcp://dev.queue03:61616)" name="queues_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false" networkTTL="4">
      <dynamicallyIncludedDestinations>
        <queue physicalName=">"/> 
      </dynamicallyIncludedDestinations>
      <excludedDestinations>
        <topic physicalName=">"/> 
      </excludedDestinations>
    </networkConnector>
</networkConnectors>


</broker>
<import resource="jetty.xml"/>

共有3个答案

沈建柏
2023-03-14

如果我理解正确的话,代理意味着这里的队列。

  • 你所有的经纪人都有相同类型的对象。
  • 你所有的消费者都做同样的过程。
  • 并且您希望在消费者之间平均分担工作html" target="_blank">负载。
  • 操作顺序不是那么重要。

我尝试在Active MQ 5.5.1上做同样的事情。我所做的只是创建了一个队列,并创建了多个消费者。我将所有消费者指向同一个队列。

主动 MQ 自动负责分发。

我观察到以下示例:

如果我有一个队列-有2000条记录。如果我同时将2个消费者指向这个队列,第一个消费者将处理从0开始的对象。第二个消费者将在随机偏移后(比如从700开始)开始处理对象。

一旦第一个消费者已经完成了从0 - 700的对象处理,并且第二个消费者已经处理了200个记录(700 - 900),第一个消费者可以开始从任何随机偏移(可能是1200)获取对象。

偏移量的调整由主动MQ自动控制。

我注意到了这一点。我非常肯定这种情况会发生。

希望我已经回答了你的问题(或者至少正确理解了你的问题。).

我在这里不明白的是,如果Active-MQ创建队列,它是如何为介于两者之间的对象提供服务的?

欧阳安阳
2023-03-14

一个牵强,因为我不是很确定,但在你的配置中,你排除了所有主题

<excludedDestinations>
    <topic physicalName=">"/> 
</excludedDestinations>

你能取消测试限制吗?当客户端连接到特定队列/主题时,Activemq使用咨询主题进行通信。因此,您的第三个代理可能不知道其他客户端,因为您阻止了咨询主题。

齐迪
2023-03-14

迟来的回答,但希望对未来的读者有所帮助。

你已经描述了一个经纪人网络环,其中B1、B2和B3都相互交谈,B1上有3个消费者(C1-C3),B1上有2个消费者(C4

B3有三个附属消费者:C6、B1和B2。该代理将在这些消费者之间循环传递消息,因此1/3的消息将发送到C6,1/3发送到B1,1/3发送到B2。

B1 有五个附加的使用者:C1、C2、C3、B2 和 B3。但是消息不会传递到它们刚刚来自的同一代理,因此有 4 个使用者计算来自 B3 的消息:C1、C2、C3 和 B2。因此,在总消息数的 1/3 中,C1、C2 和 C3 将分别获得 1/4(总数的 1/12),而 B2 将获得相同的 1/12。稍后将对此进行详细介绍。

B2有四个附加的消费者:C4、C5、B1和B3。但是消息不会传递给它们刚刚来自的同一个代理,所以来自B3的消息有三个消费者:C4、C5和B1。所以在总消息的1/3中,C4和C5将分别获得1/3(总数的1/9),B1将获得相同的1/9。稍后还会有更多信息。

到目前为止,我们已经看到 C6 获得总消息数的 1/3,C1-C3 获得总消息数的 1/12,C4-C5 获得总消息数的 1/9,以及路由到第二个代理的总消息数的 1/12 1/9 = 7/36。现在让我们回到这些消息。

在B3之后的消息中-

类似地,B3之后的消息-

在B3之后的消息中-

我们现在正在进入收益递减的阶段,但是你现在可以看到,C6获得了总消息的超大份额(36%),而连接到B1(拥有最多消费者)的消费者各自获得了较小的份额(不到10%),这导致C6有很多工作要做,而C1-5的工作和空闲时间要少得多,正如你所观察到的。你还可以看到一些消息可能会在网络中经过很长一段路,导致高延迟,但这不是你的问题。

 类似资料:
  • 我在一台Windows主机上安装了两个Kafka 2.1.0代理。默认复制因子设置为2。所有其他设置均为默认设置。 networkClient:[Consumer ClientID=Consumer-1,GroupID=SOUT]无法建立到节点-2(/192.168.0.1:19092)的连接。代理可能不可用。 消费者: 一个制作人:

  • 我是使用ActiveMQ的新手,正在试图理解经纪人网络是如何工作的。我已经阅读了activemq文档,并浏览了互联网上的一些文章,如http://www . jakubkorab . net/2011/11/understanding-ActiveMQ-broker-networks . html 我在本地Windows桌面上设置了以下内容。有2台tomcat服务器分别运行在8080和9080上,

  • 我有两台机器localhost和192.168.1.110来运行两台独立的单机kafka。 kafka2.11-0.10.0.0 bin/kafka-console-producer.sh--broker-list 192.168.1.110:9092--topic test这是一条消息[2016-08-24 18:15:27,441]错误将消息发送到topic test时出错,关键字:null,

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 问题内容: 我有一个使用SSL传输的activeMQ代理。我大约有10位使用代理的消费者。我正在使用骆驼配置路线。 即使我重新启动使用者,它总是挂断并且不会使用新消息,即使队列中有待处理的消息也是如此。 我开始尝试一次遍历我的消费者,试图找出问题所在,以找出问题的根源。我终于找到了一个消费者,我可以重新解决这个问题。一段时间后它将挂起,但是,如果我进入活动的MQ管理控制台并尝试查看队列中的消息,它

  • 我使用网络连接器配置了两个代理 A 和 B。如果我使用独占使用者(单个使用者)或消息组(JMXgroupID),消息顺序是否保留? 在经纪人文档的网络中,我发现: 代理网络不会保留总消息排序。总排序适用于单个使用者,但网络桥引入第二个使用者。此外,网络桥接使用者通过 producer.send(..) 转发消息,因此它们从转发代理上的队列头转到目标上队列的尾部。如果单个使用者在联网代理之间移动,则