当前位置: 首页 > 知识库问答 >
问题:

限制将持久消息批量发送到队列的 ActiveMQ 创建者的正确方法是什么?

邬弘化
2023-03-14

我有一个生产者,它利用JMS事务将持久消息批量发送到队列。

我已经进行了测试,发现当使用批处理大小为1时,会应用Producer Flow Control。我可以看到我的生产者正在按照我为队列配置的内存限制进行限制。这是我的Producer Flow Control配置:

<policyEntry queue="foo" optimizedDispatch="true"
     producerFlowControl="true" memoryLimit="1mb">
</policyEntry>

队列中待处理消息的数量是可控的,我认为这是生产者流控制运行的证据。

但是,当批处理大小增加到2时,我发现此内存限制不受尊重,并且生产者根本没有受到限制。证据是队列中挂起的消息数继续增加,直到达到配置的 storeUsage 限制。

我知道这可能是因为当批处理大小大于1时,消息以异步方式发送,即使我没有显式地将useAsyncSend设置为true。

ActiveMQ的生产者流控制文档提到,为了抑制异步发布者,我们需要在生产者中配置生产者窗口大小,一旦达到窗口限制,这将强制生产者等待确认。

但是,当我在生产者中配置生产者窗口大小并尝试批量发送消息时,会引发异常并且没有发送消息。

这让我思考并提出了一个问题,“在批量发送持久消息时,是否可以配置生产者窗口大小?”。

如果不是,那么什么是正确的方法来限制批量发送持久消息的生产者?

共有2个答案

令狐高洁
2023-03-14

我在v5.8.0中发现了这个问题,但发现这个问题在v5.9.0及更高版本中得到了解决。从 v5.9.0 开始,我发现 PFC 开箱即用,即使对于异步发送消息的生产者也是如此。自批量发送(其中批量大小

我的制作者在没有任何额外配置的情况下被限制了。队列中的挂起邮件数没有增加,并且处于控制之下。

通过一个简单的Camel消费者消费消息(并将它们附加到文件中),我发现在v5.8.0中(我遇到了这个问题),我可以在36秒内发送有效负载2k的100k消息。但是它们中的大多数最终都是待处理的消息。

但在v5.9.0中,发送同一组消息以证明PFC所起的作用需要176秒。在我的例子中,挂起的消息数量从未超过1000。

我还使用v5.10.0和v5.12.0(撰写本文时的最新版本)进行了测试,它们按预期工作。

所以如果你面临这个问题,很可能你正在运行ActiveMQ v5.8.0或更早版本。简单地升级到最新版本应该可以解决这个问题。

我感谢非常有帮助的ActiveMQ邮件列表人员的建议和帮助。

谢谢你的回答。对不起,我没有在我的问题中提到我使用的版本,否则我相信您可以立即发现问题。

元修然
2023-03-14

没有真正的方法来限制“每秒最大消息数”或类似的东西。您要做的是启用生产者流控制和vm游标,然后将该队列(如果您愿意,也可以是所有队列)的内存限制设置为某个合理的级别。

您可以在配置中决定当达到队列内存限制时,生成器是应该挂起还是抛出异常。

<policyEntry queue="MY.BATCH.QUEUE" memoryLimit="100mb" producerFlowControl="true">
  <pendingQueuePolicy>
    <vmQueueCursor/>
  </pendingQueuePolicy>
</policyEntry>
 类似资料:
  • 我是activeMQ的新手,在将消息从驻留在另一台服务器上的消息生成器推送到activeMQ定义的队列时遇到问题。 我在activeMQ上使用camel routes创建的应用程序中有几个队列。我尝试从另一台服务器上的应用程序对这些队列执行远程JNDI查找。我使用了来自http://activemq.apache.org/jndi-support.html页面的activemq文档片段。 我可以连

  • 我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个

  • 问题内容: 我试图将字符串消息发送到在weblogic服务器中创建的JMS队列中。我使用Eclipse IDE,当我运行Web应用程序时,出现以下错误,tomcat服务器关闭。错误是 请帮助我。谢谢和最诚挚的问候 问题答案: 基于对该问题的一些快速研究,它似乎是由于在应用服务器和客户端之间使用不同的JDK级别引起的。我看到的大多数示例都表明,在Java 5上运行Weblogic时在客户端上使用Ja

  • “ActiveMQ中Blob消息传递的持久性”? "我们不能使用数据库(KahaDB)来Blob消息URL吗?" “我们可以像在远程activemq服务器中一样在嵌入式代理中创建文件服务器吗?”

  • 因为正如我在Active MQ Artemis文档中看到的,持久值是一个布尔值,但在amqpnetlite库中它是一个uint,我的理解是,超过0的所有内容都应该是true,而0应该是false。 起初,这种行为非常奇怪:即使当Aretemis Web界面显示为持久队列时,一旦没有用户连接,它也会被删除。 我发现:ActiveMQ Artemis queue在关闭消费客户机后被删除,这描述了即使是

  • 我正在尝试测试ActiveMQ的队列持久性。 我有一个具有唯一消费者的嵌入式ActiveMQ服务器。这个嵌入式服务器接收来自许多其他JVM应用程序的JMS消息。 它工作正常,消费者应用程序接收通知。 所以我试着测试消息的持久性。我在消费者的MessageListener上设置了一个(远程)断点,这样我就可以让许多消息排队,并使ActiveMQ服务器崩溃。在服务器重启时,我希望所有排队的消息都能被使