目 录CONTENT

文章目录

终于懂了系列之OKHTTP源码解析

小王同学
2024-08-03 / 0 评论 / 2 点赞 / 154 阅读 / 0 字

终于懂了系列之OKHTTP源码解析

本章主要介绍 OkHttp的实现,代码基于okhttp-3.8.0
使用 OkHttp实现 GET请求非常简单,例子如下:

public class GetExample {
  OkHttpClient client = new OkHttpClient();

  String run(String url) throws IOException {
    Request request = new Request.Builder()
        .url(url)
        .build();

    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  public static void main(String[] args) throws IOException {
    GetExample example = new GetExample();
    String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
    System.out.println(response);
  }
}

这个简单的例子,展示了OkHttp的使用过程:

  1. 创建 OkHttpClient对象
  2. 构造请求 Request
  3. 调用 OkHttpClient发送 Request
  4. 解析请求结果

下面,我们以这个简单的例子为切入点,开始分析OkHttp内部的实现。

1. OkHttpClient以及Request的构造器

先看看 OkHttpClient 的构造器:

public OkHttpClient() {
  // 使用默认的 Builder 初始化 OkHttpClient
  this(new Builder());
}

OkHttpClient(Builder builder) {
  // 从 Builder 中获取并初始化各个属性
  this.dispatcher = builder.dispatcher;
  this.proxy = builder.proxy;
  this.protocols = builder.protocols;
  this.connectionSpecs = builder.connectionSpecs;
  this.interceptors = Util.immutableList(builder.interceptors);
  this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
  this.eventListenerFactory = builder.eventListenerFactory;
  this.proxySelector = builder.proxySelector;
  this.cookieJar = builder.cookieJar;
  this.cache = builder.cache;
  this.internalCache = builder.internalCache;
  this.socketFactory = builder.socketFactory;

  boolean isTLS = false;
  // 检查 connectionSpecs 是否包含 TLS 配置
  for (ConnectionSpec spec : connectionSpecs) {
    isTLS = isTLS || spec.isTls();
  }

  // 如果用户提供了 sslSocketFactory 或者不需要 TLS,则使用 builder 中的配置
  if (builder.sslSocketFactory != null || !isTLS) {
    this.sslSocketFactory = builder.sslSocketFactory;
    this.certificateChainCleaner = builder.certificateChainCleaner;
  } else {
    // 否则使用系统默认的 SSL 配置
    X509TrustManager trustManager = systemDefaultTrustManager();
    this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
    this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
  }

  // 初始化剩余的属性
  this.hostnameVerifier = builder.hostnameVerifier;
  this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
      certificateChainCleaner);
  this.proxyAuthenticator = builder.proxyAuthenticator;
  this.authenticator = builder.authenticator;
  this.connectionPool = builder.connectionPool;
  this.dns = builder.dns;
  this.followSslRedirects = builder.followSslRedirects;
  this.followRedirects = builder.followRedirects;
  this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
  this.connectTimeout = builder.connectTimeout;
  this.readTimeout = builder.readTimeout;
  this.writeTimeout = builder.writeTimeout;
  this.pingInterval = builder.pingInterval;
}

构造器实现很简单,在默认构造器中传入了一个 OkHttpClient.Builder 建造者对象,然后将其中的参数复制给自己。

在 OkHttpClient.Builder 的构造器中有很多默认的值,如下注释:

public Builder() {
  dispatcher = new Dispatcher();    // 分发器,另有一个带线程池参数的构造器
  protocols = DEFAULT_PROTOCOLS;    // 支持的协议,默认为HTTP_2、HTTP_1_1
  connectionSpecs = DEFAULT_CONNECTION_SPECS;  // 传输层版本、连接协议
  // 事件监听器,3.8版本set方法还是package级别的,暂时不能设置
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();   // 代理选择器
  cookieJar = CookieJar.NO_COOKIES;             // 读写Cookie的容器
  socketFactory = SocketFactory.getDefault();   // Socket工厂
  hostnameVerifier = OkHostnameVerifier.INSTANCE;// 主机名验证器
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;      // 代理认证器
  authenticator = Authenticator.NONE;           // 本地认证器
  connectionPool = new ConnectionPool();        // 连接池
  dns = Dns.SYSTEM;                             // 域名
  followSslRedirects = true;                    // SSL重定向
  followRedirects = true;                       // 普通重定向
  retryOnConnectionFailure = true;              // 连接失败重试
  connectTimeout = 10_000;                      // 连接超时时间
  readTimeout = 10_000;                         // 读超时时间
  writeTimeout = 10_000;                        // 写超时时间
  pingInterval = 0;
}

上面各种属性都从名字大致可以知道是干啥用的。

此外,Request 的构造也很简单,字段如下:

final HttpUrl url;                // 请求的url
final String method;              // 请求方式
final Headers headers;            // 请求头
final @Nullable RequestBody body; // 请求体
final Object tag;                 // 请求的tag

我们接着看 client.newCall(request).execute() 这一行代码。
首先是 OkHttpCient.newCall(Request) 方法:

/**
  * Prepares the {@code request} to be executed at some point in the future.
  */
@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}

这里创建了一个 RealCall 对象,而 RealCall 实现了 Call 接口。Call 接口声明如下:

public interface Call extends Cloneable {
  /** 获得原始请求 */
  Request request();

  /** 同步执行请求 */
  Response execute() throws IOException;

  /** 异步执行请求 */
  void enqueue(Callback responseCallback);

  /** 尽可能取消请求。已经完成了的请求不能被取消 */
  void cancel();

  /**
   * 调用了execute()或者enqueue(Callback)后都是true
   */
  boolean isExecuted();

  boolean isCanceled();

  /** 创建一个新的、完全一样的Call对象,即使原对象状态为enqueued或者executed */
  Call clone();

  interface Factory {
    Call newCall(Request request);
  }
}

回到 RealCall ,看看它的构造器以及成员变量:

final OkHttpClient client;
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
final EventListener eventListener;

/** The application's original request unadulterated by redirects or auth headers. */
final Request originalRequest;
final boolean forWebSocket;

// Guarded by this.
private boolean executed;

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // 事件监听器,默认为null,目前3.8版本也无法设置
  final EventListener.Factory eventListenerFactory = client.eventListenerFactory();

  this.client = client;
  this.originalRequest = originalRequest;
  this.forWebSocket = forWebSocket;
  // 创建一个重试、重定向拦截器
  this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

  // TODO(jwilson): this is unsafe publication and not threadsafe.
  this.eventListener = eventListenerFactory.create(this);
}

3. RealCall.execute

接着看看重点 RealCall.execute 方法:

@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  try {
    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    client.dispatcher().finished(this);
  }
}

在上面的方法中我们可以知道,一旦 Call.execute 方法被执行,那么其 executed 就会被设置为 true,如果多次调用就会报错。
然后调用 client.dispatcher().executed(this);,该方法就是将 call 加入到 Dispatcher.runningSyncCalls队列中,没有其他多余的代码:

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

接着调用 getResponseWithInterceptorChain 进行网络请求并获取Response,该方法是 OkHttp 中的最重要的方法,我们稍后在介绍 RealCall.enqueue方法时再一起说。
紧接着就是finally代码块里面的 client.dispatcher().finished(this)。该方法也很简单,就是 Call 执行完毕后将其从 Dispatcher.runningSyncCalls 队列中移除,见下面的代码;同时,如果 promoteCalls 为true(此处入参false),还会执行 promoteCalls 方法,此方法是给异步调用准备的,具体代码后面会谈到;最后如果 runningAsyncCalls、runningSyncCalls 这俩正在执行的同步、异步队列之和为0,说明dispatcher处理空闲状态,那么调用 idleCallback.run通知外界dispatcher已经空闲了,该方法目前仅在测试用例中发现

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }

  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

4. RealCall.enqueue

顺便也说一下 RealCall.enqueue的方法。
同 execute方法类似,一旦 Call.enqueue 方法被执行,那么其 executed 就会被设置为 true,如果多次调用就会报错。
然后调用

client.dispatcher().enqueue(new AsyncCall(responseCallback));

方法开始了异步调用。

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

这里先看一下 AsyncCall 的相关代码,enqueue 方法具体代码下面再说:

final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }
  ...
  @Override protected void execute() {
    ...
  }
}

AsyncCall 的父类 NamedRunnable 是一个有 name 属性的 Runnable 抽象类,在执行代码前,会将当前线程名设置为 name,执行完毕后恢复。

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

在了解了 AsyncCall 的大致结构,我们返回 Dispatcher.enqueue 方法:

void enqueue(RealCall.AsyncCall call) {
    // 同步块,确保线程安全
    synchronized(this) {
        // 将异步调用添加到准备队列中
        this.readyAsyncCalls.add(call);
    }

    // 提升并执行准备好的异步调用
    this.promoteAndExecute();
}

private boolean promoteAndExecute() {
    // 断言当前线程没有持有此对象的锁
    assert !Thread.holdsLock(this);

    // 创建一个可执行异步调用的列表
    List<RealCall.AsyncCall> executableCalls = new ArrayList();
    boolean isRunning;
    RealCall.AsyncCall asyncCall;
    // 同步块,确保线程安全
    synchronized(this) {
        // 获取准备队列的迭代器
        Iterator<RealCall.AsyncCall> i = this.readyAsyncCalls.iterator();

        // 无限循环,直到准备队列处理完毕
        while(true) {
            // 如果准备队列中还有异步调用
            if (i.hasNext()) {
                asyncCall = (RealCall.AsyncCall)i.next();
                // 如果正在运行的异步调用数小于最大请求数
                if (this.runningAsyncCalls.size() < this.maxRequests) {
                    // 如果当前主机的异步调用数小于每主机最大请求数
                    if (this.runningCallsForHost(asyncCall) < this.maxRequestsPerHost) {
                        // 从准备队列中移除此异步调用
                        i.remove();
                        // 将此异步调用添加到可执行列表中
                        executableCalls.add(asyncCall);
                        // 将此异步调用添加到正在运行队列中
                        this.runningAsyncCalls.add(asyncCall);
                    }
                    // 继续处理下一个异步调用
                    continue;
                }
            }

            // 判断当前是否有正在运行的调用
            isRunning = this.runningCallsCount() > 0;
            break;
        }
    }

    // 遍历可执行列表并执行每个异步调用
    int i = 0;
    for(int size = executableCalls.size(); i < size; ++i) {
        asyncCall = (RealCall.AsyncCall)executableCalls.get(i);
        // 在执行服务上执行异步调用
        asyncCall.executeOn(this.executorService());
    }

    // 返回当前是否有正在运行的调用
    return isRunning;
}

maxRequests 和 maxRequestsPerHost 都有默认值,且有setter方法可以设置具体值,两者的setter方法最后会执行 promoteCalls 方法尝试执行异步任务。
在 enqueue 方法中,首先将异步任务加到准备队列readyAsyncCalls中,紧接着就调用promoteAndExecute

接着在promoteAndExecute中,会遍历准备队列,检查「正在运行的异步请求数」以及「call对应的host上的异步请求数」是否达到了阈值。如果还没有达到阈值,那么加入到 runningAsyncCalls 队列中,同时开始执行请求;否则加入到 readyAsyncCalls 队列中进行等待。

这里出现了一个 executorService() ,这是一个单例实现的线程池,该线程池也可以在 Dispatcher 的构造器中注入。

/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;

public Dispatcher(ExecutorService executorService) {
  this.executorService = executorService;
}

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

这里我们可以看出来,这是一个典型的 CachedThreadPool 。
这是一个线程数量不定的线程池,他只有非核心线程,并且其最大线程数为 Integer.MAX_VALUE 。线程池中的空闲线程都有超时机制,这个超时时常为60s,超过这个时间的闲置线程就会被回收。SynchronousQueue 可以简单的理解为一个无法存储元素的队列,因此这将导致任何任务都会立刻执行。
从其特性来看,这类线程池适合执行大量耗时较少的任务。当整个线程池处理闲置状态时,线程池中的线程都会因为超时而被停止,这个时候 CachedThreadPool 之中实际上是没有线程的,它几乎不占用任何系统资源。

void executeOn(ExecutorService executorService) {
    // 断言当前线程没有持有 RealCall.this.client.dispatcher() 的锁
    assert !Thread.holdsLock(RealCall.this.client.dispatcher());

    // 定义一个布尔变量 success,用于标记任务是否成功提交
    boolean success = false;

    try {
        // 尝试在给定的执行器服务上执行此任务
        executorService.execute(this);
        // 如果没有抛出异常,则标记任务提交成功
        success = true;
    } catch (RejectedExecutionException var8) {
        // 如果执行被拒绝,捕获 RejectedExecutionException 异常
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        // 将原始异常作为原因附加到新的 IOException 中
        ioException.initCause(var8);
        // 通知监听器调用失败
        RealCall.this.eventListener.callFailed(RealCall.this, ioException);
        // 回调响应失败
        this.responseCallback.onFailure(RealCall.this, ioException);
    } finally {
        // 如果任务没有成功提交
        if (!success) {
            // 通知调度器此任务已完成
            RealCall.this.client.dispatcher().finished(this);
        }
    }
}

提交到线程池后,AsyncCall.run 方法就会被调用,又因为 AsyncCall 继承了 NamedRunnable ,所以最后执行的是 AsyncCall.execute 方法:

protected void execute() {
    boolean signalledCallback = false;
    RealCall.this.timeout.enter();
    try {
        // 获取带有拦截器链的响应
        Response response = RealCall.this.getResponseWithInterceptorChain();
        // 检查请求是否已被取消
        if (RealCall.this.retryAndFollowUpInterceptor.isCanceled()) {
            // 标记已经回调
            signalledCallback = true;
            // 回调失败,传递取消异常
            this.responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
            // 标记已经回调
            signalledCallback = true;
            // 回调成功,传递响应对象
            this.responseCallback.onResponse(RealCall.this, response);
        }
    } catch (IOException var6) {
  
        IOException e = RealCall.this.timeoutExit(var6);
        if (signalledCallback) {
    
            Platform.get().log(4, "Callback failure for " + RealCall.this.toLoggableString(), e);
        } else {
      
            RealCall.this.eventListener.callFailed(RealCall.this, e);
            this.responseCallback.onFailure(RealCall.this, e);
        }
    } finally {
    
        RealCall.this.client.dispatcher().finished(this);
    }
}

首先调用 getResponseWithInterceptorChain 进行网络请求并获取Response,然后根据请求是否被取消,调用对应的回调方法。最后调用 client.dispatcher().finished(this) 方法在 runningAsyncCalls 方法中移除call,并尝试执行其他的异步方法。

我们先看 client.dispatcher().finished(this) 方法,最后再看 getResponseWithInterceptorChain 方法的实现:

private <T> void finished(Deque<T> calls, T call) {
        Runnable idleCallback;
        synchronized(this) {
            if (!calls.remove(call)) {
                throw new AssertionError("Call wasn't in-flight!");
            }

            idleCallback = this.idleCallback;
        }

        boolean isRunning = this.promoteAndExecute();
        if (!isRunning && idleCallback != null) {
            idleCallback.run();
        }

    }

