当前位置: 首页 > 工具软件 > dispatcher > 使用案例 >

网络请求框架:Okhttp:Dispatcher分发器实现同步异步请求源码解析【三】

林德辉
2023-12-01

OKHttp3--Dispatcher分发器实现同步异步请求源码解析【三】_没有鱼了的博客-CSDN博客_okhttp3.dispatcher

对于OkHttp 的 Dispatcher分发器,它在Okhttp网络框架中非常重要,我们需要对它的作用原理理解透彻,他作为整个框架的任务调度员。

一:再OkHttp中 Dispatcher就是用来保存并管理用户产生的 同步请求 (RealCall)和异步请求(AsyncCall)。

二:在同步请求时,将请求添加到runningSyncCalls (正在执行的同步请求队列),并在请求结束时从队列中移除

在异步请求时,将封装成AsyncCall的异步请求线程根据条件添加到 runningAsyncCalls队列中(正在执行的异步请求队列)和 readyAsyncCalls(等待执行的异步请求队列),然后再执行AsyncCall,并且再请求结束时,从队列移除。

三:Dispatcher数据结构

       在Dispatcher中维护了三个队列,队列的类型是 ArrayDeque这是一个双端队列,既可以从队列的头部和尾部操作数据,能够实现 FIFO,不过不是线程安全的,所以在多环境中需要加锁。

       第一个队列:runningSyncCalls ,是一个正在执行的同步请求队列,所有我们添加的同步请求都会被添加到这里面,包括已被取消但没执行完的请求,队列的泛型是 RealCall对象。

      第二个队列:runningAsyncCalls 是一个正在执行的异步请求队列,所有我们添加的异步请求都会被添加到这里,包括被取消但没执行的请求,队列的泛型是 AsyncCall对象,它实现Runnable接口

     第三个队列: readyAsyncCalls 是一个等待执行的异步请求队列,由于Okhttp允许执行的异步请求数量必须在64个以内,且单个 host同时执行的最大请求数量在 5个之内,所以当我们添加的异步请求数量超过它时,该请求就会被添加到该队列,等 runningAsyncCalls 队列有空闲位置后再添加到里面。

   PS : 1:这里的 host 指的是 hostName主机名,每个主机名代表一个主机,每个主机都已一个唯一标识,即IP地址,但是每个主机名并一定是唯一的,多个主机可以通过路由 来共享要给IP 地址所以你可以理解为同时往同一个服务器上发送的请求数量不能超过5个。

2:Okhttp 使用这些队列的好处就是可以轻松实现并发请求,更方便的委会请求数以及后续对这些请求的操作(比如取消请求),大大提高网络请求效率,同时可以更好的管理请求数,防止同时运行的线程过多,导致出现 OOM。

四:线程池:在Okhttp中内部维护了一个线程池,用于执行异步请求 AsyncCall 。

       当异步请求进入 Dispatcher中,如果满足上面两个要求,该请求会被添加到 runningAsyncCall中,然后执行,如果不满足就将其添加到 readAsyncCalls 队列中,当一个异步请求结束时,会遍历 readAsyncCalls队列,再进行条件判断,符合条件就将请求从该队列移动到 runningAsyncCalls对列中并执行它。

五:源码分析

public final class Dispatcher {
  //最大并发请求数
  private int maxRequests = 64;
  //单个主机最大并发请求数
  private int maxRequestsPerHost = 5;
  
  private Runnable idleCallback;

  /** 执行AsyncCall的线程池 */
  private ExecutorService executorService;

  /** 等待执行的异步请求队列. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** 正在执行的异步请求队列,包含已取消但为执行完的请求 */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** 正在执行的同步请求队列,包含已取消但为执行完的请求 */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  //构造方法 接收一个线程池
  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  //构建一个线程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

  /**
   * 设置并发执行的最大请求数
   */
  public synchronized void setMaxRequests(int maxRequests) {
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    promoteCalls();
  }

  public synchronized int getMaxRequests() {
    return maxRequests;
  }

