从源码角度看okhttp

举个栗子

  okhttp是square公司开源的http请求框架,支持同步请求和异步请求,一般用法如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
val client = OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS)
.addInterceptor { chain ->
val oldRequest: Request = chain.request()
val newRequest: Request = oldRequest.newBuilder()
.addHeader("accessToken", "xxx")
.build()
chain.proceed(newRequest)
}
.build()
val request = Request.Builder()
.url("xxx")
.get()
.build()
val call = client.newCall(request)
//同步请求
//call.execute()

//异步请求
call.enqueue(object : Callback {
override fun onFailure(call: Call?, e: IOException?) {
//失败
}

override fun onResponse(call: Call?, response: Response?) {
//成功
}
})

  可以看到,使用okhttp的一般步骤如下:

  1. 构建OkHttpClient对象,并配置相关的参数;
  2. 构建Request请求;
  3. 构建Call对象;
  4. 执行execute()同步请求或者执行enqueue()异步请求

OkHttpClient

  典型的Builder模式,我们可以通过构建OkHttpClient.Builder来配置一些参数信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Builder() {
dispatcher = new Dispatcher();//网络请求的分发器
protocols = DEFAULT_PROTOCOLS;//支持http1.1,http2.0
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;//失败重连
connectTimeout = 10_000;//默认连接超时10s
readTimeout = 10_000;//默认读取超时10s
writeTimeout = 10_000;//默认写入超时10s
pingInterval = 0;
}

Request

  Request表示一个请求,请求一般包括url,method,header等,我们可以看看Request的成员定义。

1
2
3
4
final HttpUrl url;//请求的url
final String method;//请求方式 get/post/put/delete 等
final Headers headers;//请求头
final @Nullable RequestBody body;//请求体

Call

它是一个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Call extends Cloneable {
Request request();

//同步请求
Response execute() throws IOException;

//异步请求
void enqueue(Callback responseCallback);

//取消请求
void cancel();

//如果Call已经调用了execute()或者enqueue(),则返回true
boolean isExecuted();

//是否已经取消了
boolean isCanceled();

//使用当前Call创建一个新的Call
Call clone();

interface Factory {
Call newCall(Request request);
}
}

  我们可以看看OkHttpClient的newCall(Request)方法。

1
2
3
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}

  可以看到这里实际上是调用了RealCall的newRealCall()

RealCall

  RealCall是Call的实现类,一般我们操作的实际上也是RealCall。这里我只粘贴比较有用的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
final class RealCall implements Call {
final OkHttpClient client;
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;

//事件监听器
private EventListener eventListener;

//用来构建Call的Request
final Request originalRequest;

//当前Call是否已经执行,每个Call只能够被执行一次
private boolean executed;

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
//调用构造方法
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}

//同步执行
@Override public Response execute() throws IOException {
//这里使用同步代码块保证每个Call只能够执行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}

captureCallStackTrace();
eventListener.callStart(this);
try {
//还记得OkHttpClient中构造的时候的Dispatcher吗,这里实际上是把RealCall给了Dispatcher去调用它的executed()方法,当然只是将其加入到正在执行同步请求的队列中
client.dispatcher().executed(this);
//实际上进行同步请求的是这个方法
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}

//异步执行
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//这里构造成Runnable然后移交给Dispatcher的异步请求方法
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

//这里是做同步请求的核心代码
Response getResponseWithInterceptorChain() throws IOException {
//这里是我们添加的拦截器
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//网络请求重试拦截器
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {//是否是websocket
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

//构造InterceptorChain
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//调用proceed开始进行请求
return chain.proceed(originalRequest);
}
}

  可以看到不管是executed()做同步请求还是使用enqueue()做异步请求,最终都会调用到OkHttpClient的Dispatcher的executed()方法和enqueue()方法,请求的核心代码是在RealCall的getResponseWithInterceptorChain()中,对于同步请求,Dispatcher只是将RealCall添加到正在执行同步请求的队列中做一个记录。

AsyncCall

  我们可以看到RealCall里面有一个内部类AsyncCall,它是一个Runnable,我们在RealCall的enqueue()方法中有这么一行代码。

1
client.dispatcher().enqueue(new AsyncCall(responseCallback));

  可以看到在交给Dispatcher进行异步请求的时候会先构造一个AsyncCall的对象,那么AsyncCall是什么呢?实际上就是一个Runnable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}

@Override protected void execute() {
boolean signalledCallback = false;
try {
//咦咦咦,看到没有,这个方法由线程池执行Runnable后调用run()进一步调用这个方法,做请求的东西实际上和同步请求一样,还是调用了getResponseWithInterceptorChain()
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}

  我们可以看到异步请求的核心代码和同步请求的核心代码都是getResponseWithInterceptorChain();然后我们看看NamedRunnable,就只是在构造器中增加了对应的名称,然后当任务交给线程执行的时候,将线程的名称修改为指定的名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class NamedRunnable implements Runnable {
protected final String name;

public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}

@Override public final void run() {
//修改Thread的名称为指定名称
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//这里执行请求
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}

protected abstract void execute();
}

Dispatcher

  这是个网络请求的分发器,我们上面有分析RealCall的源码,它里面的同步和异步请求都会分发给Dispatcher的同步和异步请求,这里同样只粘贴核心代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public final class Dispatcher {
//最大请求数
private int maxRequests = 64;
//每个host的最大请求数
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;

//执行请求的线程池
private @Nullable ExecutorService executorService;
//准备进行异步请求的Call队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在进行异步请求的Call队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在进行同步请求的Call队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

//构造线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

synchronized void enqueue(AsyncCall call) {
//1.正在执行异步请求的Call队列小于最大请求数量;2.call对应的host的请求数量小于每个host的最大请求数量
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加到正在执行异步请求的Call队列中
runningAsyncCalls.add(call);
//将Runnable交给线程池
executorService().execute(call);
} else {
//添加到等待队列
readyAsyncCalls.add(call);
}
}

synchronized void executed(RealCall call) {
//添加到正在执行的同步请求队列
runningSyncCalls.add(call);
}
}

//这个方法主要是调整用的,如果正在请求的Call队列没有到最大请求,且等待队列有Call则将等待队列中的Call移到正在请求队列,并将Runnable交给线程池处理
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

RealInterceptorChain

  在RealCall的getResponseWithInterceptorChain()方法中,我们可以看到最终代码会调用到RealInterceptorChain.proceed()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {

calls++;

//创建下一个RealInterceptorChain
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
//在interceptor的interceptor中实际上调用了chanin的proceed(),相当于交给了下一个RealInterceptorChain
Response response = interceptor.intercept(next);

return response;
}

  可以看到这是一个链式调用,大致上如下图所示。001

  关于如何回传我们可以看看最后一个Interceptor的intercep(),也就是CallServerInterceptor的intercept()方法。

1
2
3
4
5
6
7
8
9
10
@Override public Response intercept(Chain chain) throws IOException {
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

return response;
}

  可以看到CallServerInterceptor的intercept()方法会返回Response。

本文标题:从源码角度看okhttp

文章作者:严方雄

发布时间:2018-09-12

最后更新:2018-09-13

原始链接:http://yanfangxiong.com/2018/09/12/从源码角度看okhttp/

0%