当前位置: 首页 > 工具软件 > dispatcher > 使用案例 >

OkHttp源码解析(二)dispatcher

莘欣怿
2023-12-01

        上一篇文章,总结了okhttp的同步请求和异步请求,并跟踪源码进行了分析。我们发现,不管是同步请求还是异步请求,都离不开一个类Dispatcher。事实上,Dispatcher是okhttp的一个非常关键的类,是okhttp请求的分发器。这篇文章,我们重点分析一下Dispatcher的源码。

        Dispatcher的源码不多,200多行。但是,它的设计非常精妙,而且它是okhttp网络请求的核心类之一。对Dispatcher的源码解析,需要结合同步请求和异步请求来进行。其实,我们在上一篇文章对同步请求和异步请求进行源码分析的时候,已经对Dispatcher的关键代码进行了分析。这篇文章,我们更详细的从头到尾去剖析一下Dispatcher的源码。

一、Dispatcher类的几个变量

        首先,我们看一下Dispatcher类源码的一些变量,在这里,我不会深入去解释每个变量在后面的作用,我只是去总结一下这些变量是什么:

private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

     (1)maxRequest:默认64。这是okhttp允许的最大请求数量。

     (2)maxRequestsPerHost :默认5。这是okhttp对同一主机允许的最大请求数量。

     (3)idleCallback:我们可以看到,他是一个Runnable,后面我会说一下他的作用。

     (4)executorService:不用多说,这个是Dispatcher用于执行请求的线程池。

     (5)readyAsyncCalls:"将会"被按顺序执行的异步请求队列。因此,他们只是被加进了一个就绪态的队列。。

     (6)runningAsyncCalls:正在执行的异步请求队列。也就是说该队列里的请求是正在被执行或者被取消掉的没执行完的异步请求。

     (7)runningSyncCalls:正在执行的同步请求队列。也就是说该队列里的请求是正在被执行或者被取消掉的没执行完的同步请求。

二、关键方法

1、executorService

        继续往下跟踪源码,构造函数不多说,再往下是线程池,我们看一下源码:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

        在这里,简单说一下线程池的实现。我们创建一个线程池,需要几个关键的参数。corePoolSize:核心线程数;maxPoolSize:最大线程数;keepAliveTime:存活时间;workQueue:等待队列以及线程工厂和拒绝策略。对这几个关键参数,我不去做过多的解释,这是线程池的基础,相信大家比我熟悉。

        corePoolSize为0,maxPoolSize为Integer的最大值,keepAliveTime为60秒。在这里,有一个很有意思的设定。为什么说有意思呢,因为corePoolSize设置为0。熟悉线程池的朋友应该知道,当线程池中执行完所有的任务,并且在keepAliveTime的时间里没有来新的任务时,如果核心线程数不为0,那么线程池将无法关闭。如果设置核心线程数为0,那么到了60秒之后,线程池自动关闭。

2、setMaxRequests&&getMaxRequests

        这两个方法意思很明确,就是我们上面说过的最大请求数的get和set方法,然后会执行promoteAndExecute方法,我们后面去看这个方法:

  public void setMaxRequests(int maxRequests) {
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    synchronized (this) {
      this.maxRequests = maxRequests;
    }
    promoteAndExecute();
  }
  public synchronized int getMaxRequests() {
    return maxRequests;
  }

3、setMaxRequestsPerHost和getMaxRequestsPerHost

         这两个方法意思很明确,就是我们上面说过的每个主机最大请求数的get和set方法,然后会执行promoteAndExecute方法,我们后面去看这个方法:

public void setMaxRequestsPerHost(int maxRequestsPerHost) {
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    synchronized (this) {
      this.maxRequestsPerHost = maxRequestsPerHost;
    }
    promoteAndExecute();
  }

  public synchronized int getMaxRequestsPerHost() {
    return maxRequestsPerHost;
  }

