当前位置: 首页 > 知识库问答 >
问题:

一个慢速的ActiveMQ使用者导致其他使用者慢速

哈宪
2023-03-14

我正在寻求关于一个奇怪问题的帮助,在这个问题中,队列中的一个慢速使用者会导致同一队列中的所有其他使用者以30秒的间隔开始使用消息。这是所有的消费者,但慢速的消费者不会以最快的速度消费信息,相反,他们在消费前等待一些神奇的30s障碍。

    null

更多背景和调查结果

  • 我已经设法在AMQ 5.8.0、5.9.0(最初注意到该问题的地方)和5.9.1上、在新安装和现有的Ops管理的安装上以及在不同的机器上(有的vm有的不是vm)上可靠地复制了该问题。所有linux安装,不同的OSs和java版本。
  • 它似乎不受任何与预取相关的内容的影响,即:将预取值从1更改为10到1000,并不能阻止问题的发生
  • [红鲱鱼?]在amq实例上启用调试日志将显示与定期检查可能过期的消息相关的日志。队列没有过期策略,因此我只能认为排定的ExpireMessagesPeriode时间只是以这样一种方式唤醒amq,然后它将消息发送给非慢速使用者。
  • 如果进入30s模式,然后离开,然后再次进入,则每分钟过去的秒时间总是相同的,例如每分钟过去的14秒和每分钟过去的44秒。这在所有使用者和承载这些使用者的所有机器上都是正确的。AMQ重新启动后,这些屏障点会发生变化。

共有1个答案

凌嘉志
2023-03-14

虽然严格说来不是解决问题的办法,但进一步的调查已经揭示了这一问题的根本原因。

这是已知的行为,在阿波罗之前不会被修复

更多详细信息

这并不能解释为什么我看到非慢速的消费者每隔30分钟就收到一次信息。事实证明,将消息分页到内存中有两种模式,正常和强制。Normal遵循上面概述的过程,其中集合的大小与MaxPageSize属性相比较,但是,当强制执行时,消息总是被分页到内存中。此模式允许您浏览不在内存中的消息。实际上,这个强制模式也被expiry机制用于允许AMQ使不在内存中的消息过期。

因此,我们现在所拥有的是内存中的消息集合,这些消息都是针对发送给同一个使用者的,而这个使用者不会接受它们,因为它很慢或阻塞。我们也有一个积压的消息等待交付给所有的消费者。任务每运行ExpireMessagesPeriode毫秒,就会强制将消息分页到内存中,以检查它们是否过期。这会将这些消息添加到pages in collection中,该集合现在包含针对慢速使用者的MaxPageSize消息和针对任何使用者的N更多消息。那些信息会被传递出去。

QED.

参考文献

  • 引用此问题但改为消息选择器的票证
  • 与配置属性相关的文档
  • 其他有此问题但用于选择器的人
 类似资料:
  • 出于测试原因,我指定了low memoryusage limit low(35MB)以使问题apear更快,但实际情况是,当activemq的问题出现时,我最终需要它来删除旧消息。 我发现了一个不令人满意的解决方案,即在ActiveMQConnectionFactory中设置useasyncsEnd=true,并指定sendtimeout。这使得producer不会被阻塞,但通过这种方式,最新的消

  • 这里的一些配置:非持久消费者、非持久消息、禁用的流控制、默认预取大小、优化确认=true、异步发送=true、使用jms连接ActiveMQ 例如 一个生产者、一个消费者, 生产者发送速率可以达到6k/s 但是,在这种情况下:一个生产者三个消费者, 生产者发送速率下降到4k/s 这是我的一些关键代码: 发件人类别: sendmain方法: 接收机类代码: 接收器类代码在这里隐藏了一些方法,例如创建

  • 我有一个使用SSL传输的activeMQ代理。我有大约10个消费者正在使用经纪人。我正在使用camel来配置我的路线。 谢谢

  • 在使用disruptor时,可能会有一个(多个)消费者落后,并且由于这个缓慢的消费者,整个应用程序都会受到影响。 请记住,每个生产者(发布者)和消费者(EventProcessor)都在一个线程上运行,如何解决消费者速度慢的问题? 我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?

  • 我正在用Netty v4写一个TCP服务器。服务器将处理来自客户端的多个连接,并将数据流发送给它们。 我希望能够检测客户端何时以较慢的比率使用数据。我基本上想避免TCP缓冲区变满,只是因为客户端很慢! 这基本上就是ZeroMQ所做的(称为“慢用户检测(自杀蜗牛模式)”)。如何使用Netty做到这一点? 我当前的代码是(我将只显示服务器设置): 这就是< code>SO_BACKLOG选项的作用吗?

  • 下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。但是我如何为每个线程设置不同的值?例如,对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordsToRead应该是300+300=600,对于最后一个线程,它应该是600以及更高的结束。pagesize=50pagesize、recordRead和recordT