【Dubbo】Consumer线程模型

柴意智
2023-12-01

1. 前言

之前的文章分析了Provider线程模型,本文开始分析客户端Consumer的线程模型,其实两者还是有很多相似之处的。

Consumer同样有IO线程和业务线程两类,IO线程负责和服务端建立连接和IO数据读写,业务线程主要处理Body反序列化,应该还包括服务端回调客户端的逻辑。相比于服务端,其实客户端的业务线程做的事很少,主要是解析响应结果。

Consumer在创建NettyClient时,也会对ChannelHandler进行包装,其中就包括具有线程派发能力的ChannelHandler,这意味着Consumer也拥有和Provider一样的线程派发功能,策略也可以自定义。

2. IO线程

之前的文章分析过,Consumer在引用远程服务时,会创建RegistryDirectory,它会去注册中心订阅服务,然后就能拿到ProviderUrls,再将ProviderUrls通过Protocol转换成对应的Invoker。以DubboInvoker为例,创建DubboInvoker对象时,Dubbo会创建ExchangeClient客户端,ExchangeClient会和服务端建立好连接,以便于发送后续的RPC调用请求。

默认情况下,针对同一个Provider节点,Consumer只会创建一个ExchangeClient实例,和服务端建立一条长连接,大家共用这一条连接。当然,也支持建立多条共享连接,参数shareconnections进行设置。Dubbo还支持针对某个Service额外建立独占连接,通过参数connections设置,不配或0表示使用共享连接。一般来说,使用默认的共享连接就好,不建议修改此配置,除非网络传输是性能瓶颈。

private ExchangeClient[] getClients(URL url) {
    // 是否使用共享连接
    boolean useShareConnect = false;
    // 连接数=0代表使用共享连接,否则创建独占连接
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    if (connections == 0) {
        useShareConnect = true;
        // 使用共享连接,针对同一个address,会共享同一批Client
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        shareClients = getSharedClient(url, connections);
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);
        } else {
            // 初始化客户端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

initClient()方法会创建客户端并和服务端建立连接,这里以NettyClient为例,NettyClient构造函数里会创建Bootstrap,绑定EventLoopGroup,随后和服务端建立连接。

private static final EventLoopGroup NIO_EVENT_LOOP_GROUP = eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");

bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)

NIO_EVENT_LOOP_GROUP是静态常量,可以把它看作是一个线程池,线程数是DEFAULT_IO_THREADS,它的值是CPU核心数+1但最多不超过32。
由此可见,所有的NettyClient都会共用这一个EventLoopGroup,意味着Consumer不管和多少台远程节点通信,最多只会创建DEFAULT_IO_THREADS个IO线程。

至此,IO线程创建完毕,它主要负责和服务端建立连接、消息的编解码、Body序列化、处理心跳等操作。

3. 业务线程

Consumer业务线程是在NettyClient构造函数里创建的,父类的构造函数会调用initExecutor()初始化业务线程池,默认使用cached线程池。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    // 初始化线程池
    initExecutor(url);
    // 创建Bootstrap
    doOpen();
    // 连接到远程
    connect();
}

Dubbo创建线程池依赖DefaultExecutorRepository类,它会把创建好的线程池缓存到Map容器中,Key是远程服务端口Port,这意味着不同的Port会单独创建一个线程池。这里我有点不解,为啥要单独创建线程池?如果是为了线程隔离,那应该用Address作为Key才对啊,而且还得设置最大线程数,因为cached线程池的线程数默认是没有限制的,那理论上创建1个和N个没有区别,我不理解!!!
这里以CachedThreadPool为例,下面是创建线程池的代码:

public Executor getExecutor(URL url) {
    // 线程名前缀
    String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
    // 核心线程数
    int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
    // 最大线程数
    int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
    // 队列数 默认0
    int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
    // 线程活跃时间 默认60s
    int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
    return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                                  queues == 0 ? new SynchronousQueue<Runnable>() :
                                  (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                   : new LinkedBlockingQueue<Runnable>(queues)),
                                  new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

相较于服务端来说,客户端的业务线程做的事情要少的多,主要的就是对服务端返回的数据做反序列化。然后Dubbo“参数回调”的高级特性,服务端可以回调客户端的逻辑,应该也是在业务线程上执行的。

4. Dispatcher

Consumer发送RPC调用请求是由IO线程完成的,客户端在接收到服务端响应的数据后,开始做消息解码得到Response,然后将消息派发到业务线程池,这个操作是由Dispatcher完成的。
Dispatcher接口定义很简单,它本身并不具备线程派发的能力,而是通过SPI自适应的方式,加载具有线程派发能力的ChannelHandler,再交给它去处理。

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

具有线程派发能力的ChannelHandler,是什么时候创建的呢?答案在NettyClient构造函数里,它会对ChannelHandler做包装。

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                                        .getAdaptiveExtension().dispatch(handler, url)));
}

Dubbo提供了五种线程派发策略,默认策略是all,即所有消息都会被派发到业务线程池执行。

策略说明
all所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct所有消息都不派发到线程池,全部在 IO 线程上直接执行
message只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行
execution只有请求消息派发到线程池,其它消息均在 IO 线程上执行
connection在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池

以AllChannelHandler为例,它对于所有的事件,都会创建ChannelEventRunnable,通过异步任务的方式提交到业务线程池执行,IO线程到此就结束了它的工作。以下是received()方法代码:

public void received(Channel channel, Object message) throws RemotingException {
    // 获取业务线程池
    ExecutorService executor = getPreferredExecutorService(message);
    try {
        // 提交异步任务,处理消息 IO线程到此结束
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

Dubbo Consumer端ChannelHandler的包装过程,如下所示:

NettyClientHandler#channelRead
>>MultiMessageHandler#received
>>>>HeartbeatHandler#received
>>>>>>AllChannelHandler#received
>>>>>>>>DecodeHandler#received
>>>>>>>>>>HeaderExchangeHandler#received

现在我们已经知道,在AllChannelHandler之前,消息是在IO线程上处理的,之后是在业务线程上处理的。

5. 总结

Consumer端也分IO线程和业务线程,IO线程负责和服务端建立连接、IO读写、消息编解码、心跳处理等。以Netty为例,所有的连接都共用同一个EventLoopGroup,线程数默认是CPU核心数+1但不会超过32。
默认的业务线程池使用cached线程池,线程数没有限制,但好在客户端的业务线程做的事情不多,倒也没什么太大的问题。
Consumer发送RPC请求,Request对象由IO线程负责编码并发送,然后服务端响应结果,IO线程负责接收数据并解码成Response对象,然后交给AllChannelHandler,它再把Response派发到业务线程,业务线程对Response里的Result进行反序列化,最终将结果返回给调用线程。

 类似资料: