当前位置: 首页 > 知识库问答 >
问题:

Disruptor有2个生产者,其中每个生产者提供不同的依赖子图?

齐高阳
2023-03-14

我正在尝试弄清楚Disruptor是否适合我的应用程序:一个相当复杂的负载生成器,用于长时间对高性能数据库进行基准测试。Disruptor模式很有吸引力,因为:

  • 高吞吐量
  • 低延迟
  • 无垃圾/可预测的长时间运行行为

应用程序拓扑的简化版本如下:

          C1a
          C1b  
 P1 ----> C1c 
          C1d \
          C1e  \
                \
                 \   
                  \    
                   \ 
                    \ 
                     \
                      Cx
                     /
 P2 --> C2a --> C2b /

===P1===

P1馈送的系统组件有1个生产者馈送许多并行消费者(当前为执行者)。每个事件应该由这些消费者之一恰好处理一次。排序约束是:如果A在B之前产生,那么A将在B之前开始执行,但是B可能首先完成。

处理完事件后,会将其转发到最后一个阶段,即Cx。将Cx看作一个减速机/记录器:它必须按顺序处理事件,因为它所处理的数据结构不是线程安全的。

问题1:使用中断器处理C1a-C1e的最佳方式是什么?

使用WorkerPool是我倾向的。

===P2===

P2提供的系统组件是一个简单的工作流链:P2生成事件,C2a对其执行一些计算,C2b对其执行进一步计算,最后将其转发到Cx。

问题2:是否可能有一个中断器和两个生产者,其中每个生产者提供不同的依赖关系子图?

问题3:如果问题2的答案是否定的,是否有可能在两个不同的颠覆者之间共享一个消费者Cx,如果是,如何共享?

问题4:如果问题2的答案都是

提前感谢!

--供参考--

这不是一个解决方案,而是相关的。对于任何感兴趣的人,本博客解释了如何创建半复杂的工作流,例如:

        C1a --> C1b
        C2a --> C2b  
 P1 --> C3a --> C3b --> Cx
        C4a --> C4b
        C5a --> C5b

共有1个答案

贝洲
2023-03-14

对于你的第一个问题,我相信WorkerPool会做到这一点。本质上,当每个工作人员变得空闲时,它会抓取下一个工作项。正如你所说,不一定会按顺序完成。但是,我从来没有愤怒地使用过WorkerPool,所以我可能错了。

对于您的第二个问题,如果您在概念上用另一个中断器(环形缓冲区)替换了Cx,并将其挂起,这是否满足您的其他要求?也就是说,将C1所做的工作发布给另一个破坏者,并将C2b所做的工作发布给同一个破坏者。

 类似资料:
  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 我在尝试部署web应用程序时遇到以下错误: 在注入点[BackedAnnotatedField]处具有限定符@Default\n的AuthzInfo类型的依赖项不明确 可能的依赖项:\n-Producer方法[AuthzInfo],其限定符[@Any@Default]声明为[[BackedAnnotatedMethod]@producers@RequestScoped 我只创造了一个这样的制作人:

  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。 有可能得到哪个是给定生产者生产的最后一个偏移吗? 例如: 生产者: 我想找出分别由P1和P2发布的最后一条记录的偏移量。 请注意,我不是在要求全局主题分区偏移量。