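The previous sections covered a lot of theory and principles. Now let's look at real code for several practical uses of Disruptor (the Maven dependency used here is version 3.4.0).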
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.0</version>
    <scope>test</scope>
</dependency>
Event definition:
import java.io.Serializable;
import lombok.extern.slf4j.Slf4j;

/**
 * Event: the data stored in the RingBuffer.
 * @author wangxi
 * @date 2019-10-16 16:31
 */
@Slf4j
public class DisruptorEvent implements Serializable {
    Integer param;

    public void setParam(Integer i) {
        this.param = i;
    }

    // Logs the payload together with the current time and the consuming thread's id
    public void say() {
        log.info("i===>{},nowTime===>{}, nowThreadId===>{}", param, System.currentTimeMillis() + "", Thread.currentThread().getId() + "");
    }
}
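Event factory:
import com.lmax.disruptor.EventFactory;

/**
 * Creates the DisruptorEvent instances that fill the RingBuffer.
 * @author wangxi
 * @date 2019-10-16 16:42
 */
public class DisruptorEventFactory implements EventFactory<DisruptorEvent> {
    @Override
    public DisruptorEvent newInstance() {
        return new DisruptorEvent();
    }
}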
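The factory matters because the ring buffer calls it once per slot when it is built and then reuses those instances, which is why publishing later only sets fields on an existing event. A minimal stdlib sketch of that idea follows; `PreallocatedSlots` is a hypothetical name used for illustration and is not Disruptor's actual RingBuffer.

```java
import java.util.function.Supplier;

// Hypothetical illustration of slot pre-allocation; not Disruptor's RingBuffer.
final class PreallocatedSlots<T> {
    private final Object[] slots;

    PreallocatedSlots(Supplier<T> factory, int size) {
        slots = new Object[size];
        // The factory runs once per slot, up front; nothing is allocated afterwards.
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
    }

    // Returns the reusable instance backing a (non-negative) sequence number.
    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence % slots.length)];
    }
}
```

With four slots, sequences 0 and 4 map to the same object, so a producer overwrites the old payload instead of allocating a new event.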
Consumer handler definition (implemented as a WorkHandler):
import com.lmax.disruptor.WorkHandler;

/**
 * Consumer
 * @author wangxi
 * @date 2019-10-16 16:55
 */
public class DisruptorEventHandler implements WorkHandler<DisruptorEvent> {
    @Override
    public void onEvent(DisruptorEvent disruptorEvent) throws Exception {
        disruptorEvent.say();
    }
}
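Because the consumer implements WorkHandler rather than EventHandler, a pool of these handlers shares the work: each event is processed by exactly one handler in the pool, whereas EventHandlers registered side by side would each see every event. The sketch below models that "claim once" behaviour with a shared counter. `WorkerPoolSketch` and its `run` method are hypothetical names, and this is a simplified stdlib model, not Disruptor's WorkProcessor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a worker pool: workers compete for sequences via one shared counter.
final class WorkerPoolSketch {
    // Starts `workers` threads and returns how many times each sequence was handled.
    static Map<Long, Integer> run(int workers, long eventCount) {
        AtomicLong nextToClaim = new AtomicLong(0);
        Map<Long, Integer> handled = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            threads[w] = new Thread(() -> {
                long seq;
                // getAndIncrement hands every sequence to exactly one caller.
                while ((seq = nextToClaim.getAndIncrement()) < eventCount) {
                    handled.merge(seq, 1, Integer::sum);
                }
            });
            threads[w].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handled;
    }
}
```

Calling `WorkerPoolSketch.run(4, 100)` yields a map with 100 entries, each with a count of 1, regardless of how the threads interleave.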
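Publisher (producer) and Disruptor startup:
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class DisruptorPublisher {
    private static final AtomicLong INDEX = new AtomicLong(1);
    private Disruptor<DisruptorEvent> disruptor;

    // Builds and starts the Disruptor with threadSize consumers
    public void start(final int bufferSize, final int threadSize) {
        ThreadFactory threadFactory = runnable ->
                new Thread(new ThreadGroup("disruptor"), runnable,
                        "disruptor-thread-" + INDEX.getAndIncrement());
        // ProducerType.MULTI: multiple producer threads may publish concurrently
        disruptor = new Disruptor<>(new DisruptorEventFactory(), bufferSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
        DisruptorEventHandler[] consumers = new DisruptorEventHandler[threadSize];
        for (int i = 0; i < threadSize; i++) {
            consumers[i] = new DisruptorEventHandler();
        }
        // Each event is handled by exactly one handler in the pool
        disruptor.handleEventsWithWorkerPool(consumers);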
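        /*
         * The call above covers a single pool of DisruptorEventHandler. For the diamond
         * structure described earlier, suppose there are three pools,
         * DisruptorEventHandler[] a, b and c, where c depends on both a and b.
         * Calling handleEventsWithWorkerPool on a returned group makes the new pool wait
         * for that group, so the two parallel pools are registered on the disruptor
         * itself and joined with and():
         *   disruptor.handleEventsWithWorkerPool(a)
         *           .and(disruptor.handleEventsWithWorkerPool(b))
         *           .thenHandleEventsWithWorkerPool(c);
         * If c depends on b and b depends on a (a simple chain):
         *   disruptor.handleEventsWithWorkerPool(a)
         *           .thenHandleEventsWithWorkerPool(b)
         *           .thenHandleEventsWithWorkerPool(c);
         */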
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
    }

    public static void main(String[] args) {
        DisruptorPublisher disruptorPublisher = new DisruptorPublisher();
        // bufferSize must be a power of two (2^5 = 32); 10 consumers in the pool
        disruptorPublisher.start(2 * 2 * 2 * 2 * 2, 10);
        RingBuffer<DisruptorEvent> ringBuffer = disruptorPublisher.disruptor.getRingBuffer();
        for (int i = 0; i < 100; i++) {
            ringBuffer.publishEvent(new EventTranslatorOneArg<DisruptorEvent, Integer>() {
                /**
                 * This callback runs on the publishing thread (here, the main thread).
                 * @param o  the pre-allocated event created by DisruptorEventFactory
                 * @param l  the sequence number of this event in the ring buffer
                 * @param o2 the second argument passed to publishEvent
                 */
                @Override
                public void translateTo(DisruptorEvent o, long l, Integer o2) {
                    // Copy the published value into the event
                    o.setParam(o2);
                }
            }, i);
        }
    }
}
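The `main` method passes 2*2*2*2*2 (32) as the buffer size because Disruptor requires the ring buffer size to be a power of two, which lets a sequence number be mapped to a slot with a bit mask instead of a modulo. A small sketch of that arithmetic, with `RingIndex` and its methods as hypothetical helper names that are not part of the Disruptor API:

```java
// Hypothetical helper illustrating power-of-two sizing; not part of the Disruptor API.
final class RingIndex {
    // True when exactly one bit is set, i.e. n is 1, 2, 4, 8, ...
    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    // Maps a non-negative sequence number to a slot index using a mask.
    static int slotFor(long sequence, int bufferSize) {
        if (!isPowerOfTwo(bufferSize)) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        return (int) (sequence & (bufferSize - 1));
    }
}
```

With a buffer of 32 slots, sequence 31 lands in slot 31 and sequence 33 wraps around to slot 1.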
The diamond dependency structure is implemented as follows:
DisruptorEventHandlerOne[] consumerOne = new DisruptorEventHandlerOne[threadSize];
DisruptorEventHandlerTwo[] consumerTwo = new DisruptorEventHandlerTwo[threadSize];
DisruptorEventHandlerThree[] consumerThree = new DisruptorEventHandlerThree[threadSize];
for (int i = 0; i < threadSize; i++) {
    consumerOne[i] = new DisruptorEventHandlerOne();
    consumerTwo[i] = new DisruptorEventHandlerTwo();
    consumerThree[i] = new DisruptorEventHandlerThree();
}
// consumerOne and consumerTwo both read directly from the ring buffer;
// consumerThree only sees an event after both pools have processed it.
disruptor.handleEventsWithWorkerPool(consumerOne)
        .and(disruptor.handleEventsWithWorkerPool(consumerTwo))
        .thenHandleEventsWithWorkerPool(consumerThree);
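In a diamond, the last stage may only process an event once every upstream stage has finished with it, so its limit is the smallest sequence reached by the stages it depends on. The sketch below shows just that rule. `DiamondGate` is a hypothetical name, and the real library tracks progress through Sequence and SequenceBarrier objects rather than plain long values.

```java
// Hypothetical illustration of dependency gating; not Disruptor's SequenceBarrier.
final class DiamondGate {
    // Highest sequence a downstream stage may process, given the latest sequence
    // each upstream stage has completed. Returns Long.MAX_VALUE when there are
    // no upstream stages to wait for.
    static long availableFor(long... upstreamSequences) {
        long min = Long.MAX_VALUE;
        for (long s : upstreamSequences) {
            min = Math.min(min, s);
        }
        return min;
    }
}
```

If the first pool has completed sequence 7 and the second only sequence 3, the third pool may process up to sequence 3 and must wait for the rest.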