private boolean promoteAndExecute() {
        assert !Thread.holdsLock(this);

        List<RealCall.AsyncCall> executableCalls = new ArrayList();
        boolean isRunning;
        RealCall.AsyncCall asyncCall;
        synchronized(this) {
            Iterator<RealCall.AsyncCall> i = this.readyAsyncCalls.iterator();

            while(true) {
                if (i.hasNext()) {
                    asyncCall = (RealCall.AsyncCall)i.next();
                    if (this.runningAsyncCalls.size() < this.maxRequests) {
                        if (this.runningCallsForHost(asyncCall) < this.maxRequestsPerHost) {
                            i.remove();
                            executableCalls.add(asyncCall);
                            this.runningAsyncCalls.add(asyncCall);
                        }
                        continue;
                    }
                }

                isRunning = this.runningCallsCount() > 0;
                break;
            }
        }

        int i = 0;

        for(int size = executableCalls.size(); i < size; ++i) {
            asyncCall = (RealCall.AsyncCall)executableCalls.get(i);
            asyncCall.executeOn(this.executorService());
        }

        return isRunning;
    }

finished 方法不用多说;promoteCalls 方法的代码也很清晰明了,这是一个将等待队列中的任务移到 runningAsyncCalls 且执行的过程。

5. getResponseWithInterceptorChain(重头戏)

最后,终于来到了 getResponseWithInterceptorChain方法,如下所示。

Response getResponseWithInterceptorChain() throws IOException {
        List<Interceptor> interceptors = new ArrayList();
        interceptors.addAll(this.client.interceptors());
        interceptors.add(this.retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(this.client.cookieJar()));
        interceptors.add(new CacheInterceptor(this.client.internalCache()));
        interceptors.add(new ConnectInterceptor(this.client));
        if (!this.forWebSocket) {
            interceptors.addAll(this.client.networkInterceptors());
        }

        interceptors.add(new CallServerInterceptor(this.forWebSocket));
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, (StreamAllocation)null, (HttpCodec)null, (RealConnection)null, 0, this.originalRequest, this, this.eventListener, this.client.connectTimeoutMillis(), this.client.readTimeoutMillis(), this.client.writeTimeoutMillis());
        return chain.proceed(this.originalRequest);
    }

首先将以下拦截器依次加入到List中:

  1. OkHttpClient设置的拦截器 interceptors()
  2. 重试、重定向拦截器 RetryAndFollowUpInterceptor
  3. 把用户请求转换为服务器请求、把服务器返响应转换为用户响应的 BridgeInterceptor
  4. 读取缓存直接返回、将响应写入到缓存中的 CacheInterceptor
  5. 与服务器建立连接的 ConnectInterceptor
  6. OkHttpClient设置的网络拦截器 networkInterceptors()
  7. 真正执行网络请求的 CallServerInterceptor

将所有的拦截器保存在 interceptors后,创建一个拦截器责任链 RealInterceptorChain,并调用其 proceed 开始处理网络请求。

下面解释一下责任链模式是如何表现出来的?

首先看上面创建 RealInterceptorChain 的方法以及 RealInterceptorChain的代码:

// RealCall.java
Interceptor.Chain chain = new RealInterceptorChain(
    interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);

// RealInterceptorChain.java
public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  ...
  private final int index;
  private final Request request;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request) {
    this.interceptors = interceptors;
    ...
    this.index = index;
    this.request = request;
  }
  ...
  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();
    ...
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    ...
    return response;
  }
}

在不考虑 OkHttpClient.interceptor()的情况下,上面这段代码的解释如下:

  1. 在 getResponseWithInterceptorChain 创建了一个 index为0的 RealInterceptorChain(下称链),接着就调用了其 proceed方法
  2. 在 RealInterceptorChain.proceed方法中:index=0
  3. 创建了一个 index为index+1的链 next
  4. 然后对当前 index的拦截器(即 RetryAndFollowUpInterceptor)执行 interceptor.intercept(next)。
  5. 在 RetryAndFollowUpInterceptor 方法中执行了 chain.proceed方法,而这里的 chain是 RealInterceptorChain 实例,所以回到了 RealInterceptorChain.proceed方法中
  6. 此时 index=1,同理链条可以一直执行下去;index=2..n-1
  7. 直到遇到最后一个拦截器 CallServerInterceptor,因此这是最后一个拦截器了,因此链肯定不能继续下去,不然就报错了 if (index >= interceptors.size()) throw new AssertionError();,而且在 CallServerInterceptor.intercept方法中也所搜不到 proceed关键字。又因为这是最后一个拦截器,所以肯定是负责和服务器建立实际通讯的。
  8. Response的返回与Request相反,会从最后一个开始依次往前经过这些 Intercetor

下图为OkHttp工作的大致流程:

OkHttp总体工作流程

接下来,我们看一下各个拦截器具体的代码。

6. RetryAndFollowUpInterceptor

@Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();

  streamAllocation = new StreamAllocation(
      client.connectionPool(), createAddress(request.url()), callStackTrace);

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) {
    if (canceled) {
      streamAllocation.release();
      throw new IOException("Canceled");
    }

    Response response = null;
    boolean releaseConnection = true;
    // 1️⃣
    try {
      response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
      releaseConnection = false;
    } catch (RouteException e) {
      // The attempt to connect via a route failed. The request will not have been sent.
      if (!recover(e.getLastConnectException(), false, request)) {
        throw e.getLastConnectException();
      }
      releaseConnection = false;
      continue;
    } catch (IOException e) {
      // An attempt to communicate with a server failed. The request may have been sent.
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, requestSendStarted, request)) throw e;
      releaseConnection = false;
      continue;
    } finally {
      // We're throwing an unchecked exception. Release any resources.
      if (releaseConnection) {
        streamAllocation.streamFailed(null);
        streamAllocation.release();
      }
    }

    // 2️⃣
    // Attach the prior response if it exists. Such responses never have a body.
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
          .build();
    }

    // 3️⃣
    Request followUp = followUpRequest(response);

    if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
      }
      return response;
    }

    closeQuietly(response.body());

    if (++followUpCount > MAX_FOLLOW_UPS) {
      streamAllocation.release();
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    if (followUp.body() instanceof UnrepeatableRequestBody) {
      streamAllocation.release();
      throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }

    if (!sameConnection(response, followUp.url())) {
      streamAllocation.release();
      streamAllocation = new StreamAllocation(
          client.connectionPool(), createAddress(followUp.url()), callStackTrace);
    } else if (streamAllocation.codec() != null) {
      throw new IllegalStateException("Closing the body of " + response
          + " didn't close its backing stream. Bad interceptor?");
    }

    request = followUp;
    priorResponse = response;
  }
}

上面这些代码比较简单

  1. 首先正常进行请求,如果遇到了异常,根据情况看是否可以恢复:若不能恢复,则抛出异常,结束请求;若正常请求成功,那么在finally块中释放资源
  2. 如果请求是重试后的请求,那么将重试前请求的响应体设置为null并加到当前响应体 priorResponse字段中
  3. 根据Response的响应码判断是否需要重试:否不需要,则返回respone;若需要,则会检查重试次数是否达到阈值、是否可以继续重试
  4. 继续执行while循环,返回步骤1

这里需要注意的是,在调用 chain.proceed方法时,将创建好的 StreamAllocation对象作为参数传入了 proceed方法,所以之后的 RealInterceptorChain.streamAllocation就可以使用了。

streamAllocation = new StreamAllocation(
    client.connectionPool(), createAddress(request.url()), callStackTrace);
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

7. BridgeInterceptor

直接上代码:

@Override public Response intercept(Chain chain) throws IOException {
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  // 1️⃣
  // 获取用户请求的请求体
RequestBody body = userRequest.body();
if (body != null) {
    // 获取请求体的内容类型
    MediaType contentType = body.contentType();
    if (contentType != null) {
        // 设置请求头中的 "Content-Type"
        requestBuilder.header("Content-Type", contentType.toString());
    }

    // 获取请求体的内容长度
    long contentLength = body.contentLength();
    if (contentLength != -1) {
        // 如果内容长度有效,设置请求头中的 "Content-Length"
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        // 移除请求头中的 "Transfer-Encoding"
        requestBuilder.removeHeader("Transfer-Encoding");
    } else {
        // 如果内容长度无效,设置请求头中的 "Transfer-Encoding" 为 "chunked"
        requestBuilder.header("Transfer-Encoding", "chunked");
        // 移除请求头中的 "Content-Length"
        requestBuilder.removeHeader("Content-Length");
    }
}

// 如果用户请求的请求头中没有 "Host",设置 "Host" 请求头
if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

// 如果用户请求的请求头中没有 "Connection",设置 "Connection" 请求头为 "Keep-Alive"
if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
}

// 如果我们添加了 "Accept-Encoding: gzip" 请求头,我们也要负责解压传输流
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    // 设置请求头中的 "Accept-Encoding" 为 "gzip"
    requestBuilder.header("Accept-Encoding", "gzip");
}

// 从 cookieJar 中加载与请求 URL 相关的 Cookie
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
    // 如果有 Cookie,设置请求头中的 "Cookie"
    requestBuilder.header("Cookie", cookieHeader(cookies));
}

// 如果用户请求的请求头中没有 "User-Agent",设置 "User-Agent" 请求头
if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
}

// 发送网络请求并获取响应
Response networkResponse = chain.proceed(requestBuilder.build());

// 处理响应头中的 Cookies
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
  //构建响应的基本信息
  //使用 networkResponse.newBuilder() 方法创建一个响应构建器,并将原始请求 userRequest 添加到响应中。
  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);
  // 处理 gzip 压缩的响应体
  // 检查是否启用了透明 gzip 压缩 (transparentGzip),并且响应头中的 Content-Encoding 是否为 gzip,以及响应体是否存在。
如果条件满足,说明响应体是经过 gzip 压缩的,需要进行解压缩处理。
  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    //创建一个 GzipSource 对象,用于解压缩响应体。
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    //创建一个新的响应头对象,并移除 Content-Encoding 和 Content-Length 头,因为解压缩后的内容长度会改变,且内容编码已不再是 gzip。
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    //将处理过的响应头设置到响应构建器中。
    responseBuilder.headers(strippedHeaders);
    //创建一个新的 RealResponseBody 对象,用解压缩后的响应体替换原来的响应体,并设置到响应构建器中。
    responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}

上面代码总体来说干了两件事:

  1. 对原始的Request进行检查,设置 Content-Type、Content-Length、Transfer-Encoding、Host、Connection、Accept-Encoding、Cookie、User-Agent这些header

  2. 进行网络请求。若是gzip编码,则对响应进行Gzip处理;否则直接返回

