我正在尝试弄清楚Disruptor是否适合我的应用程序:一个相当复杂的负载生成器,用于长时间对高性能数据库进行基准测试。Disruptor模式很有吸引力,因为:
应用程序拓扑的简化版本如下:
C1a
C1b
P1 ----> C1c
C1d \
C1e \
\
\
\
\
\
\
Cx
/
P2 --> C2a --> C2b /
===P1===
由P1
馈送的系统组件有1个生产者馈送许多并行消费者(当前为执行者
)。每个事件应该由这些消费者之一恰好处理一次。排序约束是:如果A在B之前产生,那么A将在B之前开始执行,但是B可能首先完成。
处理完事件后,会将其转发到最后一个阶段,即Cx。将Cx看作一个减速机/记录器:它必须按顺序处理事件,因为它所处理的数据结构不是线程安全的。
问题1:使用中断器处理C1a-C1e的最佳方式是什么?
使用WorkerPool
是我倾向的。
===P2===
P2提供的系统组件是一个简单的工作流链:P2生成事件,C2a对其执行一些计算,
C2b对其执行进一步计算,最后将其转发到Cx。
问题2:是否可能有一个中断器和两个生产者,其中每个生产者提供不同的依赖关系子图?
问题3:如果问题2的答案是否定的,是否有可能在两个不同的颠覆者之间共享一个消费者
Cx
,如果是,如何共享?
问题4:如果问题2的答案都是
提前感谢!
--供参考--
这不是一个解决方案,而是相关的。对于任何感兴趣的人,本博客解释了如何创建半复杂的工作流,例如:
C1a --> C1b
C2a --> C2b
P1 --> C3a --> C3b --> Cx
C4a --> C4b
C5a --> C5b
对于你的第一个问题,我相信WorkerPool
会做到这一点。本质上,当每个工作人员变得空闲时,它会抓取下一个工作项。正如你所说,不一定会按顺序完成。但是,我从来没有愤怒地使用过WorkerPool
,所以我可能错了。
对于您的第二个问题,如果您在概念上用另一个中断器(环形缓冲区)替换了Cx,并将其挂起,这是否满足您的其他要求?也就是说,将C1所做的工作发布给另一个破坏者,并将C2b所做的工作发布给同一个破坏者。
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
我在尝试部署web应用程序时遇到以下错误: 在注入点[BackedAnnotatedField]处具有限定符@Default\n的AuthzInfo类型的依赖项不明确 可能的依赖项:\n-Producer方法[AuthzInfo],其限定符[@Any@Default]声明为[[BackedAnnotatedMethod]@producers@RequestScoped 我只创造了一个这样的制作人:
我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。 有可能得到哪个是给定生产者生产的最后一个偏移吗? 例如: 生产者: 我想找出分别由P1和P2发布的最后一条记录的偏移量。 请注意,我不是在要求全局主题分区偏移量。