4、enqueue

        这就是上一篇文章说过的,dispatcher执行异步请求的方法。把call加入就绪的异步请求队列。这个和同步请求的实现不一样,同步请求是直接加入了runningSyncCalls,而异步请求是放进了readyAsyncCalls。然后会执行promoteAndExecute方法,我们后面去看这个方法:

  void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
    }
    promoteAndExecute();
  }

5、cancelCall

        这个方法是取消请求的方法。通过源码我们不难发现,会遍历就绪异步队列,执行异步队列,执行同步队列,对其中所有的call进行取消的操作:

  public synchronized void cancelAll() {
    for (AsyncCall call : readyAsyncCalls) {
      call.get().cancel();
    }

    for (AsyncCall call : runningAsyncCalls) {
      call.get().cancel();
    }

    for (RealCall call : runningSyncCalls) {
      call.cancel();
    }
  }

6、promoteAndExecute

        接下来,我们看一下上面多次被提及到的promoteAndExecute方法。这个方法我们在上一篇文章中也分析过。在这里再去总结一下:

  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

        这个方法,首选会去遍历就绪队列中的call,在遍历的过程中,会去做两个判断:(1)如果正在执行队列的请求数超过了最大的请求数容量则break,如果一个Host的请求比允许的每个host的请求容量小,那么将会continue,并且把异步请求添加到两个队列:可执行队列和正在执行异步请求队列。后面,去遍历可执行队列中的请求,并且通过executeOn方法,去执行异步请求。

 7、runningCallsForHost

        这个方法,返回的是对一个主机的请求数量,这个返回值就是用于上面的判断:

  /** Returns the number of running calls that share a host with {@code call}. */
  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.get().forWebSocket) continue;
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

8、executed

        很熟悉的一个方法咯,执行同步请求的方法,把同步请求加入到同步请求执行队列。在后面,我们会看到同步请求队列和异步请求队列的用处。

  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

9、finished

        finished方法,我们在上一篇文章也多次提到过,在执行完同步请求或者异步请求后,会在finally中去执行finished。我们看一下finished具体做了什么:

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

        在这里,我们看到,首先进行移除call的操作。如果已经移除了,那么抛出一个异常,如果没有移除则进行移除。isRunning是promoteAndExecute方法的返回值,这个返回值的意思是当前是否正在执行一个请求。后面是一个判断,如果没有在执行请求并且idldCallback不为空,则执行run方法。在这里,用到了我们最上面提到的一个变量idleCallback,我们知道他是一个Runnable对象。我们看一下这个callBack的作用:

Set a callback to be invoked each time the dispatcher becomes idle (when the number of running calls returns to zero)

        设置每次调度程序空闲时调用的回调(当正在运行的调用数返回到零时)。也就是说,当正在执行的请求数为0的时候,Dispatcher处于idle状态,这时候,通过回调这个callback我们知道Dispatcher是否处于空闲状态。

10、runningCalls&&queuedCalls

        这两个方法,也就是各自把就绪队列和正在执行队列的请求加到了一个队列。这样,外部可以调用这两个方法,获取到就绪队列和正在执行队列中的请求:

  /** Returns a snapshot of the calls currently awaiting execution. */
  public synchronized List<Call> queuedCalls() {
    List<Call> result = new ArrayList<>();
    for (AsyncCall asyncCall : readyAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  /** Returns a snapshot of the calls currently being executed. */
  public synchronized List<Call> runningCalls() {
    List<Call> result = new ArrayList<>();
    result.addAll(runningSyncCalls);
    for (AsyncCall asyncCall : runningAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

11、queuedCallsCount&&runningCallsCount

        这两个方法也是可以供外部调用的,返回的是就绪队列的请求数量和执行队列的请求数量。注意,这里的执行队列的请求数量,是同步请求和异步请求的和:

  public synchronized int queuedCallsCount() {
    return readyAsyncCalls.size();
  }

  public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
  }

        以上就是对Dispatcher类所有变量和方法的具体分析。最后,我们总结一下:不管是同步请求还是异步请求,都是由dispatcher来进行任务调度的,最终是由其内部维护的线程池执行网络请求。

 类似资料: