在使用disruptor时,可能会有一个(多个)消费者落后,并且由于这个缓慢的消费者,整个应用程序都会受到影响。
请记住,每个生产者(发布者)和消费者(EventProcessor)都在一个线程上运行,如何解决消费者速度慢的问题?
我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?
使用一组相同的EventHandler。为了避免超过1个eventHandler作用于单个事件,我使用以下方法。
创建一个大小为系统中核数的线程池
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks
然后创建处理程序数组
HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()];
for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){
handlers[i] = new HttpEventHandler(i);
}
disruptor.handleEventsWith(handlers);
在EventHandler中
public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws InterruptedException
{
if( sequence % Runtime.getRuntime().availableProcessors()==id){
System.out.println("-----On event Triggered on thread "+Thread.currentThread().getName()+" on sequence "+sequence+" -----");
//your event handler logic
}
尝试将慢速部分与其他线程分离(I/O,而不是O(1)或O(log)计算等),或在耗电元件过载时施加某种背压(通过让生产商或临时停车,用503或429状态代码重放等):http://mechanical-sympathy.blogspot.com/2012/05/apply-back-pressure-when-overloaded.html
一般来说,使用WorkerPool允许多个池中的工作线程在单个使用者上工作,如果您有独立的任务并且持续时间可能可变(例如:一些短任务,一些长任务),这是很好的。
另一种选择是让多个独立的worker在事件上并行处理,但每个worker只处理模N worker(例如2个线程,一个线程处理奇数,一个线程处理偶数事件id)。如果您有持续时间一致的处理任务,这将非常有效,并允许批处理也非常有效。
另一件需要考虑的事情是,消费者可以进行“批处理”,这在审计中尤其有用。如果您的消费者有10个事件等待,而不是将10个事件单独写入审核日志,那么您可以收集所有10个事件并同时写入它们。根据我的经验,这不仅仅包括运行多个线程的需要。
我是LMAX Disruptor的新手,我正在探索将其用于需要处理巨大流量的多层异步项目。 请求被传递到平台以发送SMS,请求被清理/检查简单错误并被记录。 获取并分析与发送SMS的客户端相关的各种其他信息,并将其他参数添加到SMS信息中。 分析并选择正确的路由。然后从SMS信息中设置特定于平台的协议参数并提交给路由器。 像这样,将有接收和处理短信传递到日志、账单、重试和许多其他的流程。在每个阶段
是否已经有了Python3的实现(或至少绑定到Python3)? 我可以看到Github上有一个原始的Java实现,但我没有看到Python3的任何内容。
对于在x86 Linux上运行的具有多个生产者和单个消费者的lmax中断器(如环形缓冲区)中的慢速消费者,我有一个问题。使用类似于lmax的环形缓冲区模式,您会不断覆盖数据,但如果消费者速度较慢,该怎么办。因此,如果在10个大小的环形缓冲区0-9个环形插槽中,您的消费者位于插槽5上,而您的写入程序现在准备好开始写入插槽15,也就是缓冲区中的插槽5(即:插槽5=15%10),您如何处理这种情况?处理
问题内容: 我运行一个goroutine来增加一个计数器,该计数器可以被命令行输入“ t \ n”打断 在该语句中,如果我选择使用,则计数器变量会飞到前言。对我来说这很正常。 但是,如果我选择使用,则计数器在一秒内最多只能增加60个左右,而不是10,000个。 实际上,无论我输入什么值,在声明中我只能得到大约60Hz的频率。 为什么? 问题答案: 它与的精度有关。查看以下文档: […]等效于New
我正在寻求关于一个奇怪问题的帮助,在这个问题中,队列中的一个慢速使用者会导致同一队列中的所有其他使用者以30秒的间隔开始使用消息。这是所有的消费者,但慢速的消费者不会以最快的速度消费信息,相反,他们在消费前等待一些神奇的30s障碍。 null 更多背景和调查结果 我已经设法在AMQ 5.8.0、5.9.0(最初注意到该问题的地方)和5.9.1上、在新安装和现有的Ops管理的安装上以及在不同的机器上
我有一个环形缓冲区和一个事件处理程序。这用作消息传递服务器的入站缓冲区。 在偶处理程序中,它执行一些AMQP服务调用,有时由于调用没有超时而等待(下划线协议库中调用AMQP服务的错误)。对于2000tps,这使我的4096大小的环形缓冲区在一瞬间就满了。因为由于上述事实,onEvent调用线程似乎没有从该方法返回,并且在onEvent()内无限等待。 目前,不可能为该调用创建超时。 问题是: 当d