在上面的过程中,用到了 CookieJar 的实例。
在请求前,会调用下面的方法读取url的Cookie:

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
  requestBuilder.header("Cookie", cookieHeader(cookies));
}

cookieHeader 就是将cookies里面的键值对拼接成一个字符串 k1=v1; k2=v2,其实现如下:

/** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
private String cookieHeader(List<Cookie> cookies) {
  StringBuilder cookieHeader = new StringBuilder();
  for (int i = 0, size = cookies.size(); i < size; i++) {
    if (i > 0) {
      cookieHeader.append("; ");
    }
    Cookie cookie = cookies.get(i);
    cookieHeader.append(cookie.name()).append('=').append(cookie.value());
  }
  return cookieHeader.toString();
}

收到响应后,会调用下面的方法存储url的Cookie:

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

HttpHeaders.receiveHeaders 实现如下:

// HttpHeaders.java
public static void receiveHeaders(CookieJar cookieJar, HttpUrl url, Headers headers) {
  if (cookieJar == CookieJar.NO_COOKIES) return;

  List<Cookie> cookies = Cookie.parseAll(url, headers);
  if (cookies.isEmpty()) return;

  cookieJar.saveFromResponse(url, cookies);
}

// Cookie.java
/** Returns all of the cookies from a set of HTTP response headers. */
public static List<Cookie> parseAll(HttpUrl url, Headers headers) {
  List<String> cookieStrings = headers.values("Set-Cookie");
  List<Cookie> cookies = null;

  for (int i = 0, size = cookieStrings.size(); i < size; i++) {
    Cookie cookie = Cookie.parse(url, cookieStrings.get(i));
    if (cookie == null) continue;
    if (cookies == null) cookies = new ArrayList<>();
    cookies.add(cookie);
  }

  return cookies != null
      ? Collections.unmodifiableList(cookies)
      : Collections.<Cookie>emptyList();
}

Cookie.parse 的实现比较复杂,我们只需要知道这是将每一个 Set-Cookie的值取出来,然后解析成为一个个 Cookie对象即可。

所以,使用OkHttp的Cookie功能时,自定义一个CookieJar就好了,不需要新增拦截器专门处理Cookie问题。

8. CacheInterceptor

首先需要注意的是,OkHttp中的Cache策略采用的是 DiskLruCache。key的计算为:

ByteString.encodeUtf8(url.toString()).md5().hex()

Cache的一般策略如下图:

Cache的一般策略

下面是 CacheInterceptor 的主要代码:

@Override public Response intercept(Chain chain) throws IOException {
  // 根据是否设置了缓存以及网络请求,得到一个候选缓存
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  // 根据当前时间、请求对象以及候选缓存,获取缓存策略
  // 在缓存策略中,有两个重要的对象
  // networkRequest: 网络请求,若为null表示不使用网络
  // cacheResponse: 响应缓存,若为null表示不使用缓存
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  // 如果有候选缓存但是没有响应缓存,说明候选缓存不可用
  // 关闭它,以免内存泄漏
  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  // 不进行网络请求,且缓存以及过期了,返回504错误
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // If we don't need the network, we're done.
  // 不需要网络请求,此时缓存命中,直接返回缓存,后面的拦截器的步骤也不会经过了
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  // 没有缓存命中,则进行网络请求
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // If we have a cache response too, then we're doing a conditional get.
  // 如果缓存策略中,网络响应和响应缓存都不为null,需要更新响应缓存
  // (比如 需要向服务器确认缓存是否可用的情况)
  if (cacheResponse != null) {
    // 304的返回是不带body的,此时必须获取cache的body
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

  // 走到这里,已经进行过网络请求了,请求响应不为空
  // 构建响应对象,等待返回
  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
    // 将请求放到缓存中
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    // 如果请求不能被缓存,则移除该请求
    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}

接下来看看另外一个要点:CacheStrategy 是如何决定使不使用网络请求、响应缓存的。

CacheStrategy.java

/** Returns a strategy to use assuming the request can use the network. */
private CacheStrategy getCandidate() {
  // No cached response.
  // 没有缓存
  if (cacheResponse == null) {
    return new CacheStrategy(request, null);
  }

  // Drop the cached response if it's missing a required handshake.
  // https请求,但没有必要的握手信息,丢弃缓存
  if (request.isHttps() && cacheResponse.handshake() == null) {
    return new CacheStrategy(request, null);
  }

  // If this response shouldn't have been stored, it should never be used
  // as a response source. This check should be redundant as long as the
  // persistence store is well-behaved and the rules are constant.
  // 不应该被缓存,丢弃缓存
  if (!isCacheable(cacheResponse, request)) {
    return new CacheStrategy(request, null);
  }

  CacheControl requestCaching = request.cacheControl();
  ...
  // 根据缓存的缓存时间,缓存可接受最大过期时间等等HTTP协议上的规范,来判断缓存是否可用
  if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    Response.Builder builder = cacheResponse.newBuilder();
    if (ageMillis + minFreshMillis >= freshMillis) {
      builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
    }
    long oneDayMillis = 24 * 60 * 60 * 1000L;
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
      builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
    }
    return new CacheStrategy(null, builder.build());
  }

  // Find a condition to add to the request. If the condition is satisfied, the response body
  // will not be transmitted.
  // 请求条件, 当etag、lastModified、servedDate这三种属性存在时,需要向服务器确认缓存的有效性
  String conditionName;
  String conditionValue;
  if (etag != null) {
    conditionName = "If-None-Match";
    conditionValue = etag;
  } else if (lastModified != null) {
    conditionName = "If-Modified-Since";
    conditionValue = lastModifiedString;
  } else if (servedDate != null) {
    conditionName = "If-Modified-Since";
    conditionValue = servedDateString;
  } else {
    return new CacheStrategy(request, null); // No condition! Make a regular request.
  }

  Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
  Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

  // 构造一个请求询问服务器资源是否过期
  Request conditionalRequest = request.newBuilder()
      .headers(conditionalRequestHeaders.build())
      .build();
  return new CacheStrategy(conditionalRequest, cacheResponse);
}

