Netty是按事件驱动模型来工作的,在涉及Netty的网络通信功能之前,我们先彻底剖析一下它的事件驱动机制,或者说是Netty的并发机制。
netty并发相关类全部位于io.netty.util.concurrent下面,居于核心位置的接口有两个:EventLoopGroup和EventLoop。
由于Netty并发机制相对比较独立,完全可独立于其他功能而被使用,所以这块的分析我们采用自顶向下的方式。Netty的并发机制实际上是java executor框架的一个实现,它比JDK的executor更加强大一些,当然也可以说是满足了Netty事件驱动模型的一些特定需求。
要能读懂这部分源码,需要理解java executor框架的原理。
最顶层的两个接口是EventExecutorGroup和EventExecutor,先看EventExecutorGroup,它扩充了接口java.util.concurrent.ScheduledExecutorService,增加了一些方法。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
//选择EventExecutor
EventExecutor next();
}
关键点:
再看EventExecutor:
public interface EventExecutor extends EventExecutorGroup {
//要求子类固定返回self
EventExecutor next();
//所属的EventExecutorGroup
EventExecutorGroup parent();
//当前是否执行在该线程内;
boolean inEventLoop();
//参数线程,是否和EventExecutor是同一个线程
boolean inEventLoop(Thread thread);
//创建一个Promise
<V> Promise<V> newPromise();
//创建一个ProgressivePromise
<V> ProgressivePromise<V> newProgressivePromise();
//创建一个已经处于成功完成状态的Future
<V> Future<V> newSucceededFuture(V result);
//创建一个已经处于失败状态的Future
<V> Future<V> newFailedFuture(Throwable cause);
}
比较令人困惑的是,它继承自EventExecutorGroup;其实换个角度也好理解:从概念上,EventExecutorGroup是一个多线程Executor,而EventExecutor就是一个单线程的Executor。
这里体现出netty的EventExecutor框架是比较中立的,并不限于只为channel服务,所以设计理念保持了相对独立性和完整性(这是设计子模块的好理念)。
关键点:
最后一点,为同步任务状态提供了极大的灵活性,比如:可以要求某个任务通过指定的Promise来返回结果,这在java的Executor里是做不到的。后面我们会看到该机制大量的运用案例。
这是一个实现EventExecutorGroup接口的抽象类,完成度比较高,Netty使用的EventLoopGroup就是从它继承的,先看看它的构造方法:
//AbstractEventExecutorGroup implements EventExecutorGroup
public abstract class MultithreadEventExecutorGroup extend AbstractEventExecutorGroup {
//Group所包含的EventExecutor数组
private final EventExecutor[] children;
//children的只读外观集合
private final Set<EventExecutor> readonlyChildren;
//已经关闭child EventExecutor
private final AtomicInteger terminatedChildren = new AtomicInteger();
//一个标记Group关闭状态的Futuire
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
//选择child EventExecutor的选择器对象,用来实现EventExecutorGroup.next()方法
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
//group将多线程实现,委托了给了ThreadPerTaskExecutor
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventExecutor数组长度就是指定线程数量
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//创建EventExecutor的newChild是子类实现的,args参数列表,Group并不使用,而是透传给每个child EventExecutor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//一旦这个过程发生失败,就把已经创建的EventLoop全部关闭掉,代码省略了
if (!success) {
...
}
}
}
//初始化EventExecutor选择器,默认的选择逻辑其实很简单,就是轮询
chooser = chooserFactory.newChooser(children);
//监听每个EventExecutor的关闭状态,所有EventExecutor都关闭了,Group也就关闭了
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//这里搞一个只读的EventLoop集合
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
//具体创建什么样的EventExecutor,留给子类实现
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
//默认的Thread Factory,它创建线程,并起了一个有规则的name
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
//chooser选择下一个EventExecutor
@Override
public EventExecutor next() {
return chooser.next();
}
//一个只读的迭代器
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
}
小结:MultithreadEventExecutorGroup类似JDK的FixedThreadPool;而EventExecutorGroup和EventExecutor的具体实现类型是配对的,所以Group初始化时,由具体子类来创建所需的EventExecutor。
此时,我们反过来看看MultithreadEventExecutorGroup基类AbstractEventExecutorGroups实现,会更容易一些:
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
//类似方式实现了EventExecutorGroup定义的其他任务执行方法
...
}
小结:如果向EventExecutorGroup提交一个任务,它调度**下一个(next)**child EventExecutor来执行。
MultithreadEventExecutorGroup与之对应的是SingleThreadEventExecutor,我们先稍微了解一下它的基类AbstractScheduledEventExecutor:
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
//方法省略了
...
}
AbstractScheduledEventExecutor实现了任务调度的功能,它完全就是java ScheduleExecutor的翻版,通过一个优先级队列存放调度任务(ScheduledFutureTask),以任务的下次执行时间作为优先级。
由于任务调度功能并非Netty的关键,这个抽象类就不深入了,直接进入SingleThreadEventExecutor。
NioEventLoop就是基于SingleThreadEventExecutor而实现的,非IO相关的功能全部在SingleThreadEventExecutor里,因此我们要重点介绍。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//Executor的状态常量
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
//一个占位符任务,必要时用来启动Thread
private static final Runnable NOOP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
//一个任务队列
private final Queue<Runnable> taskQueue;
//驱动Executor运行的线程
private volatile Thread thread;
//这个executor充当线程工厂的角色
private final Executor executor;
//interrupted标记
private volatile boolean interrupted;
private final CountDownLatch threadLock = new CountDownLatch(1);
//shutdownHooks,Executor关闭时执行
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
//这是一个标记位:向Executor添加任务能否唤醒阻塞的线程
private final boolean addTaskWakesUp;
//排队等待的任务数量上限
private final int maxPendingTasks;
//当任务无法提交时的处理器
private final RejectedExecutionHandler rejectedExecutionHandler;
//上一次执行任务的时间戳
private long lastExecutionTime;
//该Executor的状态
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;
//gracefulShutdown过程中的静默期
private volatile long gracefulShutdownQuietPeriod;
//gracefulShutdown总的时间限制
private volatile long gracefulShutdownTimeout;
//gracefulShutdown开始时间戳
private long gracefulShutdownStartTime;
//同步该Executor终结状态的Future对象
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
}
如果你对java executor框架足够了解的话,那么上面大部分字段的语义是很容易理解的,本文也不再做过多的解释。
选取了一个功能比较全的构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//这里给executor包装了一层
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
SingleThreadEventExecutor的构造方法就是初始化一些成员字段,没啥讲的。ThreadExecutorMap运用装饰者模式包装了Executor参数,最终的效果是:SingleThreadEventExecutor里面的task,在运行时可通过ThreadExecutorMap.currentExecutor获得所属SingleThreadEventExecutor实例。ThreadExecutorMap的实现细节具体无关大局,不做分析。
接下来分析SingleThreadEventExecutor几个任务执行相关的重要方法,第一个是takeTask,获取下个需要执行的任务:
protected Runnable takeTask() {
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
//这里调度队列scheduledTaskQueue里取一个scheduledTask(调度队列在基类里)
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
//如果调度度列里没有任务,那就从taskQueue即可
Runnable task = null;
try {
//taskQueue.take方法是阻塞的,
//基类的schedule()方法会向taskQueue插入一个WAKEUP_TASK唤醒线程
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
//如果存在scheduledTask,还差delayNanos才触发,那么尝试从taskQueue.poll取任务,最多等待delayNanos
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
return null;
}
}
//如果没有task,此时应该再次判断scheduledTask是否已经触发才对
//但是这里调用了fetchFromScheduledTaskQueue,该方法必有玄机
if (task == null) {
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}
}
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
//从scheduledTaskQueue取一个应该触发的scheduledTask
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
//将scheduledTask加入taskQueue,如果加入失败(空间不足),只好又放回去
if (!taskQueue.offer(scheduledTask)) {
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
takeTask获取当前应该执行的Task,有可能是异步task,也可能是scheduledTask;如果没有任何task需要执行,该方法会阻塞;在内部实现中,到了触发时间的scheduledTask会被放入taskQueue,所以我们看到返回的总是taskQeue内的task。
同时管理两个taskQueue很容易出bug的,上面的编程技巧值得参考。
启动线程的相关方法是execute,startThread,doStartThread,SingleThreadEventExecutor的执行线程在接受到任务时才会启动,所以execute方法内有启动线程的入口。
//提交任务
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
//将任务加入队列
addTask(task);
//如果当前线程不是Executor线程,那么尝试启动它,否则说明Executor线程已经启动了
if (!inEventLoop) {
startThread();
if (isShutdown()) {
//如果已经关闭了,拒绝任务,代码省略
}
}
//这个逻辑与线程启动无关,它的意思是:addTask无法自动唤醒Executor线程,那么调用wakeup方法来唤醒
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
//唤醒线程,将WAKEUP_TASK任务入队列
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
taskQueue.offer(WAKEUP_TASK);
}
}
//精简了代码
private void startThread() {
//将状态切换至ST_SHUTTING_STARTED
doStartThread()
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
//这里忽略了中断处理
...
boolean success = false;
updateLastExecutionTime();
try {
//关键点只有这里,执行子类实现的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//这里有一段贼长的线程退出、异常处理逻辑,总结起来就干了两件事
//1. 执行confirmShutdown尝试优雅关闭
//2. 将状态切换至ST_SHUTTING_DOWN
}
}
});
}
protected abstract void run();
小结:
特别说明:
addTaskWakesUp这个标记特别令人费解,它的值是在构造函数中指定的,换句话说,是由子类来指定的。
它的意思是:如果线程处于阻塞状态,addTask操作能否唤醒它;而不是,addTask操作是否应该唤醒它。它是子类告诉基类的一个事实,而不是子类对基类的配置。
为什么这样呢?因为SingleThreadEventExecutor.run方法是由子类实现的,那么线程到底会如何阻塞,如何唤醒阻塞,只有子类才知道。如果run实现只会阻塞在taskQueue上(addTaskWakesUp=true),此时上面的wakeup方法确实可以唤醒阻塞线程。否则的话,addTaskWakesUp=false,同时子类要重写wakeup方法实现自己的唤醒逻辑。
为什么addTaskWakesUp的逻辑如此令人费解,这是“继承”天然的缺陷,“继承”导致一个本来内聚性很高的逻辑,分散到了两个类里面,让人难以捉摸。github上还有人提了一个issue,抱怨addTaskWakesUp:https://github.com/netty/netty/issues/10023
我们只看shutdownGracefully方法(shutDown方法已经Depreacted了):
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
//已经正在关闭了,返回terminationFuture即可
//说明:重复调用shutdown方法没有副作用
if (isShuttingDown()) {
return terminationFuture();
}
//中间忽略了一段切换state的逻辑:状态切换至ST_SHUTTING_DOWN
...
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
//如果线程尚未开始,启动线程
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}
//唤醒线程:让线程自己关闭自己,当然要唤醒它喽
if (wakeup) {
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
return terminationFuture();
}
public boolean isShuttingDown() {
return state >= ST_SHUTTING_DOWN;
}
shutdownGracefully方法看起来有点虎头蛇尾,这是因关闭逻辑必须在SingleThreadEventExecutor的线程内执行才安全,所以shutdownGracefully只干两件事:切换状态至ST_SHUTTING_DOWN,唤醒线程。
SingleThreadEventExecutor真正的关闭逻辑在confirmShutdown方法内;注意:该方法可被多次调用,返回值标识着——关闭是否已经成功,意味着线程能否退出。
protected boolean confirmShutdown() {
//必须已经进ST_SHUTTING_DOWN状态
if (!isShuttingDown()) {
return false;
}
//当前线程必须是SingleThreadEventExecutor的执行线程
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
//取消所的调度任务
cancelScheduledTasks();
//记录shutDown开始时间
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
//执行任务队列里面的所有任务,有任何任务被执行,runAllTasks返回true;
//或者,执行所有的ShutdownHook,有任何hook被执行,runShutdownHooks返回true
if (runAllTasks() || runShutdownHooks()) {
//如果已经进入ST_SHUT_DOWN状态,关闭成功
if (isShutdown()) {
return true;
}
// 如果 期望静默期==0,说明用户不期望关闭期间有再有任何任务提交,直接退出
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
//否则本次尝试关闭失败,激活线程再尝试
taskQueue.offer(WAKEUP_TASK);
return false;
}
//如果关闭消耗的总时间已经超过gracefulShutdownTimeout,立即成功
final long nanoTime = ScheduledFutureTask.nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
//如果 静默期<期望静默期(gracefulShutdownQuietPeriod,本次关闭失败,还要等等看是否有新的任务进来,为了避免忙等,线程sleep了100毫秒
//lastExecutionTime是最近一次任务,或ShutdownHook的执行时间
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
taskQueue.offer(WAKEUP_TASK);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
return true;
}
所谓优雅关闭(gracefulShutDown),要求EventExecutor将队列内的任务全部执行完(未触发的调度任务不执行),以及注册的ShutDownHook全部执行完,
confirmShutdown的逻辑确实有点绕,它的要点如下:
这一章并不打算涉及Socket相关的逻辑,但是为了讲清楚SingleThreadEventExecutor的工作,我们提前瞜一眼NioEventLoop.run方法骨架。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
//处理IO事件
//调用runAllTask()执行任务
try {
//看看是否需要关闭
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
从上面的代码可知,SingleThreadEventExecutor只是提供了一个管理、运行任务的框架,具体的执行逻辑全在NioEventLoop内。NioEventLoop.run方法交替地处理IO请事件、执行任务队列里的任务,具体的细节后面还会再分析。
Netty的concurrent模块内包含EventExecutorGroup和EventExecutor的默认实现,就是DefaultEventExecutorGroup和DefaultEventExecutor。
其中DefaultEventExecutor.run实现如下:
public final class DefaultEventExecutor extends SingleThreadEventExecutor {
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
}
DefaultEventExecutor完全就是一个任务执行器,我们可以在自己的代码里使用它。
Netty为了保证高并发网络通信的性能,设计了一个EventExector框架,可以认为它是java executor框架一个特别的实现版本。相比JDK内Executor的实现,EventExector框架能够能满足Netty的一些额外需求,个人认为主要有两点:
在设计实现层面,EventExector模块和网络通信并没有紧耦合,它是一个中立的框架,可以被单独使用。