  /**
   * 设置每个主机同时执行的最大请求数
   */
  public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    promoteCalls();
  }

  public synchronized int getMaxRequestsPerHost() {
    return maxRequestsPerHost;
  }

  /**
   * 当分发器处于空闲状态下,即没有正在运行的请求,设置回调
   */
  public synchronized void setIdleCallback(Runnable idleCallback) {
    this.idleCallback = idleCallback;
  }

  /**
   * 执行异步请求
   * 当正在执行的异步请求数量小于64且单个host正在执行的请求数量小于5的时候,就执行该请求,并添加到队列
   * 否则添加到等待队列中
   */
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  /**
   * 取消所有请求
   */
  public synchronized void cancelAll() {
    for (AsyncCall call : readyAsyncCalls) {
      call.get().cancel();
    }

    for (AsyncCall call : runningAsyncCalls) {
      call.get().cancel();
    }

    for (RealCall call : runningSyncCalls) {
      call.cancel();
    }
  }

  //调整请求队列,将等待队列中的请求放入正在请求的队列
  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // 如果正在执行请求的队列已经满了,那就不用调整了.
    if (readyAsyncCalls.isEmpty()) return; // 如果等待队列是空的,也不需要调整

    //遍历等待队列
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      //单个host正在执行的请求数量小于5的时候,将该请求添加到runningAsyncCalls中并执行它
      //同时从等待队列中删除它
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // 如果正在执行请求的队列已经满了,就退出循环
    }
  }

  /** 返回单个host的请求数 */
  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

  /** 执行同步请求,只是将其添加到队列中 */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

  /** 异步请求执行完成调用. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** 同步请求执行完成调用. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      //从队列中移除
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //异步请求才会走这个,调整队列
      if (promoteCalls) promoteCalls();
      //计算当前正在执行的同步和异步请求数量
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  /** 返回当前正在等待执行的异步请求的快照. */
  public synchronized List<Call> queuedCalls() {
    List<Call> result = new ArrayList<>();
    for (AsyncCall asyncCall : readyAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  /** 返回当前正在执行的异步请求的快照. */
  public synchronized List<Call> runningCalls() {
    List<Call> result = new ArrayList<>();
    result.addAll(runningSyncCalls);
    for (AsyncCall asyncCall : runningAsyncCalls) {
      result.add(asyncCall.get());
    }
    return Collections.unmodifiableList(result);
  }

  //返回等待执行的异步请求数量
  public synchronized int queuedCallsCount() {
    return readyAsyncCalls.size();
  }

  //计算正在执行的请求数量
  public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
  }
}
# OkHttpClient.java 的构造函数中,直接通过Builder 构建 Dispatcher

 OkHttpClient(OkHttpClient.Builder builder) {
        this.dispatcher = builder.dispatcher;
        this.proxy = builder.proxy;
        this.protocols = builder.protocols;
        this.connectionSpecs = builder.connectionSpecs;
        this.interceptors = Util.immutableList(builder.interceptors);
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
        this.eventListenerFactory = builder.eventListenerFactory;
        this.proxySelector = builder.proxySelector;
        this.cookieJar = builder.cookieJar;
        this.cache = builder.cache;
        this.internalCache = builder.internalCache;
        this.socketFactory = builder.socketFactory;
        boolean isTLS = false;

        ConnectionSpec spec;
        for(Iterator var3 = this.connectionSpecs.iterator(); var3.hasNext(); isTLS = isTLS || spec.isTls()) {
            spec = (ConnectionSpec)var3.next();
        }

        if (builder.sslSocketFactory == null && isTLS) {
            X509TrustManager trustManager = this.systemDefaultTrustManager();
            this.sslSocketFactory = this.systemDefaultSslSocketFactory(trustManager);
            this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
        } else {
            this.sslSocketFactory = builder.sslSocketFactory;
            this.certificateChainCleaner = builder.certificateChainCleaner;
        }

        this.hostnameVerifier = builder.hostnameVerifier;
        this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(this.certificateChainCleaner);
        this.proxyAuthenticator = builder.proxyAuthenticator;
        this.authenticator = builder.authenticator;
        this.connectionPool = builder.connectionPool;
        this.dns = builder.dns;
        this.followSslRedirects = builder.followSslRedirects;
        this.followRedirects = builder.followRedirects;
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
        this.connectTimeout = builder.connectTimeout;
        this.readTimeout = builder.readTimeout;
        this.writeTimeout = builder.writeTimeout;
        this.pingInterval = builder.pingInterval;
    }
# RealCall 的execute() 函数,会从 OkhttpClient中获取 Dispatcher对象,然后执行
  Dispatcher.execute() 函数

public Response execute() throws IOException {
        synchronized(this) {
            if (this.executed) {
                throw new IllegalStateException("Already Executed");
            }

            this.executed = true;
        }

        this.captureCallStackTrace();

        Response var2;
        try {
            this.client.dispatcher().executed(this);
            Response result = this.getResponseWithInterceptorChain();
            if (result == null) {
                throw new IOException("Canceled");
            }

            var2 = result;
        } finally {
            this.client.dispatcher().finished(this);
        }

        return var2;
    }

 类似资料: