当前位置: 首页 > 工具软件 > Executor > 使用案例 >

Netty详解之四:EventExecutor框架

姬熙云
2023-12-01

Netty是按事件驱动模型来工作的,在涉及Netty的网络通信功能之前,我们先彻底剖析一下它的事件驱动机制,或者说是Netty的并发机制。
netty并发相关类全部位于io.netty.util.concurrent下面,居于核心位置的接口有两个:EventLoopGroup和EventLoop。

由于Netty并发机制相对比较独立,完全可独立于其他功能而被使用,所以这块的分析我们采用自顶向下的方式。Netty的并发机制实际上是java executor框架的一个实现,它比JDK的executor更加强大一些,当然也可以说是满足了Netty事件驱动模型的一些特定需求。

要能读懂这部分源码,需要理解java executor框架的原理。

EventExecutorGroup和EventExecutor

最顶层的两个接口是EventExecutorGroup和EventExecutor,先看EventExecutorGroup,它扩充了接口java.util.concurrent.ScheduledExecutorService,增加了一些方法。

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    boolean isShuttingDown();
    Future<?> shutdownGracefully();
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    Future<?> terminationFuture();
    
	 //选择EventExecutor
    EventExecutor next();
}

关键点:

  • EventExecutorGroup继承自java的ScheduledExecutorService;
  • 从shutdown相关方法可以看出,它对线程组关闭过程有更精确的控制,客户代码可异步监听关闭事件;
  • 该接口还扩充了Iterable,也就内部线程的管理对客户代码不再是完全封闭的(这点对Netty很关键,因为需要将Channe绑定到指定的EventExecutor)。

再看EventExecutor:

public interface EventExecutor extends EventExecutorGroup {

    //要求子类固定返回self
    EventExecutor next();

    //所属的EventExecutorGroup
    EventExecutorGroup parent();

    //当前是否执行在该线程内;
    boolean inEventLoop();

	 //参数线程,是否和EventExecutor是同一个线程
    boolean inEventLoop(Thread thread);

	//创建一个Promise
    <V> Promise<V> newPromise();

	 //创建一个ProgressivePromise
    <V> ProgressivePromise<V> newProgressivePromise();

  	 //创建一个已经处于成功完成状态的Future
    <V> Future<V> newSucceededFuture(V result);

  	 //创建一个已经处于失败状态的Future
    <V> Future<V> newFailedFuture(Throwable cause);
}

比较令人困惑的是,它继承自EventExecutorGroup;其实换个角度也好理解:从概念上,EventExecutorGroup是一个多线程Executor,而EventExecutor就是一个单线程的Executor。

这里体现出netty的EventExecutor框架是比较中立的,并不限于只为channel服务,所以设计理念保持了相对独立性和完整性(这是设计子模块的好理念)。

关键点:

  • inEventLoop()方法判断当前线程是否在EventExecutor中,netty通过它可以限制某些IO操作必须在eventLoop执行中;
  • Promise,ProgressivePromise都是Future的子类,用来进行任务执行结果异步通知;
  • 外部代码可调用EventExecutor创建Promise;

最后一点,为同步任务状态提供了极大的灵活性,比如:可以要求某个任务通过指定的Promise来返回结果,这在java的Executor里是做不到的。后面我们会看到该机制大量的运用案例。

MultithreadEventExecutorGroup

这是一个实现EventExecutorGroup接口的抽象类,完成度比较高,Netty使用的EventLoopGroup就是从它继承的,先看看它的构造方法:

//AbstractEventExecutorGroup implements EventExecutorGroup 
public abstract class MultithreadEventExecutorGroup extend AbstractEventExecutorGroup {

    //Group所包含的EventExecutor数组
	 private final EventExecutor[] children;
	 
	 //children的只读外观集合
    private final Set<EventExecutor> readonlyChildren;
    
