我正在寻求关于一个奇怪问题的帮助,在这个问题中,队列中的一个慢速使用者会导致同一队列中的所有其他使用者以30秒的间隔开始使用消息。这是所有的消费者,但慢速的消费者不会以最快的速度消费信息,相反,他们在消费前等待一些神奇的30s障碍。
更多背景和调查结果
ExpireMessagesPeriode
时间只是以这样一种方式唤醒amq,然后它将消息发送给非慢速使用者。虽然严格说来不是解决问题的办法,但进一步的调查已经揭示了这一问题的根本原因。
这是已知的行为,在阿波罗之前不会被修复
更多详细信息
这并不能解释为什么我看到非慢速的消费者每隔30分钟就收到一次信息。事实证明,将消息分页到内存中有两种模式,正常和强制。Normal遵循上面概述的过程,其中集合的大小与MaxPageSize
属性相比较,但是,当强制执行时,消息总是被分页到内存中。此模式允许您浏览不在内存中的消息。实际上,这个强制模式也被expiry机制用于允许AMQ使不在内存中的消息过期。
因此,我们现在所拥有的是内存中的消息集合,这些消息都是针对发送给同一个使用者的,而这个使用者不会接受它们,因为它很慢或阻塞。我们也有一个积压的消息等待交付给所有的消费者。任务每运行ExpireMessagesPeriode
毫秒,就会强制将消息分页到内存中,以检查它们是否过期。这会将这些消息添加到pages in collection中,该集合现在包含针对慢速使用者的MaxPageSize
消息和针对任何使用者的N
更多消息。那些信息会被传递出去。
QED.
参考文献
出于测试原因,我指定了low memoryusage limit low(35MB)以使问题apear更快,但实际情况是,当activemq的问题出现时,我最终需要它来删除旧消息。 我发现了一个不令人满意的解决方案,即在ActiveMQConnectionFactory中设置useasyncsEnd=true,并指定sendtimeout。这使得producer不会被阻塞,但通过这种方式,最新的消
这里的一些配置:非持久消费者、非持久消息、禁用的流控制、默认预取大小、优化确认=true、异步发送=true、使用jms连接ActiveMQ 例如 一个生产者、一个消费者, 生产者发送速率可以达到6k/s 但是,在这种情况下:一个生产者三个消费者, 生产者发送速率下降到4k/s 这是我的一些关键代码: 发件人类别: sendmain方法: 接收机类代码: 接收器类代码在这里隐藏了一些方法,例如创建
我有一个使用SSL传输的activeMQ代理。我有大约10个消费者正在使用经纪人。我正在使用camel来配置我的路线。 谢谢
在使用disruptor时,可能会有一个(多个)消费者落后,并且由于这个缓慢的消费者,整个应用程序都会受到影响。 请记住,每个生产者(发布者)和消费者(EventProcessor)都在一个线程上运行,如何解决消费者速度慢的问题? 我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?
我正在用Netty v4写一个TCP服务器。服务器将处理来自客户端的多个连接,并将数据流发送给它们。 我希望能够检测客户端何时以较慢的比率使用数据。我基本上想避免TCP缓冲区变满,只是因为客户端很慢! 这基本上就是ZeroMQ所做的(称为“慢用户检测(自杀蜗牛模式)”)。如何使用Netty做到这一点? 我当前的代码是(我将只显示服务器设置): 这就是< code>SO_BACKLOG选项的作用吗?
下面是代码,我面临的问题是recordRead变量告诉线程应该从哪里开始读取记录的起点。但是我如何为每个线程设置不同的值?例如,对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordsToRead应该是300+300=600,对于最后一个线程,它应该是600以及更高的结束。pagesize=50pagesize、recordRead和recordT