9. ConnectInterceptor

public Response intercept(Chain chain) throws IOException {
    // 将拦截器链转换为实际的 RealInterceptorChain
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    // 从实际的拦截器链中获取请求对象
    Request request = realChain.request();
    // 获取用于分配流的 StreamAllocation 对象
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // 我们需要网络来满足这个请求,可能是为了验证一个条件 GET 请求。
    // 如果请求方法不是 GET,则需要进行全面的健康检查
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // 创建一个新的 HttpCodec 对象,用于处理请求和响应的编码
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    // 获取与请求相关联的实际连接对象
    RealConnection connection = streamAllocation.connection();

    // 继续处理请求,并传递流分配、编码器和连接对象
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

上面的代码只有几行,其作用注释也说的很清楚了:与服务器建立连接,然后传递到下一个拦截器。

其中 StreamAllocation 是一个重点,它是 ConnectionsStreamsCalls三者的一个纽带,此类的实例使用一个或多个connections上的一个或多个stream来执行call。

  • Connections: 连接到远程服务器的物理的socket。connections可能建立很慢,因此必须能够取消当前已经连接上的connection。
  • Streams: 位于Connections上的逻辑上的HTTP请求、响应对。每个connection都有自己的allocation limit,这决定了connection可以承载多少可以并发的stream。HTTP/1.x connections一次可以承载1个流,HTTP/2通常可以承载多个。
  • Calls: streams的逻辑序列,通常是初始请求及其后续请求。我们希望将单个call的所有streams保持在同一个connection上,以获得更好的表现。

streamAllocation.newStream方法就是打开连接的关键,看看到底怎么操作的:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
  int connectTimeout = client.connectTimeoutMillis();
  int readTimeout = client.readTimeoutMillis();
  int writeTimeout = client.writeTimeoutMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try {
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
    HttpCodec resultCodec = resultConnection.newCodec(client, this);

    synchronized (connectionPool) {
      codec = resultCodec;
      return resultCodec;
    }
  } catch (IOException e) {
    throw new RouteException(e);
  }
}

该方法有两处要点,第一处就是调用 findHealthyConnection得到一个可用的connection,第二处就是调用 resultConnection.newCodec 得到一个对HTTP请求进行编码、HTTP响应进行解码的 HttpCodec。

先看跟踪 findHealthyConnection方法,该方法会调用 findConnection 得到一个connection,然后对其调用 isHealth(true) 方法进行健康诊断。如果健康,那么就可以返回该connection了;否则,从连接池中移除,并继续while循环。

/**
  * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
  * until a healthy connection is found.
  */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
    throws IOException {
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      if (candidate.successCount == 0) {
        return candidate;
      }
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      noNewStreams();
      continue;
    }

    return candidate;
  }
}

在 findConnection 方法中,会先看已经存在的connection,然后再看连接池,最后都没有就创建新的connection。

/**
  * Returns a connection to host a new stream. This prefers the existing connection if it exists,
  * then the pool, finally building a new connection.
  */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    boolean connectionRetryEnabled) throws IOException {
  Route selectedRoute;
  synchronized (connectionPool) {
    if (released) throw new IllegalStateException("released");
    if (codec != null) throw new IllegalStateException("codec != null");
    if (canceled) throw new IOException("Canceled");

    // Attempt to use an already-allocated connection.
    RealConnection allocatedConnection = this.connection;
    if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
      return allocatedConnection;
    }

    // Attempt to get a connection from the pool.
    Internal.instance.get(connectionPool, address, this, null);
    if (connection != null) {
      return connection;
    }

    selectedRoute = route;
  }

  // If we need a route, make one. This is a blocking operation.
  if (selectedRoute == null) {
    selectedRoute = routeSelector.next();
  }

  RealConnection result;
  synchronized (connectionPool) {
    if (canceled) throw new IOException("Canceled");

    // Now that we have an IP address, make another attempt at getting a connection from the pool.
    // This could match due to connection coalescing.
    Internal.instance.get(connectionPool, address, this, selectedRoute);
    if (connection != null) return connection;

    // Create a connection and assign it to this allocation immediately. This makes it possible
    // for an asynchronous cancel() to interrupt the handshake we're about to do.
    route = selectedRoute;
    refusedStreamCount = 0;
    result = new RealConnection(connectionPool, selectedRoute);
    acquire(result);
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
  routeDatabase().connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    // Pool the connection.
    Internal.instance.put(connectionPool, result);

    // If another multiplexed connection to the same address was created concurrently, then
    // release this connection and acquire that one.
    if (result.isMultiplexed()) {
      socket = Internal.instance.deduplicate(connectionPool, address, this);
      result = connection;
    }
  }
  closeQuietly(socket);

  return result;
}