    //已经关闭child EventExecutor
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    
    //一个标记Group关闭状态的Futuire
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    
    //选择child EventExecutor的选择器对象,用来实现EventExecutorGroup.next()方法
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;


    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
		
		  
        if (executor == null) {
        	 //group将多线程实现,委托了给了ThreadPerTaskExecutor
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

		 //EventExecutor数组长度就是指定线程数量
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
            	   //创建EventExecutor的newChild是子类实现的,args参数列表,Group并不使用,而是透传给每个child EventExecutor
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
            	   //一旦这个过程发生失败,就把已经创建的EventLoop全部关闭掉,代码省略了
                if (!success) {
						...
                }
            }
        }
        
        //初始化EventExecutor选择器,默认的选择逻辑其实很简单,就是轮询
        chooser = chooserFactory.newChooser(children);

		 //监听每个EventExecutor的关闭状态,所有EventExecutor都关闭了,Group也就关闭了
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

		 //这里搞一个只读的EventLoop集合
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    
    //具体创建什么样的EventExecutor,留给子类实现
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    
    
    //默认的Thread Factory,它创建线程,并起了一个有规则的name
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

	//chooser选择下一个EventExecutor
    @Override
    public EventExecutor next() {
        return chooser.next();
    }

    //一个只读的迭代器
    @Override
    public Iterator<EventExecutor> iterator() {
        return readonlyChildren.iterator();
    }
}

小结:MultithreadEventExecutorGroup类似JDK的FixedThreadPool;而EventExecutorGroup和EventExecutor的具体实现类型是配对的,所以Group初始化时,由具体子类来创建所需的EventExecutor。

此时,我们反过来看看MultithreadEventExecutorGroup基类AbstractEventExecutorGroups实现,会更容易一些:

public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }
    
    //类似方式实现了EventExecutorGroup定义的其他任务执行方法
    ...
}

小结:如果向EventExecutorGroup提交一个任务,它调度**下一个(next)**child EventExecutor来执行。

SingleThreadEventExecutor

MultithreadEventExecutorGroup与之对应的是SingleThreadEventExecutor,我们先稍微了解一下它的基类AbstractScheduledEventExecutor:

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
    //方法省略了
    ...
}

AbstractScheduledEventExecutor实现了任务调度的功能,它完全就是java ScheduleExecutor的翻版,通过一个优先级队列存放调度任务(ScheduledFutureTask),以任务的下次执行时间作为优先级。

由于任务调度功能并非Netty的关键,这个抽象类就不深入了,直接进入SingleThreadEventExecutor。

NioEventLoop就是基于SingleThreadEventExecutor而实现的,非IO相关的功能全部在SingleThreadEventExecutor里,因此我们要重点介绍。

成员字段:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

	 //Executor的状态常量
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

	 //一个占位符任务,必要时用来启动Thread
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };

  	 //一个任务队列
    private final Queue<Runnable> taskQueue;

    //驱动Executor运行的线程
    private volatile Thread thread;
    
    //这个executor充当线程工厂的角色
    private final Executor executor;
    
    //interrupted标记
    private volatile boolean interrupted;

    private final CountDownLatch threadLock = new CountDownLatch(1);
    
    //shutdownHooks,Executor关闭时执行
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    
    //这是一个标记位:向Executor添加任务能否唤醒阻塞的线程
    private final boolean addTaskWakesUp;
    
    //排队等待的任务数量上限
    private final int maxPendingTasks;
    
    //当任务无法提交时的处理器
    private final RejectedExecutionHandler rejectedExecutionHandler;

	 //上一次执行任务的时间戳
    private long lastExecutionTime;

    //该Executor的状态
    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    //gracefulShutdown过程中的静默期
    private volatile long gracefulShutdownQuietPeriod;
    //gracefulShutdown总的时间限制
    private volatile long gracefulShutdownTimeout;
    //gracefulShutdown开始时间戳
    private long gracefulShutdownStartTime;

	 //同步该Executor终结状态的Future对象
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
}

如果你对java executor框架足够了解的话,那么上面大部分字段的语义是很容易理解的,本文也不再做过多的解释。

构造方法

选取了一个功能比较全的构造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    
    //这里给executor包装了一层
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

SingleThreadEventExecutor的构造方法就是初始化一些成员字段,没啥讲的。ThreadExecutorMap运用装饰者模式包装了Executor参数,最终的效果是:SingleThreadEventExecutor里面的task,在运行时可通过ThreadExecutorMap.currentExecutor获得所属SingleThreadEventExecutor实例。ThreadExecutorMap的实现细节具体无关大局,不做分析。

拉取任务:takeTask

接下来分析SingleThreadEventExecutor几个任务执行相关的重要方法,第一个是takeTask,获取下个需要执行的任务:

