Java OkHttp框架源码深入解析
作者:niuyongzhi
1.OkHttp发起网络请求
可以通过OkHttpClient发起一个网络请求
//创建一个Client,相当于打开一个浏览器 OkHttpClient okHttpClient = new OkHttpClient.Builder().build(); //创建一个请求。 Request request = new Request.Builder() .url("http://www.baidu.com") .method("GET",null) .build(); //调用Client 创建一个Call。 Call call = okHttpClient.newCall(request); //Call传入一个回调函数,并加入到请求队列。 call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { } }); }
通过Retrofit发起一个OkHttp请求
Retrofit retrofit = new Retrofit.Builder() .baseUrl("http://www.baidu.com/") .build(); NetInterface netInterface = retrofit.create(NetInterface.class); Call<Person> call = netInterface.getPerson(); call.enqueue(new Callback<Person>() { @Override public void onResponse(Call<Person> call, Response<Person> response) { } @Override public void onFailure(Call<Person> call, Throwable t) { } });
以上两种方式都是通过call.enqueue() 把网络请求加入到请求队列的。
这个call是RealCall的一个对象。
public void enqueue(Callback responseCallback) { client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
这里有两个判断条件
runningAsyncCalls.size() < maxRequests如果运行队列数量大于最大数量,
runningCallsForHost(call) < maxRequestsPerHost并且访问同一台服务器的请求数量大于最大数量,请求会放入等待队列,否则加入运行队列,直接执行。
//等待队列 private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); //运行队列 private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); //运行队列数量最大值 private int maxRequests = 64; //访问不同主机的最大数量 private int maxRequestsPerHost = 5;
dispatcher.java synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
接下来看这行代码executorService().execute(call);
executorService()拿到一个线程池实例,
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService;
execute(call)执行任务,发起网络请求。
//AsyncCall.java @Override protected void execute() { try { //这个方法去请求网络,会返回Respose Response response = getResponseWithInterceptorChain(); //请求成功,回调接口 responseCallback.onResponse(RealCall.this, response); }catch(Exceptrion e){ //失败回调 responseCallback.onFailure(RealCall.this, e); }finally { //从当前运行队列中删除这个请求 client.dispatcher().finished(this); } }
getResponseWithInterceptorChain()
这行代码,使用了设计模式中的责任链模式。
//这个方法命名:通过拦截器链,获取Response Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); // 这个我们自己定义的拦截器。 interceptors.addAll(client.interceptors()); //重试和重定向拦截器 interceptors.add(retryAndFollowUpInterceptor); //请求头拦截器 interceptors.add(new BridgeInterceptor(client.cookieJar())); //缓存拦截器 interceptors.add(new CacheInterceptor(client.internalCache())); //连接拦截器 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } //访问拦截器 interceptors.add(new CallServerInterceptor(forWebSocket)); //拦截器责任链 Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); //执行拦截器集合中的拦截器 return chain.proceed(originalRequest); }
责任链模式中,链条的上游持有下游对象的引用。这样能够保证在链条上的每一个对象,都能对其符合条件的任务进行处理。
但是在上面的拦截器构成责任链中,是把拦截器,放在了一个集合中。
第一个参数interceptors 是一个拦截器的集合。
第五个参数0是集合的index,RealInterceptorChain就是根据这个索引值+1,
对chain.proceed方法循环调用,进行集合遍历,并执行拦截器中定义的方法的。
这个责任链模式,并没有明确的指定下游对象是什么,而是通过集合index值的变化,动态的指定的。
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0......) chain.proceed(originalRequest); public Response proceed(Request request,...){ //构建一个index+1的拦截器链 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1,....); //拿到当前的拦截器 Interceptor interceptor = interceptors.get(index); //调用拦截器intercept(next)方法, //在这个方法中继续调用realChain.proceed(),从而进行循环调用,index索引值再加1. Response response = interceptor.intercept(next); }
2.OkHttp的连接器
1)RetryAndFollowUpInterceptor:重试和重定向拦截器
public Response intercept(Chain chain){ while (true) { Response response; try { //创建StreamAllocation对象,这个对象会在连接拦截器中用到 StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; 调用责任链下游拦截器 response = realChain.proceed(request, streamAllocation, null, null); } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. 路由异常,请求还没发出去。 这样这个recover(),如果返回的是false,则抛出异常,不再重试 如果返回的是true,则执行下面的continue,进行下一次while循环,进行重试,重新发起网络请求。 if (!recover(e.getLastConnectException(), streamAllocation, false, request)) { throw e.getFirstConnectException(); } releaseConnection = false; continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. 请求已经发出去了,但是和服务器连接失败了。 这个recover()返回值的处理逻辑和上面异常一样。 boolean requestSendStarted = !(e instanceof ConnectionShutdownException); if (!recover(e, streamAllocation, requestSendStarted, request)) throw e; releaseConnection = false; continue; } } finally {//finally是必定会执行到的,不管上面的catch中执行的是continue还是thow // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } 在这个重试拦截器中,okhttp的做法很巧妙。先是在外面有一个while循环,如果发生异常, 会在recover方法中对异常类型进行判断,如果不符合属于重试,则返回false,并thow e,结束while循环。 如果符合重试的条件,则返回true,在上面的catch代码块中执行continue方法,进入下一个while循环。 //如果请求正常,并且返回了response,则会进行重定向的逻辑判断 followUpRequest在这个方法中会根据ResponseCode,状态码进行重定向的判断, Request followUp; try { followUp = followUpRequest(response, streamAllocation.route()); } catch (IOException e) { streamAllocation.release(); throw e; } 如果flolowUp 为null,则不需要重定向,直接返回response if (followUp == null) { if (!forWebSocket) { streamAllocation.release(); } return response; } 如果flolowUp 不为null,则进行重定向了请求 如果重定向次数超过MAX_FOLLOW_UPS=20次,则抛出异常,结束while循环 if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } if (!sameConnection(response, followUp.url())) { streamAllocation.release(); //从重定向请求中拿到url,封装一个新的streamAllocation对象, streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace); this.streamAllocation = streamAllocation; } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } //将重定向请求赋值给request 进入下一个重定向的请求的while循环,继续走上面的while循环代码 request = followUp; priorResponse = response; } } //只有这个方法返回值为false都不进行重试。 private boolean recover(IOException e, StreamAllocation streamAllocation, boolean requestSendStarted, Request userRequest) { streamAllocation.streamFailed(e); // The application layer has forbidden retries. 应用层禁止重试。可以通过OkHttpClient进行配置(默认是允许的) if (!client.retryOnConnectionFailure()) return false; // We can't send the request body again. if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false; // This exception is fatal. 致命的异常 判断是否属于重试的异常 if (!isRecoverable(e, requestSendStarted)) return false; // No more routes to attempt. 没有更多可以连接的路由线路 if (!streamAllocation.hasMoreRoutes()) return false; // For failure recovery, use the same route selector with a new connection. return true; } 只有这个方法返回false,都不进行重试。 private boolean isRecoverable(IOException e, boolean requestSendStarted) { // If there was a protocol problem, don't recover. 出现了协议异常,不再重试 if (e instanceof ProtocolException) { return false; } // If there was an interruption don't recover, but if there was a timeout connecting to a route // we should try the next route (if there is one). requestSendStarted为false时,并且异常类型为Scoket超时异常,将会进行下一次重试 if (e instanceof InterruptedIOException) { return e instanceof SocketTimeoutException && !requestSendStarted; } // Look for known client-side or negotiation errors that are unlikely to be fixed by trying // again with a different route. 如果是一个握手异常,并且证书出现问题,则不能重试 if (e instanceof SSLHandshakeException) { // If the problem was a CertificateException from the X509TrustManager, // do not retry. if (e.getCause() instanceof CertificateException) { return false; } }
2)BridgeInterceptor 桥拦截器:连接服务器的桥梁,主要是在请求头中设置一些参数配置
如:请求内容长度,编码,gzip压缩等。
public Response intercept(Chain chain) throws IOException { Request userRequest = chain.request(); Request.Builder requestBuilder = userRequest.newBuilder(); RequestBody body = userRequest.body(); if (body != null) { MediaType contentType = body.contentType(); if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()); } .................. } 在请求头中添加gizp,是否压缩 boolean transparentGzip = false; if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true; requestBuilder.header("Accept-Encoding", "gzip"); } //cookies List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); if (!cookies.isEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)); } 调用责任链中下一个拦截器的方法,网络请求得到的数据封装到networkResponse中 Response networkResponse = chain.proceed(requestBuilder.build()); 对cookie进行处理 HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers()); 如果设置了gzip,则会对networkResponse进行解压缩。 if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) && HttpHeaders.hasBody(networkResponse)) { GzipSource responseBody = new GzipSource(networkResponse.body().source()); Headers strippedHeaders = networkResponse.headers().newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build(); responseBuilder.headers(strippedHeaders); String contentType = networkResponse.header("Content-Type"); responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody))); } return responseBuilder.build(); }
3)CacheInterceptor缓存拦截器
public Response intercept(Chain chain){ // this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize); 这个缓存在底层使用的是DiskLruCache //以request为key从缓存中拿到response。 Response cacheCandidate = cache != null ? cache.get(chain.request()): null; long now = System.currentTimeMillis(); //缓存策略 CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); Request networkRequest = strategy.networkRequest; Response cacheResponse = strategy.cacheResponse; // If we're forbidden from using the network and the cache is insufficient, fail. //如果请求和响应都为null,直接返回504 if (networkRequest == null && cacheResponse == null) { return new Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(504) .message("Unsatisfiable Request (only-if-cached)") .body(Util.EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build(); } // If we don't need the network, we're done. //如果请求为null,缓存不为null,则直接使用缓存。 if (networkRequest == null) { return cacheResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build(); } Response networkResponse = null; try { //调用责任链下一个拦截器 networkResponse = chain.proceed(networkRequest); } finally { } Response response = networkResponse.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build(); // Offer this request to the cache. //将响应存入缓存。 CacheRequest cacheRequest = cache.put(response); }
4)ConnectInterceptor 连接拦截器。当一个请求发出,需要建立连接,然后再通过流进行读写。
public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); //在重定向拦截器中创建, StreamAllocation streamAllocation = realChain.streamAllocation(); boolean doExtensiveHealthChecks = !request.method().equals("GET"); //从连接池中,找到一个可以复用的连接, HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); // RealConnection 中封装了一个Socket和一个Socket连接池 RealConnection connection = streamAllocation.connection(); //调用下一个拦截器 return realChain.proceed(request, streamAllocation, httpCodec, connection); } //遍历连接池 RealConnection get(Address address, StreamAllocation streamAllocation, Route route) { assert (Thread.holdsLock(this)); for (RealConnection connection : connections) { if (connection.isEligible(address, route)) { streamAllocation.acquire(connection, true); return connection; } } return null; } public boolean isEligible(Address address, @Nullable Route route) { // If this connection is not accepting new streams, we're done. if (allocations.size() >= allocationLimit || noNewStreams) return false; // If the non-host fields of the address don't overlap, we're done. if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false; // If the host exactly matches, we're done: this connection can carry the address. 从连接池中找到一个连接参数一致且并未占用的连接 if (address.url().host().equals(this.route().address().url().host())) { return true; // This connection is a perfect match. }
5)CallServerInterceptor 请求服务器拦截器
/** This is the last interceptor in the chain. It makes a network call to the server. */ 这是责任链中最后一个拦截器,这个会去请求服务器。 public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; HttpCodec httpCodec = realChain.httpStream(); StreamAllocation streamAllocation = realChain.streamAllocation(); RealConnection connection = (RealConnection) realChain.connection(); Request request = realChain.request(); //将请求头写入缓存 httpCodec.writeRequestHeaders(request); return response;
到此这篇关于Java OkHttp框架源码深入解析的文章就介绍到这了,更多相关Java OkHttp框架内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!