HttpClient连接池及重试机制解析
作者:Heiky0214
一、HttpClient
简介
HttpClient 是 Apache HttpComponents 项目(前身为 Apache Jakarta Commons HttpClient)下的子项目,可以用来提供高效的、最新的、功能丰富的支持 HTTP 协议的客户端编程工具包,基于标准的 Java 语言。
功能介绍
- 支持HTTP和HTTPS协议
- 实现了HTTP的方法,GET,POST,PUT,DELETE等方法。
- 连接管理器支持多线程的应用。
- 可以设置连接超时
使用方法
使用HttpClient发送请求、接收响应可以分为以下几步:
- 创建HttpClient对象
- 创建请求方法的实例,并且指定URL
- 发送请求参数,GET请求和POST请求发送参数的方式有所不同
- 调用HttpClient对象的execute方法,返回HttpResponse对象
- 调用HttpResponse的getAllHeaders()、getHeaders(String name)等方法可获取服务器的响应头;调用HttpResponse的getEntity()方法可获取HttpEntity对象,该对象包装了服务器的响应内容
- 连接释放。无论成功与否,必须释放连接
二、HttpClientUtil
2.1 HttpClient版本
笔者用到的版本是4.5.5,由于是maven工程,需要在pom文件引入对应的坐标。
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.5</version> </dependency>
2.2 项目中用到的工具类如下
package cn.htjc.customer.util; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.ServiceUnavailableRetryStrategy; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.config.Registry; import org.apache.http.config.RegistryBuilder; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.PlainConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicNameValuePair; import org.apache.http.protocol.HttpContext; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils; import java.io.IOException; import java.net.SocketTimeoutException; import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @Slf4j public class HttpClientUtil { // utf-8字符编码 private static final String CHARSET_UTF_8 = "utf-8"; // HTTP内容类型。相当于form表单的形式,提交数据 private static final String CONTENT_TYPE_FORM_URL = "application/x-www-form-urlencoded"; // 连接管理器 private static PoolingHttpClientConnectionManager pool; // 请求配置 private static RequestConfig requestConfig; static { try { log.info("初始自定义HttpClient......开始"); SSLContextBuilder builder = new SSLContextBuilder(); builder.loadTrustMaterial(null, new 
TrustSelfSignedStrategy()); SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(builder.build()); // 配置同时支持 HTTP 和 HTPPS Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", sslsf).build(); // 初始化连接管理器 pool = new PoolingHttpClientConnectionManager( socketFactoryRegistry); // 设置连接池的最大连接数 pool.setMaxTotal(200); // 设置每个路由上的默认连接个数 pool.setDefaultMaxPerRoute(20); // 根据默认超时限制初始化requestConfig // 客户端从服务器读取数据的timeout int socketTimeout = 1000; // 客户端和服务器建立连接的timeout int connectTimeout = 10000; // 从连接池获取连接的timeout int connectionRequestTimeout = 10000; //设置请求超时时间 requestConfig = RequestConfig.custom().setConnectionRequestTimeout( connectionRequestTimeout).setSocketTimeout(socketTimeout).setConnectTimeout( connectTimeout).build(); log.info("初始自定义HttpClient......结束"); } catch (Exception e) { log.error("初始自定义HttpClient......失败"); } } private HttpClientUtil() { } private static CloseableHttpClient getHttpClient() { // 状态码是503的时候,该策略生效 ServiceUnavailableRetryStrategy serviceUnavailableRetryStrategy = new ServiceUnavailableRetryStrategy() { @Override public boolean retryRequest(HttpResponse httpResponse, int i, HttpContext httpContext) { if (i < 3) { log.info("ServiceUnavailableRetryStrategy========================"+i); return true; } return false; } @Override public long getRetryInterval() { return 2000L; } }; CloseableHttpClient httpClient = HttpClients.custom() // 设置连接池管理 .setConnectionManager(pool) // 设置请求配置 .setDefaultRequestConfig(requestConfig) // 设置重试次数 .setRetryHandler(new DefaultHttpRequestRetryHandler()) .setServiceUnavailableRetryStrategy(serviceUnavailableRetryStrategy) .build(); return httpClient; } public static String doGet(String url, Map<String, String> param) { // 创建Httpclient对象 CloseableHttpClient httpClient = getHttpClient(); String resultString = ""; CloseableHttpResponse response = null; try { // 创建uri 
URIBuilder builder = new URIBuilder(url); if (param != null) { for (String key : param.keySet()) { builder.addParameter(key, param.get(key)); } } URI uri = builder.build(); // 创建http GET请求 HttpGet httpGet = new HttpGet(uri); // 执行请求 response = httpClient.execute(httpGet); // 判断返回状态是否为200 if (response.getStatusLine().getStatusCode() == 200) { resultString = EntityUtils.toString(response.getEntity(), CHARSET_UTF_8); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (response != null) { response.close(); } } catch (IOException e) { e.printStackTrace(); } } return resultString; } public static String doGet(String url) { return doGet(url, null); } public static String doPost(String url, Map<String, String> param) { // 创建Httpclient对象 CloseableHttpClient httpClient = getHttpClient(); CloseableHttpResponse response = null; String resultString = ""; try { // 创建Http Post请求 HttpPost httpPost = new HttpPost(url); // 创建参数列表 if (param != null) { List<NameValuePair> paramList = new ArrayList<>(); for (String key : param.keySet()) { paramList.add(new BasicNameValuePair(key, param.get(key))); } // 模拟表单 UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList, CHARSET_UTF_8); entity.setContentType(CONTENT_TYPE_FORM_URL); httpPost.setEntity(entity); } // 执行http请求main response = httpClient.execute(httpPost); resultString = EntityUtils.toString(response.getEntity(), CHARSET_UTF_8); } catch (Exception e) { e.printStackTrace(); } finally { try { if (response != null) { response.close(); } } catch (IOException e) { e.printStackTrace(); } } return resultString; } public static String doPost(String url) { return doPost(url, null); } public static String doPostJson(String url, String json) { // 创建Httpclient对象 CloseableHttpClient httpClient = getHttpClient(); CloseableHttpResponse response = null; String resultString = ""; try { // 创建Http Post请求 HttpPost httpPost = new HttpPost(url); // 创建请求内容 StringEntity entity = new StringEntity(json, 
ContentType.APPLICATION_JSON); httpPost.setEntity(entity); // 执行http请求 response = httpClient.execute(httpPost); resultString = EntityUtils.toString(response.getEntity(), CHARSET_UTF_8); } catch (Exception e) { e.printStackTrace(); } finally { try { if (response != null) { response.close(); } } catch (IOException e) { e.printStackTrace(); } } return resultString; } }
代码中出现了@Slf4j,作用是引入log,手动打印日志。这个注解是lombok的注解。
解释一下,什么是Route?
Route可以理解为客户端机器到目标主机的一条线路,例如使用HttpClient分别请求 www.163.com 的资源和 www.sina.com 的资源,就会产生两个route。默认情况下,PoolingHttpClientConnectionManager对每个Route最多维护2个连接,连接总数不超过20个;上面的工具类通过setDefaultMaxPerRoute(20)和setMaxTotal(200)调大了这两个值。
2.3 笔者着重说一下http连接池
1 为什么要使用http连接池?
降低延迟:如果不使用连接池,每次发起http请求都要重新建立TCP连接(三次握手),用完后再关闭连接(四次挥手);使用连接池可以复用已经建立的连接,省去这部分时间开销。连接池管理的对象都是长连接。
支持更高的并发:连接池适用于频繁访问同一主机(同一主机和端口)的场景。它避免了反复建立连接、抢占本地端口资源;如果不使用连接池,高并发下频繁新建和关闭连接可能耗尽可用端口,导致新的连接建立不了。
2 设置超时时间
首先要明白三个概念:socketTimeout,connectTimeout,connectionRequestTimeout。
- socketTimeout:客户端从服务器读取数据的超时时间,即等待下一段数据到达的最长时间
- connectTimeout:客户端和服务器建立连接的超时时间
- connectionRequestTimeout:从连接池获取连接的超时时间
3 解释:一次http请求
一次http请求,必定会有三个阶段,一:建立连接;二:数据传送;三,断开连接。
如果在规定的时间(connectTimeout)内没有建立连接,本次请求就以连接超时结束(HttpClient 4.x中抛出ConnectTimeoutException),后续的读超时(SocketTimeoutException: Read timed out)自然不会发生。只有连接成功建立后,才会开始传输数据。
在读取响应的过程中,如果等待下一段数据的时间超过了socketTimeout,就会触发SocketTimeoutException。注意:socketTimeout限制的是两次读取之间的最长等待时间,而不是整个响应的总传输时间。
三、HttpClient的重试机制
上面说了这么多,就是为了引出下面的重试问题。由于项目中要访问外部接口,访问接口的时候,偶尔会出现SocketTimeOutException:Read timed out,其实就是客户端读取服务器的数据超时了。
3.1. 那么问题来了HttpClient有没有重试策略?
通过HttpClients.custom()(即HttpClientBuilder)配置PoolingHttpClientConnectionManager并调用build(),得到的是InternalHttpClient实例,它是抽象类CloseableHttpClient的一个实现。
看一下ClientExecChain接口的实现类
简单看一下build()方法
// Excerpt of HttpClientBuilder.build(): assembles the executor chain. Each new executor wraps the
// previous one, so the executor created last runs first.
public CloseableHttpClient build() {
    // (some code omitted)
    // Innermost executor: MainClientExec
    ClientExecChain execChain = this.createMainExec(requestExecCopy, (HttpClientConnectionManager)connManagerCopy, (ConnectionReuseStrategy)reuseStrategyCopy, (ConnectionKeepAliveStrategy)keepAliveStrategyCopy, new ImmutableHttpProcessor(new HttpRequestInterceptor[]{new RequestTargetHost(), new RequestUserAgent(userAgentCopy)}), (AuthenticationStrategy)targetAuthStrategyCopy, (AuthenticationStrategy)proxyAuthStrategyCopy, (UserTokenHandler)userTokenHandlerCopy);
    execChain = this.decorateMainExec(execChain);
    // Wrap it in ProtocolExec.
    // NOTE(review): Java does not allow redeclaring a local, so the two "ClientExecChain execChain"
    // declarations below are an artifact of the excerpt; presumably plain reassignments in the real source.
    ClientExecChain execChain = new ProtocolExec(execChain, httpprocessorCopy);
    ClientExecChain execChain = this.decorateProtocolExec(execChain);
    // Add request retry executor, if not disabled
    if (!automaticRetriesDisabled) {
        HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
        if (retryHandlerCopy == null) {
            // No handler set via setRetryHandler(): use the shared default instance
            retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
        }
        execChain = new RetryExec(execChain, retryHandlerCopy);
    }
    // (some code omitted)
    // Only when a ServiceUnavailableRetryStrategy was configured: wrap in ServiceUnavailableRetryExec
    ServiceUnavailableRetryStrategy serviceUnavailStrategyCopy = this.serviceUnavailStrategy;
    if (serviceUnavailStrategyCopy != null) {
        execChain = new ServiceUnavailableRetryExec((ClientExecChain)execChain, serviceUnavailStrategyCopy);
    }
    // Unless redirects are disabled, RedirectExec becomes the outermost executor (runs first)
    if (!this.redirectHandlingDisabled) {
        // NOTE(review): storing the redirect strategy in authSchemeRegistryCopy looks like a
        // decompiler variable-reuse artifact; confirm against the real source
        authSchemeRegistryCopy = this.redirectStrategy;
        if (authSchemeRegistryCopy == null) {
            authSchemeRegistryCopy = DefaultRedirectStrategy.INSTANCE;
        }
        execChain = new RedirectExec((ClientExecChain)execChain, (HttpRoutePlanner)routePlannerCopy, (RedirectStrategy)authSchemeRegistryCopy);
    }
    // (some code omitted)
    // The client delegates every request to the head of execChain; falls back to RequestConfig.DEFAULT
    return new InternalHttpClient((ClientExecChain)execChain, (HttpClientConnectionManager)connManagerCopy, (HttpRoutePlanner)routePlannerCopy, cookieSpecRegistryCopy, (Lookup)authSchemeRegistryCopy, (CookieStore)defaultCookieStore, (CredentialsProvider)defaultCredentialsProvider, this.defaultRequestConfig != null ?
            this.defaultRequestConfig : RequestConfig.DEFAULT, closeablesCopy);
}
自上而下,创建了不同的ClientExecChain实例。注意:每个新创建的执行器都会包装前一个执行器,因此执行顺序与创建顺序相反——最后创建的RedirectExec最先执行,最先创建的MainClientExec最后执行。
在构造CloseableHttpClient实例的时候,会判断是否关闭了自动重试功能(automaticRetriesDisabled,默认是false)。如果没有关闭,就把RetryExec加入执行器链;如果没有通过setRetryHandler()指定重试策略,默认使用DefaultHttpRequestRetryHandler.INSTANCE。
如果重写了ServiceUnavailableRetryStrategy接口,或者使用了DefaultServiceUnavailableRetryStrategy,ServiceUnavailableRetryExec也会加入到执行器链里。
同理,redirectHandlingDisabled默认是false,RedirectExec也会加入到执行器链,并且会最先执行。
3.2 执行流程
前面已经看到,我们使用的HttpClient本质上是InternalHttpClient,这里看一下它执行请求、发送数据的方法。
// Excerpt of InternalHttpClient.doExecute(): hands the request to the executor chain.
@Override
protected CloseableHttpResponse doExecute(
        final HttpHost target,
        final HttpRequest request,
        final HttpContext context) throws IOException, ClientProtocolException {
    // (some code omitted; presumably where route, wrapper, localcontext and execAware are prepared)
    // Delegate to the head of the executor chain built in HttpClientBuilder.build()
    return this.execChain.execute(route, wrapper, localcontext, execAware);
}
// NOTE(review): extra closing brace carried over from the excerpt (presumably closes the enclosing class)
}
请求首先经过RedirectExec;RedirectExec调用ServiceUnavailableRetryExec的execute(),ServiceUnavailableRetryExec再调用RetryExec的execute(),RetryExec调用ProtocolExec的execute(),最后调用MainClientExec的execute()。
执行器链的最后一环MainClientExec会调用HttpRequestExecutor的execute(),execute()方法又调用了自己的doSendRequest()。
之后一步一步的返回,遇到异常进行处理。
下面是RetryExec发送请求的部分
// Excerpt of RetryExec.execute(): re-runs the downstream executor when an IOException occurs and
// the configured HttpRequestRetryHandler allows it.
public CloseableHttpResponse execute(HttpRoute route, HttpRequestWrapper request, HttpClientContext context, HttpExecutionAware execAware) throws IOException, HttpException {
    // Argument validation
    Args.notNull(route, "HTTP route");
    Args.notNull(request, "HTTP request");
    Args.notNull(context, "HTTP context");
    // Snapshot of the original headers, restored before every retry
    Header[] origheaders = request.getAllHeaders();
    // The first attempt is execution #1
    int execCount = 1;
    while(true) {
        try {
            // Delegate to the next executor in the chain
            return this.requestExecutor.execute(route, request, context, execAware);
        } catch (IOException var9) {
            // An aborted request is never retried: rethrow as-is
            if (execAware != null && execAware.isAborted()) {
                this.log.debug("Request has been aborted");
                throw var9;
            }
            // Ask the retry handler; if it declines, rethrow (NoHttpResponseException is
            // re-created with the target host in its message, keeping the original stack trace)
            if (!this.retryHandler.retryRequest(var9, execCount, context)) {
                if (var9 instanceof NoHttpResponseException) {
                    NoHttpResponseException updatedex = new NoHttpResponseException(route.getTargetHost().toHostString() + " failed to respond");
                    updatedex.setStackTrace(var9.getStackTrace());
                    throw updatedex;
                }
                throw var9;
            }
            // Log the exception that triggers the retry
            if (this.log.isInfoEnabled()) {
                this.log.info("I/O exception (" + var9.getClass().getName() + ") caught when processing request to " + route + ": " + var9.getMessage());
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug(var9.getMessage(), var9);
            }
            // A request whose entity cannot be replayed cannot be retried
            if (!RequestEntityProxy.isRepeatable(request)) {
                this.log.debug("Cannot retry non-repeatable request");
                throw new NonRepeatableRequestException("Cannot retry request with a non-repeatable request entity", var9);
            }
            // Restore the original headers before the next attempt
            request.setHeaders(origheaders);
            if (this.log.isInfoEnabled()) {
                this.log.info("Retrying request to " + route);
            }
            ++execCount;
        }
    }
}
当发生IOException,判断是否要重试。如果重试则记录相应的次数,如果不重试,就抛出异常并且退出。
//单例模式 final 不可变的对象,线程安全 public static final DefaultHttpRequestRetryHandler INSTANCE = new DefaultHttpRequestRetryHandler(); //重试次数 private final int retryCount; //如果一个请求发送成功过,是否还会被再次发送 private final boolean requestSentRetryEnabled; // 不允许重试的异常类 private final Set<Class<? extends IOException>> nonRetriableClasses; // 默认重试3次,请求发送成功,不在发送 public DefaultHttpRequestRetryHandler() { this(3, false); } public DefaultHttpRequestRetryHandler(final int retryCount, final boolean requestSentRetryEnabled) { this(retryCount, requestSentRetryEnabled, Arrays.asList( InterruptedIOException.class, UnknownHostException.class, ConnectException.class, SSLException.class)); } protected DefaultHttpRequestRetryHandler( final int retryCount, final boolean requestSentRetryEnabled, final Collection<Class<? extends IOException>> clazzes) { super(); this.retryCount = retryCount; this.requestSentRetryEnabled = requestSentRetryEnabled; this.nonRetriableClasses = new HashSet<Class<? extends IOException>>(); for (final Class<? extends IOException> clazz: clazzes) { this.nonRetriableClasses.add(clazz); } }
通过构造函数,可以看出:
默认构造函数DefaultHttpRequestRetryHandler()等价于DefaultHttpRequestRetryHandler(3, false),即:
- 重试3次
- 请求已经完整发送后,默认不再重试(requestSentRetryEnabled = false;不带请求体的请求除外,见下文)
- InterruptedIOException、UnknownHostException、ConnectException、SSLException,发生这4种异常不重试
关于默认的重试策略
- 如果超过三次不进行重试
- 以上4种异常及其子类不进行重试
- 请求已经被中止(aborted),不进行重试
- 不带请求体的请求(如GET)被视为幂等,可以进行重试;带请求体的请求(HttpEntityEnclosingRequest,如POST、PUT)被视为非幂等
- 非幂等的请求如果还没有完整发送(http.request_sent不为true),也可以进行重试
问题来了,发送成功的请求是怎么样的?
下面的代码在HttpCoreContext里面,HttpCoreContext是HttpContext的实现类
// Context attribute that HttpRequestExecutor.doSendRequest() sets to FALSE before sending and TRUE after the final flush
public static final String HTTP_REQ_SENT = "http.request_sent";
// True only when the "http.request_sent" attribute exists and is Boolean.TRUE
public boolean isRequestSent() {
    final Boolean b = getAttribute(HTTP_REQ_SENT, Boolean.class);
    return b != null && b.booleanValue();
}
当前httpContext中的http.request_sent设置为true,则认为已经发送成功。
HttpRequestExecutor的excute(),调用了自己的doSendRequest()。
// Excerpt of HttpRequestExecutor.doSendRequest(): writes the request to the connection and records
// in the context whether it was completely sent.
protected HttpResponse doSendRequest(HttpRequest request, HttpClientConnection conn, HttpContext context) throws IOException, HttpException {
    // Argument validation
    Args.notNull(request, "HTTP request");
    Args.notNull(conn, "Client connection");
    Args.notNull(context, "HTTP context");
    HttpResponse response = null;
    // Store the connection in the context
    context.setAttribute("http.connection", conn);
    // Not sent yet; flipped to TRUE only after the final flush below succeeds
    context.setAttribute("http.request_sent", Boolean.FALSE);
    // Write the request line and headers
    conn.sendRequestHeader(request);
    // Requests with a body (e.g. POST/PUT) need extra handling
    if (request instanceof HttpEntityEnclosingRequest) {
        boolean sendentity = true;
        // Protocol version of the request line
        ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
        // "Expect: 100-continue" requested and the protocol is newer than HTTP/1.0
        if (((HttpEntityEnclosingRequest)request).expectContinue() && !ver.lessEquals(HttpVersion.HTTP_1_0)) {
            // Push the headers out before waiting for an interim response
            conn.flush();
            // Checks if response data is available from the connection
            if (conn.isResponseAvailable(this.waitForContinue)) {
                // Receives the request line and headers of the next response available from this connection.
                response = conn.receiveResponseHeader();
                // Read the body only if this response can have one
                if (this.canResponseHaveBody(request, response)) {
                    // Receives the next response entity available from this connection and attaches it to an existing HttpResponse object.
                    conn.receiveResponseEntity(response);
                }
                int status = response.getStatusLine().getStatusCode();
                if (status < 200) {
                    // Among 1xx responses only "100 Continue" is acceptable here
                    if (status != 100) {
                        throw new ProtocolException("Unexpected response: " + response.getStatusLine());
                    }
                    response = null;
                } else {
                    // A final response arrived before the body was sent: do not send the body
                    sendentity = false;
                }
            }
        }
        if (sendentity) {
            // Write the request entity (body)
            conn.sendRequestEntity((HttpEntityEnclosingRequest)request);
        }
    }
    // Writes out all pending buffered data over the open connection.
    conn.flush();
    // Only now is the request considered sent (read back by HttpCoreContext.isRequestSent())
    context.setAttribute("http.request_sent", Boolean.TRUE);
    return response;
}
判断是否携带实体的方法
// Decides whether a response to the given request may carry an entity (body).
protected boolean canResponseHaveBody(HttpRequest request, HttpResponse response) {
    // Responses to HEAD (headers only) never have a body
    if ("HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
        return false;
    } else {
        int status = response.getStatusLine().getStatusCode();
        // 1xx, 204 No Content, 205 Reset Content and 304 Not Modified have no body
        return status >= 200 && status != 204 && status != 304 && status != 205;
    }
}
注:HttpEntityEnclosingRequest是一个接口
// A request that can carry an entity (body); checked with instanceof in doSendRequest()
public interface HttpEntityEnclosingRequest extends HttpRequest {
    // Whether the request asks the server to confirm with "100 Continue" before the body is sent
    boolean expectContinue();
    // Set the request entity
    void setEntity(HttpEntity entity);
    // Get the request entity
    HttpEntity getEntity();
}
HttpEntityEnclosingRequestBase是实现HttpEntityEnclosingRequest的抽象类
// Base class for requests with a body (HttpPost and HttpPut extend it)
public abstract class HttpEntityEnclosingRequestBase extends HttpRequestBase implements HttpEntityEnclosingRequest {
    // The message entity, i.e. the request body; StringEntity is a common implementation
    private HttpEntity entity;
    public HttpEntityEnclosingRequestBase() {
    }
    public HttpEntity getEntity() {
        return this.entity;
    }
    public void setEntity(HttpEntity entity) {
        this.entity = entity;
    }
    // Whether this request uses expect-continue
    public boolean expectContinue() {
        // First "Expect" header of the request
        Header expect = this.getFirstHeader("Expect");
        // True only when the header exists and equals "100-continue" (case-insensitive)
        return expect != null && "100-continue".equalsIgnoreCase(expect.getValue());
    }
    // Clones the request; the entity, when present, is cloned via CloneUtils.cloneObject
    public Object clone() throws CloneNotSupportedException {
        HttpEntityEnclosingRequestBase clone = (HttpEntityEnclosingRequestBase)super.clone();
        if (this.entity != null) {
            clone.entity = (HttpEntity)CloneUtils.cloneObject(this.entity);
        }
        return clone;
    }
}
下图可以看出,HttpPost和HttpPut是HttpEntityEnclosingRequestBase的子类
简要分析一下,上述的操作过程
- 开始将http.request_sent设置为false
- 通过conn.flush()把缓冲的数据发送到服务端
- 然后将http.request_sent设置为true
显然conn.flush()是可以发生异常的。注意:conn都是从连接池获取的。
3.3 关闭重试
默认是开启重试的,可以在创建HttpClientBuilder的时候,调用下面的方法关闭。
// Sets automaticRetriesDisabled, so build() does not add RetryExec to the executor chain
public final HttpClientBuilder disableAutomaticRetries() {
    this.automaticRetriesDisabled = true;
    return this;
}
四、总结
4.1重试发生的条件
只有发生IOException时才会触发RetryExec的重试(针对503等响应的重试由ServiceUnavailableRetryStrategy另行控制)
InterruptedIOException、UnknownHostException、ConnectException、SSLException,发生这4种异常不重试
GET等不带请求体的请求最多重试3次;POST等带请求体的请求,只有在请求还没有完整发送(socket流没有被flush成功)时才会重试,同样最多3次
4.2不发生重试的异常
- InterruptedIOException:I/O操作被中断,包括各类超时(如SocketTimeoutException)
- UnknownHostException:找不到对应的host
- ConnectException:找到了host,但是建立连接失败
- SSLException:https(SSL/TLS)握手或认证异常
4.3 实践中遇到的异常
另外,我们还经常会提到两种超时,连接超时与读超时:
1. java.net.SocketTimeoutException: Read timed out
2. java.net.SocketTimeoutException: connect timed out
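这两种超时都属于InterruptedIOException的子类:SocketTimeoutException继承自InterruptedIOException,HttpClient 4.x中包装连接超时的ConnectTimeoutException也是如此。这里的“Interrupted”指I/O操作被中断(超时),并不是线程被中断。按照默认重试策略,它们都不会进行重试。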
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。