protected Runnable takeTask() {
    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
    for (;;) {
        //这里调度队列scheduledTaskQueue里取一个scheduledTask(调度队列在基类里)
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            //如果调度度列里没有任务,那就从taskQueue即可
            Runnable task = null;
            try {
                //taskQueue.take方法是阻塞的,
                //基类的schedule()方法会向taskQueue插入一个WAKEUP_TASK唤醒线程
                task = taskQueue.take();
                if (task == WAKEUP_TASK) {
                    task = null;
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
            //如果存在scheduledTask,还差delayNanos才触发,那么尝试从taskQueue.poll取任务,最多等待delayNanos
            long delayNanos = scheduledTask.delayNanos();
            Runnable task = null;
            if (delayNanos > 0) {
                try {
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    return null;
                }
            }
            //如果没有task,此时应该再次判断scheduledTask是否已经触发才对
            //但是这里调用了fetchFromScheduledTaskQueue,该方法必有玄机
            if (task == null) {
                fetchFromScheduledTaskQueue();
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}


private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        //从scheduledTaskQueue取一个应该触发的scheduledTask
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        //将scheduledTask加入taskQueue,如果加入失败(空间不足),只好又放回去
        if (!taskQueue.offer(scheduledTask)) {
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

takeTask获取当前应该执行的Task,有可能是异步task,也可能是scheduledTask;如果没有任何task需要执行,该方法会阻塞;在内部实现中,到了触发时间的scheduledTask会被放入taskQueue,所以我们看到返回的总是taskQeue内的task。

同时管理两个taskQueue很容易出bug的,上面的编程技巧值得参考。

启动线程

启动线程的相关方法是execute,startThread,doStartThread,SingleThreadEventExecutor的执行线程在接受到任务时才会启动,所以execute方法内有启动线程的入口。

//提交任务
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    //将任务加入队列
    addTask(task);
    //如果当前线程不是Executor线程,那么尝试启动它,否则说明Executor线程已经启动了
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
			 //如果已经关闭了,拒绝任务,代码省略
        }
    }
	 //这个逻辑与线程启动无关,它的意思是:addTask无法自动唤醒Executor线程,那么调用wakeup方法来唤醒
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

//唤醒线程,将WAKEUP_TASK任务入队列
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop) {
        taskQueue.offer(WAKEUP_TASK);
    }
}

//精简了代码
private void startThread() {
	//将状态切换至ST_SHUTTING_STARTED
	doStartThread()
}
    
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
			  //这里忽略了中断处理
			  ...
            boolean success = false;
            updateLastExecutionTime();
            try {
            	   //关键点只有这里,执行子类实现的run方法
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            	  //这里有一段贼长的线程退出、异常处理逻辑,总结起来就干了两件事
            	  //1. 执行confirmShutdown尝试优雅关闭
            	  //2. 将状态切换至ST_SHUTTING_DOWN
            }
        }
    });
}

protected abstract void run();

小结:

  • 向SingleThreadEventExecutor提交任务时,会尝试启动线程;
  • SingleThreadEventExecutor虽然管理了两个taskQueue,又创建了线程,但是它的线程逻辑完全是由子类来实现的。**

特别说明:

addTaskWakesUp这个标记特别令人费解,它的值是在构造函数中指定的,换句话说,是由子类来指定的。
它的意思是:如果线程处于阻塞状态,addTask操作能否唤醒它;而不是,addTask操作是否应该唤醒它。它是子类告诉基类的一个事实,而不是子类对基类的配置。

为什么这样呢?因为SingleThreadEventExecutor.run方法是由子类实现的,那么线程到底会如何阻塞,如何唤醒阻塞,只有子类才知道。如果run实现只会阻塞在taskQueue上(addTaskWakesUp=true),此时上面的wakeup方法确实可以唤醒阻塞线程。否则的话,addTaskWakesUp=false,同时子类要重写wakeup方法实现自己的唤醒逻辑。

为什么addTaskWakesUp的逻辑如此令人费解,这是“继承”天然的缺陷,“继承”导致一个本来内聚性很高的逻辑,分散到了两个类里面,让人难以捉摸。github上还有人提了一个issue,抱怨addTaskWakesUp:https://github.com/netty/netty/issues/10023

ShutDown

我们只看shutdownGracefully方法(shutDown方法已经Depreacted了):

public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    //已经正在关闭了,返回terminationFuture即可
    //说明:重复调用shutdown方法没有副作用
    if (isShuttingDown()) {
        return terminationFuture();
    }

 	//中间忽略了一段切换state的逻辑:状态切换至ST_SHUTTING_DOWN
 	...
 	
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);

    //如果线程尚未开始,启动线程  
    if (ensureThreadStarted(oldState)) {
        return terminationFuture;
    }

	 //唤醒线程:让线程自己关闭自己,当然要唤醒它喽
    if (wakeup) {
        taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }
    return terminationFuture();
}