在创建新connection时,会执行TCP + TLS握手,然后放入连接池中。
在调用 connect进行握手时会调用 establishProtocol方法确定协议:

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
  if (route.address().sslSocketFactory() == null) {
    protocol = Protocol.HTTP_1_1;
    socket = rawSocket;
    return;
  }

  connectTls(connectionSpecSelector);

  if (protocol == Protocol.HTTP_2) {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .build();
    http2Connection.start();
  }
}

也就是说,在建立连接时,如果是HTTP_2,就会初始化一个 http2Connection。

我们回到 streamAllocation.newStream方法的 resultConnection.newCodec 语句中:

public HttpCodec newCodec(
    OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
  if (http2Connection != null) {
    return new Http2Codec(client, streamAllocation, http2Connection);
  } else {
    socket.setSoTimeout(client.readTimeoutMillis());
    source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
    sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
    return new Http1Codec(client, streamAllocation, source, sink);
  }
}

我们发现,这里根据 http2Connection 有没有初始化,也就是是不是HTTP_2协议来创建对应的 HttpCodec 对象。

在拦截器的最后,调用 realChain.proceed(request, streamAllocation, httpCodec, connection) 将请求传递给下一个拦截器。

10. CallServerInterceptor

public Response intercept(Chain chain) throws IOException {
  // 将拦截器链转换为实际的 RealInterceptorChain
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  // 获取 HttpCodec 对象,用于处理请求和响应的编码
  HttpCodec httpCodec = realChain.httpStream();
  // 获取用于分配流的 StreamAllocation 对象
  StreamAllocation streamAllocation = realChain.streamAllocation();
  // 获取实际连接对象
  RealConnection connection = (RealConnection) realChain.connection();
  // 获取请求对象
  Request request = realChain.request();

  // 记录请求发送的时间
  long sentRequestMillis = System.currentTimeMillis();
  // 写入请求头
  httpCodec.writeRequestHeaders(request);

  Response.Builder responseBuilder = null;
  // 如果请求方法允许请求体并且请求体不为空
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // 如果请求头中有 "Expect: 100-continue",等待 "HTTP/1.1 100 Continue" 响应
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      // 刷新请求
      httpCodec.flushRequest();
      // 读取响应头
      responseBuilder = httpCodec.readResponseHeaders(true);
    }

    // 如果没有收到 "100-continue" 响应,写入请求体
    if (responseBuilder == null) {
      // 创建请求体的输出流
      Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      // 将请求体写入输出流
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    } else if (!connection.isMultiplexed()) {
      // 如果没有收到 "100-continue" 响应,并且连接不是多路复用的,禁止复用 HTTP/1 连接
      streamAllocation.noNewStreams();
    }
  }

  // 完成请求
  httpCodec.finishRequest();

  // 如果没有收到 "100-continue" 响应,读取响应头
  if (responseBuilder == null) {
    responseBuilder = httpCodec.readResponseHeaders(false);
  }

  // 构建响应对象
  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake()) // 获取握手信息
      .sentRequestAtMillis(sentRequestMillis) // 设置请求发送时间
      .receivedResponseAtMillis(System.currentTimeMillis()) // 设置响应接收时间
      .build();

  int code = response.code();
  // 如果是 WebSocket 并且响应码是 101,确保拦截器看到的响应体不是空的
  if (forWebSocket && code == 101) {
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    // 否则打开响应体
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }

  // 如果请求头或响应头中有 "Connection: close",禁止复用连接
  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }

  // 如果响应码是 204 或 205,并且响应体长度大于 0,抛出协议异常
  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  // 返回响应对象
  return response;
}

下面这些代码看起来也很清晰,就是利用 HttpCodec进行请求数据、响应数据的读写。其中读写的不详细描述。

11. 小结

至此,OkHttp3的源码大致就过了一遍,这里小结一下。

OkHttp 的拦截器总共分为以下几种:

在OkHttp中,RealCall是Call的实现类,负责执行网络请求。其中,异步请求由 Dispatcher进行调度,并放到线程池(一个典型的 CachedThreadPool)中执行。

执行网络请求的过程中,请求会依次经过下列拦截器组成的责任链,最后发送到服务器。

  1. interceptors()
  2. 重试、重定向拦截器 RetryAndFollowUpInterceptor
  3. 把用户请求转换为服务器请求、把服务器返响应转换为用户响应的 BridgeInterceptor
  4. 读取缓存直接返回、将响应写入到缓存中的 CacheInterceptor
  5. 与服务器建立连接的 ConnectInterceptor
  6. networkInterceptors()
  7. 真正执行网络请求的 CallServerInterceptor

而响应会从 CallServerInterceptor开始往前依次经过这些拦截器,最后客户端进行处理。

下图为OkHttp工作的大致流程:

2

评论区