前文了解了Disruptor与RingBuffer的基础知识后,接下来了解Disruptor对 ringbuffer的访问控制策略。
假设消费者(Consumer)是一个想从 Ring Buffer 里读取数据的线程,它可以访问 ConsumerBarrier对象——这个 对象由 RingBuffer 创建并且代表消费者与 RingBuffer 进行交互。就像 RingBuffer显然需要一个序号才能找到 下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序 号。例如:消费者处理完了 RingBuffer 里序号8之前(包括 8)的所有数据,那么它期待访问的下一 个序号是 9。
消费者可以调用 ConsumerBarrier 对象的 waitFor() 方法,传递它所需要的下一个序号.
int nextSequence = 9;
final long availableSeq = consumerBarrier.waitFor(nextSequence);
//如果availableSeq为12,则表示节点 9,10,11 和 12 都已写入,消费者可以去处理
ConsumerBarrier 返回 RingBuffer 的最大可访问序号。ConsumerBarrier 有一个 WaitStrategy 方法来决定它如何等待这个序号(即waitFor方法无法返回时如何阻塞)。
WaitStrategy.Option.BLOCKING;
WaitStrategy.Option.YIELDING;
WaitStrategy.Option.BUSY_SPIN;
如上代码所示,当序号9还没有被生产者写入时,消费者会一直原地停留,等待数据被写入RingBuffer。并且,一旦数据写入后消费者会收到通知。消费者可以让 ConsumerBarrier 去拿这些序号节点里的数据了。
拿到了数据后,消费者(Consumer)会更新自己的标识(cursor)。
这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地请求访问,现在消费者(Consumer)现在只需要简单的说“你队列里可以访问序号的数字比我请求这个要大的时候请通知我”,函数返回值会告诉它当前最大可访问数据的序号是多少。因为这些新的节点的确已经写入了数据(RingBuffer 本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问也不用加锁。
另一个好处是——你可以用多个消费者(Consumer)去读同一个 RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在 Disruptor 的协调下实现真正的并发数据处理。
BatchConsumer 代码是一个消费者的例子,通过使用BatchHandler, 就可以用 BatchConsumer 来完成上面提到的复杂工作,对付那些需要成批处理的节点(例如上文中要处理的 9~12的所有节点)而不用单独地去读取每一个节点。
本文的重点是:
- 不要让 Ring 重叠(即覆盖未被消费的数据);
- 如何通知消费者;
- 生产者一端的批处理;
- 以及多个生产者如何协同工作。
Disruptor代码给消费者提供了一些接口和辅助类,同样的RingBuffer还是与消费端一样提供了一个 ProducerBarrier对象,让生产者通过它来写入RingBuffer。
写入的过程涉及到两阶段提交 。生产者需要申请 RingBuffer里的下一个节点。然后,当生产者向节点写完数据,它将会调用 ProducerBarrier的commit方法。
再详细解释写入过程时,先来看看生产者是如何防止写入覆盖掉未被消费者消费的节点上。
ConsumerTrackingProducerBarrier对象拥有所有正在访问RingBuffer的消费者列表(这也是Disruptor与其他队列不同的地方),Disruptor由消费者负责通知它们处理到了哪个序列号,而不是 RingBuffer。所以,如果我们想确定我们没有让 RingBuffer重叠,需要检查所有的消费者们当前处理的位置。
假如有两个消费者C1,C2。C1消费到最大序号12上,C2有点儿落后——可能它在做 I/O 操作之类的——它停在序号3。
现在生产者想要写入 RingBuffer 中序号3占据的节点,因为它是 RingBuffer 当前游标的下一个节点。但是ProducerBarrier明白现在不能写入,因为有一个消费者正在占用它。
现在我们回头来看ProducerBarrier写入节点的过程:
第一步:更新序号
调用ProducerBarrier的nextEntry() 方法,这样会返回给你一个Entry 对象,这个对象就是 RingBuffer 的下一个节点。而如果这个节点被消费者占用(未被消费)则ProducerBarrier 停下来自旋(spins),等待,直到那个消费者离开。
如果到某一时候,C1消费到了序号9,那么此时对生产者来说,序号3将变成可写入状态。它会抢占这个节点上的Entry对象,把下一个序号(上述提到当前最大序号12,则下一个序号为13)更新成Entry的序号,然后把Entry返回给生产者。生产者可以接着往 Entry 里写入数据。
第二步:原子性提交
当生产者结束向Entry写入数据后,它会要求 ProducerBarrier 提交。
ProducerBarrier先等待RingBuffer 的游标追上当前的位置——如当前申请到的新节点是13,这时要保证RingBuffer的游标在12。对于存在多生产者的情况,对单生产者这毫无意义。因为没有其他人正在写入RingBuffer。
然后ProducerBarrier更新RingBuffer的游标到刚才写入的Entry序号(13)。接下来,ProducerBarrier会让消费者知道buffer中有新东西了(根据ConsumerBarrier上的WaitStrategy对象-不同的WaitStrategy实现以不同的方式来实现提醒)
现在C1就可以读Entry13数据,C2 可以读 Entry13以及前面的所有数据。
Disruptor 可以同时在生产者和消费者两端实现批处理。前文提到C2最后达到了序号 9 的数据。
而ProducerBarrier知道 RingBuffer 的大小,也知道最慢的消费者C2位置-9。因此它能够发现当前有哪些节点是可用的,即3,4,5,6,7,8。中间不需要再次检查消费者的位置。
前述场景中,读者可能认为ProducerBarrier拿到的序号直接来自RingBuffer的游标。然而,如果你看过代码的话,你会发现它是通过 ClaimStrategy获取的。我省略这个对象是为了简化描述并且在单个生产 者的情况下它不是很重要。
而如果是在多个生产者的场景下,你还需要其他东西来追踪序号。这个序号是指当前可写入的序号。这和“向RingBuffer的游标加1”不一样。如果你有一个以上的生产者同时在向RingBuffer写入,就有可能出现某些Entry正在被生产者写入但还没有提交的情况。
重新复习一下如何申请写入节点。每个生产者都向 ClaimStrategy申请下一个可用的节点。生产者1拿到序号13,这和上面单个生产者的情况一样。生产者2拿到序号 14,尽管 RingBuffer的当前游标仅仅指向 12。这是因为ClaimSequence不但负责分发序号,而且负责跟踪哪些序号已经被分配。
假设生产者1没有来得及提交数据。生产者2已经准备好提交了,并且向ProducerBarrier发出了请求。就像我们先前描述的一样,ProducerBarrier只有在RingBuffer游标到达准备提交的节点的前一个节点时它才会提交。在当前情况下,游标必须先到达序号13我们才能提交节点14的数据。但是我们不能这样做,因为生产者1没来得及提交。因此ClaimStrategy 就停在那儿自旋(spins),直到RingBuffer游标到达它应该在的位置。
你会发现,尽管生产者在不同的时间完成数据写入,但是RingBuffer的内容顺序总是会遵循nextEntry()的初始调用顺序。也就是说,如果一个生产者在写入RingBuffer的时候暂停了,只有当它解除暂停后,其他等待中的提交才会立即执行。
写入数据可见的先后顺序是由线程所抢占的位置的先后顺序决定的(nextEntry方法),而不是由它的提交先后决定的。但你可以想象这些线程从网络层中获取消息,这是和消息按照时间到达的先后顺序是没什么不同的,而两个线程竞争获取一个不同循序的位置。