public boolean isShuttingDown() {
    return state >= ST_SHUTTING_DOWN;
}

shutdownGracefully方法看起来有点虎头蛇尾,这是因关闭逻辑必须在SingleThreadEventExecutor的线程内执行才安全,所以shutdownGracefully只干两件事:切换状态至ST_SHUTTING_DOWN,唤醒线程。

SingleThreadEventExecutor真正的关闭逻辑在confirmShutdown方法内;注意:该方法可被多次调用,返回值标识着——关闭是否已经成功,意味着线程能否退出。

protected boolean confirmShutdown() {
    //必须已经进ST_SHUTTING_DOWN状态
    if (!isShuttingDown()) {
        return false;
    }
    //当前线程必须是SingleThreadEventExecutor的执行线程
    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }
	 //取消所的调度任务
    cancelScheduledTasks();

	 //记录shutDown开始时间
    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }

    //执行任务队列里面的所有任务,有任何任务被执行,runAllTasks返回true;
    //或者,执行所有的ShutdownHook,有任何hook被执行,runShutdownHooks返回true
    if (runAllTasks() || runShutdownHooks()) {
    
        //如果已经进入ST_SHUT_DOWN状态,关闭成功
        if (isShutdown()) {
            return true;
        }

		 // 如果 期望静默期==0,说明用户不期望关闭期间有再有任何任务提交,直接退出
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        //否则本次尝试关闭失败,激活线程再尝试
        taskQueue.offer(WAKEUP_TASK);
        return false;
    }

    //如果关闭消耗的总时间已经超过gracefulShutdownTimeout,立即成功
    final long nanoTime = ScheduledFutureTask.nanoTime();
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }

    //如果 静默期<期望静默期(gracefulShutdownQuietPeriod,本次关闭失败,还要等等看是否有新的任务进来,为了避免忙等,线程sleep了100毫秒
    //lastExecutionTime是最近一次任务,或ShutdownHook的执行时间
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        taskQueue.offer(WAKEUP_TASK);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore
        }
        return false;
    }

    return true;
}

所谓优雅关闭(gracefulShutDown),要求EventExecutor将队列内的任务全部执行完(未触发的调度任务不执行),以及注册的ShutDownHook全部执行完,
confirmShutdown的逻辑确实有点绕,它的要点如下:

  • gracefulShutdownTimeout:是从shutDown开始,到关闭成功总的时间限制,并不代表关闭需要这么长时间;当然也不能保证关闭不超过这个时间,因为万一有个耗时任务就不好说了;
  • gracefulShutdownQuietPeriod:关闭静默期,Executor保证在这个时间段内,提交新的任务仍然会被执行;
  • confirmShutdown是一个尝试关闭方法,它会调用多次。

NioEventLoop.run

这一章并不打算涉及Socket相关的逻辑,但是为了讲清楚SingleThreadEventExecutor的工作,我们提前瞜一眼NioEventLoop.run方法骨架。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
			  //处理IO事件
			  //调用runAllTask()执行任务
            try {
                //看看是否需要关闭
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

从上面的代码可知,SingleThreadEventExecutor只是提供了一个管理、运行任务的框架,具体的执行逻辑全在NioEventLoop内。NioEventLoop.run方法交替地处理IO请事件、执行任务队列里的任务,具体的细节后面还会再分析。

DefaultEventExecutorGroup和DefaultEventExecutor

Netty的concurrent模块内包含EventExecutorGroup和EventExecutor的默认实现,就是DefaultEventExecutorGroup和DefaultEventExecutor。

其中DefaultEventExecutor.run实现如下:

public final class DefaultEventExecutor extends SingleThreadEventExecutor {
    @Override
    protected void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }
    }
}

DefaultEventExecutor完全就是一个任务执行器,我们可以在自己的代码里使用它。

总结

Netty为了保证高并发网络通信的性能,设计了一个EventExector框架,可以认为它是java executor框架一个特别的实现版本。相比JDK内Executor的实现,EventExector框架能够能满足Netty的一些额外需求,个人认为主要有两点:

  • 能够精确控制任务执行的线程;
  • 将任务执行与IO事件处理融合起来,放入一个统一的executor框架内。

在设计实现层面,EventExector模块和网络通信并没有紧耦合,它是一个中立的框架,可以被单独使用。

 类似资料: