我们正在运行活动 MQ 5.6.0。在我们的测试环境中,我们有 3 个代理在静态网络中运行。下面是当前方案。我们有6个消费者随机连接到3个经纪人。一个经纪人有3个消费者,第二个有2个,第三个有1个。当我们向队列堆积消息时,我们看到消息积压在第三个代理上,有 1 个使用者,另外两个代理没有获得任何积压,其余 5 个使用者处于空闲状态。
在下面,您将找到我们所有一个代理(dev.queue01)的配置,其他两个类似于静态主机名的适当更改。
我希望消息会自动分发给其他代理,供空闲的消费者使用。请告诉我,在我对问题的描述中我是否遗漏了什么。提前感谢任何指导。
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="prd.queue01" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="false" memoryLimit="64mb" optimizedDispatch="true" enableAudit="false" prioritizedMessages="true">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="true"/>
</managementContext>
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.data}/data/amqdb"/>
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="256 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="750 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="750 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" updateClusterClients="true" updateClusterClientsOnRemove="true" rebalanceClusterClients="true"/>
</transportConnectors>
<networkConnectors>
<networkConnector uri="static:(tcp://dev.queue02:61616,tcp://dev.queue03:61616)" name="queues_only" conduitSubscriptions="false" decreaseNetworkConsumerPriority="false" networkTTL="4">
<dynamicallyIncludedDestinations>
<queue physicalName=">"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<topic physicalName=">"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
</broker>
<import resource="jetty.xml"/>
如果我理解正确的话,代理意味着这里的队列。
我尝试在Active MQ 5.5.1上做同样的事情。我所做的只是创建了一个队列,并创建了多个消费者。我将所有消费者指向同一个队列。
主动 MQ 自动负责分发。
我观察到以下示例:
如果我有一个队列-有2000条记录。如果我同时将2个消费者指向这个队列,第一个消费者将处理从0开始的对象。第二个消费者将在随机偏移后(比如从700开始)开始处理对象。
一旦第一个消费者已经完成了从0 - 700的对象处理,并且第二个消费者已经处理了200个记录(700 - 900),第一个消费者可以开始从任何随机偏移(可能是1200)获取对象。
偏移量的调整由主动MQ自动控制。
我注意到了这一点。我非常肯定这种情况会发生。
希望我已经回答了你的问题(或者至少正确理解了你的问题。).
我在这里不明白的是,如果Active-MQ创建队列,它是如何为介于两者之间的对象提供服务的?
一个牵强,因为我不是很确定,但在你的配置中,你排除了所有主题
<excludedDestinations>
<topic physicalName=">"/>
</excludedDestinations>
你能取消测试限制吗?当客户端连接到特定队列/主题时,Activemq使用咨询主题进行通信。因此,您的第三个代理可能不知道其他客户端,因为您阻止了咨询主题。
迟来的回答,但希望对未来的读者有所帮助。
你已经描述了一个经纪人网络环,其中B1、B2和B3都相互交谈,B1上有3个消费者(C1-C3),B1上有2个消费者(C4
B3有三个附属消费者:C6、B1和B2。该代理将在这些消费者之间循环传递消息,因此1/3的消息将发送到C6,1/3发送到B1,1/3发送到B2。
B1 有五个附加的使用者:C1、C2、C3、B2 和 B3。但是消息不会传递到它们刚刚来自的同一代理,因此有 4 个使用者计算来自 B3 的消息:C1、C2、C3 和 B2。因此,在总消息数的 1/3 中,C1、C2 和 C3 将分别获得 1/4(总数的 1/12),而 B2 将获得相同的 1/12。稍后将对此进行详细介绍。
B2有四个附加的消费者:C4、C5、B1和B3。但是消息不会传递给它们刚刚来自的同一个代理,所以来自B3的消息有三个消费者:C4、C5和B1。所以在总消息的1/3中,C4和C5将分别获得1/3(总数的1/9),B1将获得相同的1/9。稍后还会有更多信息。
到目前为止,我们已经看到 C6 获得总消息数的 1/3,C1-C3 获得总消息数的 1/12,C4-C5 获得总消息数的 1/9,以及路由到第二个代理的总消息数的 1/12 1/9 = 7/36。现在让我们回到这些消息。
在B3之后的消息中-
类似地,B3之后的消息-
在B3之后的消息中-
我们现在正在进入收益递减的阶段,但是你现在可以看到,C6获得了总消息的超大份额(36%),而连接到B1(拥有最多消费者)的消费者各自获得了较小的份额(不到10%),这导致C6有很多工作要做,而C1-5的工作和空闲时间要少得多,正如你所观察到的。你还可以看到一些消息可能会在网络中经过很长一段路,导致高延迟,但这不是你的问题。
我在一台Windows主机上安装了两个Kafka 2.1.0代理。默认复制因子设置为2。所有其他设置均为默认设置。 networkClient:[Consumer ClientID=Consumer-1,GroupID=SOUT]无法建立到节点-2(/192.168.0.1:19092)的连接。代理可能不可用。 消费者: 一个制作人:
我是使用ActiveMQ的新手,正在试图理解经纪人网络是如何工作的。我已经阅读了activemq文档,并浏览了互联网上的一些文章,如http://www . jakubkorab . net/2011/11/understanding-ActiveMQ-broker-networks . html 我在本地Windows桌面上设置了以下内容。有2台tomcat服务器分别运行在8080和9080上,
我有两台机器localhost和192.168.1.110来运行两台独立的单机kafka。 kafka2.11-0.10.0.0 bin/kafka-console-producer.sh--broker-list 192.168.1.110:9092--topic test这是一条消息[2016-08-24 18:15:27,441]错误将消息发送到topic test时出错,关键字:null,
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?
问题内容: 我有一个使用SSL传输的activeMQ代理。我大约有10位使用代理的消费者。我正在使用骆驼配置路线。 即使我重新启动使用者,它总是挂断并且不会使用新消息,即使队列中有待处理的消息也是如此。 我开始尝试一次遍历我的消费者,试图找出问题所在,以找出问题的根源。我终于找到了一个消费者,我可以重新解决这个问题。一段时间后它将挂起,但是,如果我进入活动的MQ管理控制台并尝试查看队列中的消息,它
我使用网络连接器配置了两个代理 A 和 B。如果我使用独占使用者(单个使用者)或消息组(JMXgroupID),消息顺序是否保留? 在经纪人文档的网络中,我发现: 代理网络不会保留总消息排序。总排序适用于单个使用者,但网络桥引入第二个使用者。此外,网络桥接使用者通过 producer.send(..) 转发消息,因此它们从转发代理上的队列头转到目标上队列的尾部。如果单个使用者在联网代理之间移动,则