当前位置: 首页 > 知识库问答 >
问题:

Disruptor-消费者是多线程的吗?

湛博易
2023-03-14

关于破坏者,我有以下问题:

  1. 消费者(事件处理器)没有实现他们实现EventHandler的任何可调用或可运行接口,那么他们如何能够并行运行,因此,例如,我有一个disruptor实现,其中有这样一个菱形模式
     c1
P1 - c2 - c4 - c5
     c3

其中c1到c3可以在p1之后并联工作,C4和C5在p1之后工作。

所以通常我会有这样的东西(P1和C1-C5是可运行/可调用的)

p1.start();
p1.join();

c1.start();
c2.start();
c3.start();
c1.join();
c2.join();
c3.join();

c4.start();
c4.join();
c5.start();
c5.join();

但是在Disruptor的情况下,我的事件处理程序都没有实现Runnable或Callable,那么中断器框架最终是如何并行运行它们的呢?

采取以下sceanrio:

我的消费者C2需要调用webservice来对事件进行一些注释,在SEDA中,我可以为这10个C2请求启动10个线程[用于将消息从队列中拉出,进行Webservice调用并更新下一个SEDA队列],这将确保我不会依次等待10个请求中的每个请求的web服务响应,在这种情况下,作为单个实例的eventprocessor C2(if)将依次等待10个C2请求。

共有2个答案

方寒
2023-03-14

Disruptor的默认策略是多线程的,所以如果您的每个处理器都在不同的处理器(使用者)中工作,那么它应该是好的,并且您的处理器是多线程的。

姜建德
2023-03-14

EventHandler被组合到BatchEventProcessor的实例中,该实例是可运行的。

  • BatchEventProcessor实现run()循环

使用DSL时,Disruptor负责通过Executor实例创建这些线程。

  • 您在Disruptor构造函数中提供了一个执行器

关于您的特定场景(即:长时间运行的事件处理程序),您可以参考以下问题:

  • 解决LMAX中断器模式中的慢速使用者(eventProcessor)问题
 类似资料:
  • 我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线

  • 我正在使用spark结构化流媒体、合流开源Kafka集群开发spark流媒体应用程序,并在AWS EMR中运行spark job。我们至少有20个Kafka主题,以AVRO格式将数据生成单个Kafka主题,每个主题在3到4个分区之间进行了分区。我正在使用Spark阅读所有20个主题(逗号分隔的主题值)。然后从生成的数据帧中过滤每个消息行,使用正确的Avro模式应用每个消息,并将生成的写入S3和Ca

  • 我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。

  • 我编写了一个Kafka消费者(使用Spring Kafka),它从单个主题读取并且是消费者组的一部分。一旦消息被消费,它将执行所有下游操作并继续下一个消息偏移量。我已将其打包为WAR文件,我的部署管道将其推送到单个实例。使用我的部署管道,我可能可以将此工件部署到我的部署池中的多个实例。 但是,当我希望多个消费者作为我的基础设施的一部分时,我无法理解以下内容- > 实际上,我可以在部署池中定义多个实

  • 现在,让我们考虑另一个场景(我没有尝试过,但我很好奇),在这个场景中,我启动了两个使用者进程和,这两个进程都具有相同的组,并且它们都是一个单线程进程。现在我的问题是: > 在这种情况下,两个独立的使用者进程(在同一个组下)将如何与分区相关?与上面的单进程多线程场景有何不同? 一般来说,使用者线程或进程如何与主题中的分区映射/相关? 关于将消费者实现为进程与线程,我在这里遗漏了什么微妙的事情吗?提前

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?