在本例中https://stackoverflow.com/a/9980346/93647为什么我的破坏者的例子如此缓慢?(在问题的末尾)有一个发布项目的出版商和一个消费者。
但是在我的例子中,消费者的工作要复杂得多,需要一些时间。所以我想要4个并行处理数据的消费者。
例如,如果生产者生产数字:1,2,3,4,5,6,7,8,9,10,11...
我想让消费者1抓住1,5,9,。。。消费者2捕捉2,6,10,。。。消费者要抓住3,7,11,。。。消费者4要抓住4,8,12。。。(好吧,不完全是这些数字,想法是数据应该并行处理,我不在乎哪个消费者处理哪个特定数字)
请记住,这需要并行完成,因为在实际html" target="_blank">应用程序中,消费者的工作非常昂贵。我希望消费者在不同的线程中执行,以使用多核系统的功能。
当然,我可以创建4个环形缓冲区,并将1个消费者连接到1个环形缓冲区。这样我就可以使用原始示例。但我觉得这是不对的。可能创建1个发布服务器(1个ringbuffer)和4个消费者是正确的,因为这正是我所需要的。
在google群组中添加一个非常相似的问题的链接:https://groups.google.com/forum/#!消息/lmax干扰器/-CLapWuwWLU/GHEP4UkxrAEJ
因此,我们有两种选择:
从环形缓冲区的规格中,您将看到每个消费者都会尝试处理您的ValueEvent
。在您的情况下,您不需要它。
我是这样解决的:
将处理过的字段添加到您的ValueEvent
中,当消费者获取事件时,他会在该字段上进行测试,如果已经处理过,他会转到下一个字段。
不是最漂亮的方式,但这就是缓冲区的工作方式。
编辑:我忘了提到代码部分取自常见问题解答。我不知道这种方法是比弗兰克的建议好还是坏。
该项目的文档严重不足,这是一个遗憾,因为它看起来不错
无论如何,请尝试以下剪贴(基于您的第一个链接)-在mono上测试,似乎还可以:
using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
namespace DisruptorTest
{
public sealed class ValueEntry
{
public long Value { get; set; }
}
public class MyHandler : IEventHandler<ValueEntry>
{
private static int _consumers = 0;
private readonly int _ordinal;
public MyHandler()
{
this._ordinal = _consumers++;
}
public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
{
if ((sequence % _consumers) == _ordinal)
Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
else
Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);
}
}
class Program
{
private static readonly Random _random = new Random();
private const int SIZE = 16; // Must be multiple of 2
private const int WORKERS = 4;
static void Main()
{
var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
for (int i=0; i < WORKERS; i++)
disruptor.HandleEventsWith(new MyHandler());
var ringBuffer = disruptor.Start();
while (true)
{
long sequenceNo = ringBuffer.Next();
ringBuffer[sequenceNo].Value = _random.Next();;
ringBuffer.Publish(sequenceNo);
Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
Console.ReadKey();
}
}
}
}
问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在
我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。
我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。
有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的
我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。