当前位置: 首页 > 知识库问答 >
问题:

具有1个发布者和4个并行消费者的中断器示例

夏学名
2023-03-14

在本例中https://stackoverflow.com/a/9980346/93647为什么我的破坏者的例子如此缓慢?(在问题的末尾)有一个发布项目的出版商和一个消费者。

但是在我的例子中,消费者的工作要复杂得多,需要一些时间。所以我想要4个并行处理数据的消费者。

例如,如果生产者生产数字:1,2,3,4,5,6,7,8,9,10,11...

我想让消费者1抓住1,5,9,。。。消费者2捕捉2,6,10,。。。消费者要抓住3,7,11,。。。消费者4要抓住4,8,12。。。(好吧,不完全是这些数字,想法是数据应该并行处理,我不在乎哪个消费者处理哪个特定数字)

请记住,这需要并行完成,因为在实际html" target="_blank">应用程序中,消费者的工作非常昂贵。我希望消费者在不同的线程中执行,以使用多核系统的功能。

当然,我可以创建4个环形缓冲区,并将1个消费者连接到1个环形缓冲区。这样我就可以使用原始示例。但我觉得这是不对的。可能创建1个发布服务器(1个ringbuffer)和4个消费者是正确的,因为这正是我所需要的。

在google群组中添加一个非常相似的问题的链接:https://groups.google.com/forum/#!消息/lmax干扰器/-CLapWuwWLU/GHEP4UkxrAEJ

因此,我们有两种选择:

  • 一环多个消费者(每个消费者在每次添加时都会“醒来”,所有消费者都应该有相同的等待策略)

共有2个答案

冯亮
2023-03-14

从环形缓冲区的规格中,您将看到每个消费者都会尝试处理您的ValueEvent。在您的情况下,您不需要它。

我是这样解决的:

将处理过的字段添加到您的ValueEvent中,当消费者获取事件时,他会在该字段上进行测试,如果已经处理过,他会转到下一个字段。

不是最漂亮的方式,但这就是缓冲区的工作方式。

巩枫
2023-03-14

编辑:我忘了提到代码部分取自常见问题解答。我不知道这种方法是比弗兰克的建议好还是坏。

该项目的文档严重不足,这是一个遗憾,因为它看起来不错
无论如何,请尝试以下剪贴(基于您的第一个链接)-在mono上测试,似乎还可以:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}
 类似资料:
  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。

  • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。

  • 有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的

  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。