Dubbo Source Code Analysis: Dispatcher and ThreadPool

东郭赞
2023-12-01

Dispatcher

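A Dispatcher is the strategy that decides how events are dispatched: which events are handed to the thread pool, and which are executed directly on the current thread.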

First, the interface definition:

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // the last two keys are kept for compatibility with old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

The Dispatcher implementations are registered as follows:

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

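Through the extension mechanism, the implementation is looked up from the URL parameters (dispatcher/dispather/channel.handler). If none of them is set, the default named on @SPI is used, i.e. the all implementation.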
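The lookup order described above can be sketched as follows. This is a simplified, hypothetical model and not Dubbo's ExtensionLoader: the class DispatcherLookupSketch and the method resolveName are made up for illustration, and the URL is reduced to a plain parameter map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical model of adaptive extension lookup; not Dubbo's ExtensionLoader. */
class DispatcherLookupSketch {

    // Keys are checked in this order, mirroring the @Adaptive annotation on Dispatcher.
    static final String[] KEYS = {"dispatcher", "dispather", "channel.handler"};
    // Default extension name, mirroring @SPI(AllDispatcher.NAME).
    static final String DEFAULT_NAME = "all";

    /** Returns the extension name chosen for the given URL parameters. */
    static String resolveName(Map<String, String> urlParams) {
        for (String key : KEYS) {
            String value = urlParams.get(key);
            if (value != null && !value.isEmpty()) {
                return value; // the first configured key wins
            }
        }
        return DEFAULT_NAME; // nothing configured: fall back to the @SPI default
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        System.out.println(resolveName(params)); // no key set
        params.put("channel.handler", "direct");
        System.out.println(resolveName(params)); // only a legacy key is set
        params.put("dispatcher", "message");
        System.out.println(resolveName(params)); // the current key takes precedence
    }
}
```

The resolved name is then mapped to a class through the registration file shown earlier (all, direct, message, execution, connection).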

AllDispatcher

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            // ERROR (error handling elided in this excerpt)
        }
    }

    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            // ERROR (error handling elided in this excerpt)
        }
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // ERROR (error handling elided in this excerpt)
        }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            // ERROR (error handling elided in this excerpt)
        }
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) { 
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

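As shown, in this mode every event is handed to the thread pool.
Now look at the constructor, which simply delegates to the parent class WrappedChannelHandler for initialization: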

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // obtain the thread pool through the extension mechanism
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

ExecutionDispatcher

/**
 * Everything except sending is handled by the thread pool.
 * 
 * @author chao.liuc
 */
public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
    }
}

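Similar to all. (The official site says that only request messages are dispatched to the thread pool, not responses; responses and other events such as connect/disconnect and heartbeats run directly on the IO thread. Judging from the code above, that does not appear to be the case.)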

DirectDispatcher

/**
 * Does not dispatch to the thread pool.
 * 
 * @author chao.liuc
 */
public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}

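No messages are dispatched to the thread pool; everything runs directly on the IO thread. Here the handler that was passed in is simply returned, with no hand-off to the thread pool. The handlers use the decorator pattern: the other Dispatchers wrap the handler in an extra layer that dispatches to the thread pool, and without that wrapper the call follows the original path.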
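The difference between returning the handler unchanged and wrapping it can be illustrated with a minimal sketch. The Handler interface below is a hypothetical one-method stand-in for Dubbo's ChannelHandler, and the thread name biz-pool-1 is an arbitrary choice for the demo.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical, stripped-down handler types; Dubbo's ChannelHandler has more methods. */
class DecoratorDispatchSketch {

    interface Handler {
        void received(String message);
    }

    /** "direct" style: return the handler unchanged, so it runs on the caller's thread. */
    static Handler direct(Handler handler) {
        return handler;
    }

    /** "all" style: wrap the handler so that each event is submitted to the pool. */
    static Handler pooled(Handler handler, ExecutorService executor) {
        return message -> executor.execute(() -> handler.received(message));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> new Thread(r, "biz-pool-1"));
        Handler business = message ->
                System.out.println(message + " handled on " + Thread.currentThread().getName());

        direct(business).received("direct");       // runs on the calling thread
        pooled(business, pool).received("pooled"); // runs on biz-pool-1

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

In this sketch the calling thread plays the role of the IO thread: the business handler is identical in both cases, and only the wrapper decides where it runs.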

MessageOnlyDispatcher

/**
 * Only message receive uses the thread pool.
 * 
 * @author chao.liuc
 */
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }

}
public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}

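Only request and response messages are dispatched to the thread pool; other events such as connect/disconnect and heartbeats run directly on the IO thread.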

ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit ;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                                     new NamedThreadFactory(threadName, true),
                                     new AbortPolicyWithReport(threadName, url)
            );  // FIXME connectionExecutor is never released anywhere!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
       //....
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
       //....
    }

    public void disconnected(Channel channel) throws RemotingException {
        //....
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
        //....
    }

    public void received(Channel channel, Object message) throws RemotingException {
        //....
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        //....
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        //....
        cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
        //....
    }

    private void checkQueueLength(){
        if (connectionExecutor.getQueue().size() > queuewarninglimit){
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
        }
    }
}

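Besides the event-execution thread pool, this type also initializes a new thread pool that contains only one thread, and hands the connected and disconnected events to that pool for processing.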
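A pool with exactly one thread and a FIFO queue runs tasks in submission order, which is presumably why connect and disconnect events get their own executor here. The following sketch shows that property in isolation; ConnectionOrderSketch and runInOrder are hypothetical names, and events are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConnectionOrderSketch {

    /** Runs the given events on a one-thread pool and returns them in execution order. */
    static List<String> runInOrder(List<String> events) {
        ThreadPoolExecutor connectionExecutor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        for (String event : events) {
            connectionExecutor.execute(() -> seen.add(event));
        }
        connectionExecutor.shutdown(); // already queued tasks still run after shutdown()
        try {
            connectionExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // prints [CONNECTED, DISCONNECTED, CONNECTED]
        System.out.println(runInOrder(Arrays.asList("CONNECTED", "DISCONNECTED", "CONNECTED")));
    }
}
```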

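The Dispatcher code is fairly simple, so this was only a quick walk-through.
One more point: all is the Dispatcher normally used, and because it puts every event into the thread pool, the following situation can occur. Suppose a request arrives while the thread pool is full, so an error is raised. all also submits the error event to the thread pool in order to send the response back; if the pool is still full at that moment, the error message cannot be sent, and the consumer keeps waiting until it times out.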
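The failure mode described above can be reproduced with a plain ThreadPoolExecutor. This is a sketch under stated assumptions rather than Dubbo's code path: the JDK's AbortPolicy stands in for AbortPolicyWithReport, the pool is shrunk to one thread with no queue so that it fills immediately, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolExhaustionSketch {

    /** Returns true if both the request and the follow-up error event are rejected. */
    static boolean errorEventAlsoRejected() {
        // One worker and no queue: the pool is "full" as soon as one task is running.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy()); // stand-in for AbortPolicyWithReport
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            try {
                pool.execute(() -> { });               // the incoming request
                return false;
            } catch (RejectedExecutionException requestRejected) {
                try {
                    pool.execute(() -> { });           // the error event, sent to the same pool
                    return false;
                } catch (RejectedExecutionException errorRejected) {
                    return true;                       // nothing reaches the consumer
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("error event also rejected: " + errorEventAlsoRejected());
    }
}
```

Because the error event competes for the same exhausted pool as the request that failed, it is rejected for the same reason.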

ThreadPool

The thread pool used by the Dispatcher's handlers is initialized in WrappedChannelHandler with the following code:

ExecutorService executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class)
                       .getAdaptiveExtension().getExecutor(url);

The extension mechanism is used to obtain the ThreadPool of the configured type. Here is the interface definition:

@SPI("fixed")
public interface ThreadPool {

    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

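From the extension mechanism we can tell that fixed is the default implementation, and that @Adaptive declares the key threadpool, which means the threadpool attribute can be configured to choose the implementation. Below are the three thread pool implementations that ship with Dubbo: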

/**
 * This pool creates a fixed number of threads at startup and never grows or shrinks. Derived from: <code>Executors.newFixedThreadPool()</code>
 * 
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 * @author william.liangf
 */
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                            : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
/**
 * This pool is elastic: threads idle for one minute are reclaimed, and new requests create threads again. Derived from: <code>Executors.newCachedThreadPool()</code>
 * 
 * @see java.util.concurrent.Executors#newCachedThreadPool()
 * @author william.liangf
 */
public class CachedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                            : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
/**
 * This pool keeps growing until it reaches the upper limit and does not shrink afterwards.
 * 
 * @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
 */
public class LimitedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                            : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
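  1. FixedThreadPool: the core and maximum thread counts both default to 200 and are set with the threads attribute. The queue size defaults to 0 and is set with the queues attribute: 0 uses a SynchronousQueue, a negative value uses an unbounded LinkedBlockingQueue, and a positive value uses a bounded LinkedBlockingQueue of that capacity.

  2. CachedThreadPool: compared with FixedThreadPool, it adds a corethreads attribute for the core thread count and an alive attribute for keepAliveTime, and its maximum thread count defaults to Integer.MAX_VALUE; the rest is similar.

  3. LimitedThreadPool: keepAliveTime is Long.MAX_VALUE, so the thread count is essentially never reduced automatically.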
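All three pools share the same queue-selection expression, which can be pulled out and exercised on its own. QueueSelectionSketch and queueFor are hypothetical names used only for this illustration; the branching mirrors the ternary in the getExecutor methods above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueSelectionSketch {

    /** Mirrors the ternary used by the three pools: 0 is a hand-off, negative is unbounded, positive is bounded. */
    static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<Runnable>();
        }
        if (queues < 0) {
            return new LinkedBlockingQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(queues);
    }

    public static void main(String[] args) {
        for (int queues : new int[] {0, -1, 100}) {
            BlockingQueue<Runnable> q = queueFor(queues);
            System.out.println(queues + " -> " + q.getClass().getSimpleName()
                    + ", remainingCapacity=" + q.remainingCapacity());
        }
    }
}
```

The queue choice interacts with the thread counts: ThreadPoolExecutor only creates threads beyond corePoolSize when the queue refuses a task, so with an unbounded queue the maximum thread count never comes into play.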
