AsyncHttpClient's TimeoutTimerTask: asynchronous timeouts for pooled connections
Author: codecraft
Preface
This article takes a look at AsyncHttpClient's TimeoutTimerTask.
TimerTask
io/netty/util/TimerTask.java
/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {

    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}
Netty's TimerTask interface defines a single run method, which takes a Timeout as its argument.
Timeout
io/netty/util/Timeout.java
public interface Timeout {

    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();

    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();

    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}
The Timeout interface defines the timer(), task(), isExpired(), isCancelled() and cancel() methods.
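To make the contract of the Timeout handle more concrete, here is a minimal sketch of a handle with the same cancel / isExpired / isCancelled semantics. It is built on the JDK's ScheduledExecutorService rather than a netty Timer, and the class name TimeoutHandleSketch and its three-state design are assumptions made for illustration only; this is not how netty implements the interface.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Timeout handle, backed by a JDK scheduler.
final class TimeoutHandleSketch {

    private static final int INIT = 0;
    private static final int CANCELLED = 1;
    private static final int EXPIRED = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final ScheduledFuture<?> scheduled;

    TimeoutHandleSketch(ScheduledExecutorService scheduler, Runnable task, long delayMillis) {
        // The task only runs if the handle was not cancelled first.
        this.scheduled = scheduler.schedule(() -> {
            if (state.compareAndSet(INIT, EXPIRED)) {
                task.run();
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    boolean isExpired() {
        return state.get() == EXPIRED;
    }

    boolean isCancelled() {
        return state.get() == CANCELLED;
    }

    // Returns true only for the call that actually cancelled the task;
    // a task that already ran or was already cancelled yields false.
    boolean cancel() {
        if (!state.compareAndSet(INIT, CANCELLED)) {
            return false;
        }
        scheduled.cancel(false);
        return true;
    }
}
```

A single compare-and-set decides whether the task expires or is cancelled, so the two outcomes are mutually exclusive, which matches the "no side effect" wording in the cancel() Javadoc quoted above.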
TimeoutTimerTask
org/asynchttpclient/netty/timeout/TimeoutTimerTask.java
public abstract class TimeoutTimerTask implements TimerTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);

    protected final AtomicBoolean done = new AtomicBoolean();
    protected final NettyRequestSender requestSender;
    final TimeoutsHolder timeoutsHolder;
    volatile NettyResponseFuture<?> nettyResponseFuture;

    TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                     NettyRequestSender requestSender,
                     TimeoutsHolder timeoutsHolder) {
        this.nettyResponseFuture = nettyResponseFuture;
        this.requestSender = requestSender;
        this.timeoutsHolder = timeoutsHolder;
    }

    void expire(String message, long time) {
        LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
        requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
    }

    /**
     * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
     * channel, and heavy objects such as SslEngines
     */
    public void clean() {
        if (done.compareAndSet(false, true)) {
            nettyResponseFuture = null;
        }
    }

    void appendRemoteAddress(StringBuilder sb) {
        InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
    }
}
TimeoutTimerTask is an abstract class that implements the TimerTask interface. Its expire method calls requestSender.abort with a TimeoutException. Its clean method sets done to true and nulls out nettyResponseFuture, so that a cancelled task still referenced by the Timer does not keep the future (and through it the channel) reachable.
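The done flag and the nulled-out reference work together: whichever of clean() or run() flips the flag first wins, and the other becomes a no-op. The sketch below isolates that pattern under a few assumptions: CleanableTaskSketch, heavyReference, tryRun and holdsReference are hypothetical names, and clean() returns a boolean here (the original returns void) only so that the behaviour can be observed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of the done/clean pattern in TimeoutTimerTask.
final class CleanableTaskSketch {

    private final AtomicBoolean done = new AtomicBoolean();

    // Stands in for nettyResponseFuture, which may pin a channel in memory.
    private volatile Object heavyReference;

    CleanableTaskSketch(Object heavyReference) {
        this.heavyReference = heavyReference;
    }

    // Drops the reference exactly once; returns true for the call that did it.
    boolean clean() {
        if (done.compareAndSet(false, true)) {
            heavyReference = null;
            return true;
        }
        return false;
    }

    // Mirrors the guard at the top of run(): a task that is already done
    // (for example because it was cleaned) must not execute.
    boolean tryRun() {
        if (done.getAndSet(true)) {
            return false;
        }
        // A real task would inspect heavyReference here.
        return true;
    }

    boolean holdsReference() {
        return heavyReference != null;
    }
}
```

Because the guard in tryRun() checks the same flag, a task that was cleaned never dereferences the field after it has been set to null.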
ReadTimeoutTimerTask
org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java
public class ReadTimeoutTimerTask extends TimeoutTimerTask {

    private final long readTimeout;

    ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                         NettyRequestSender requestSender,
                         TimeoutsHolder timeoutsHolder,
                         int readTimeout) {
        super(nettyResponseFuture, requestSender, timeoutsHolder);
        this.readTimeout = readTimeout;
    }

    public void run(Timeout timeout) {

        if (done.getAndSet(true) || requestSender.isClosed())
            return;

        if (nettyResponseFuture.isDone()) {
            timeoutsHolder.cancel();
            return;
        }

        long now = unpreciseMillisTime();

        long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
        long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;

        if (durationBeforeCurrentReadTimeout <= 0L) {
            // idleConnectTimeout reached
            StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
            appendRemoteAddress(sb);
            String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
            long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
            expire(message, durationSinceLastTouch);
            // cancel request timeout sibling
            timeoutsHolder.cancel();

        } else {
            done.set(false);
            timeoutsHolder.startReadTimeout(this);
        }
    }
}
ReadTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if the task is already done or the request sender is closed, and cancels the timeouts if the future has already completed. Otherwise it computes currentReadTimeoutInstant from readTimeout and nettyResponseFuture.getLastTouch() and checks whether that instant has passed. If it has, the task calls expire and then timeoutsHolder.cancel(); if not, it resets done to false and reschedules itself through timeoutsHolder.startReadTimeout(this).
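The expiry check in run() is plain arithmetic on three millisecond values. The following sketch reproduces only that arithmetic; ReadTimeoutMath and its method names are hypothetical, and the timestamps in main are made-up sample values.

```java
// Hypothetical helper that isolates the read-timeout arithmetic.
final class ReadTimeoutMath {

    // Milliseconds left until the read timeout; a value <= 0 means it has elapsed.
    static long durationBeforeReadTimeout(long readTimeoutMs, long lastTouchMs, long nowMs) {
        long currentReadTimeoutInstant = readTimeoutMs + lastTouchMs;
        return currentReadTimeoutInstant - nowMs;
    }

    static boolean hasExpired(long readTimeoutMs, long lastTouchMs, long nowMs) {
        return durationBeforeReadTimeout(readTimeoutMs, lastTouchMs, nowMs) <= 0L;
    }

    public static void main(String[] args) {
        long readTimeoutMs = 1_000L;
        long lastTouchMs = 5_000L; // last time data was seen on the connection

        // 400 ms after the last touch: still inside the window.
        System.out.println(hasExpired(readTimeoutMs, lastTouchMs, 5_400L));
        // 1200 ms after the last touch: the window has elapsed.
        System.out.println(hasExpired(readTimeoutMs, lastTouchMs, 6_200L));
    }
}
```

Since the deadline is derived from the last touch rather than from the moment the task was scheduled, any activity that touches the future pushes the deadline forward, and the task simply reschedules itself when it wakes up early.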
startReadTimeout
org/asynchttpclient/netty/timeout/TimeoutsHolder.java
void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
        // only schedule a new readTimeout if the requestTimeout doesn't happen first
        if (task == null) {
            // first call triggered from outside (else is read timeout is re-scheduling itself)
            task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
        }
        this.readTimeout = newTimeout(task, readTimeoutValue);

    } else if (task != null) {
        // read timeout couldn't re-scheduling itself, clean up
        task.clean();
    }
}
startReadTimeout schedules the task through newTimeout in two cases: when there is no request timeout at all, or when the request timeout has not expired and readTimeoutValue is smaller than the time remaining until requestTimeoutMillisTime (that is, the read timeout would fire before the request timeout). Otherwise, if an existing task was passed in, it calls task.clean().
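The scheduling condition can be read as a pure predicate over a handful of values. The sketch below restates it that way; ReadTimeoutScheduling, shouldSchedule and the flattened boolean parameters are assumptions for illustration, since the original works on a Timeout handle and instance fields.

```java
// Hypothetical restatement of the condition in startReadTimeout.
final class ReadTimeoutScheduling {

    // True when a read timeout should be (re)scheduled: either there is no
    // request timeout, or the read timeout would fire before the request deadline.
    static boolean shouldSchedule(boolean hasRequestTimeout,
                                  boolean requestTimeoutExpired,
                                  long readTimeoutMs,
                                  long requestDeadlineMs,
                                  long nowMs) {
        if (!hasRequestTimeout) {
            return true;
        }
        long remainingUntilDeadline = requestDeadlineMs - nowMs;
        return !requestTimeoutExpired && readTimeoutMs < remainingUntilDeadline;
    }
}
```

For example, with a 1000 ms read timeout and a request deadline at t = 10000, a call at t = 8000 schedules the read timeout (1000 < 2000), while a call at t = 9000 does not (1000 is not less than 1000), because the request timeout will fire no later than the read timeout would.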
RequestTimeoutTimerTask
org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java
public class RequestTimeoutTimerTask extends TimeoutTimerTask {

    private final long requestTimeout;

    RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                            NettyRequestSender requestSender,
                            TimeoutsHolder timeoutsHolder,
                            int requestTimeout) {
        super(nettyResponseFuture, requestSender, timeoutsHolder);
        this.requestTimeout = requestTimeout;
    }

    public void run(Timeout timeout) {

        if (done.getAndSet(true) || requestSender.isClosed())
            return;

        // in any case, cancel possible readTimeout sibling
        timeoutsHolder.cancel();

        if (nettyResponseFuture.isDone())
            return;

        StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
        appendRemoteAddress(sb);
        String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
        long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
        expire(message, age);
    }
}
RequestTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done is already true or the requestSender is closed. Otherwise it first cancels any sibling read timeout through timeoutsHolder.cancel(), returns if nettyResponseFuture.isDone(), and in all remaining cases builds the timeout message and calls expire.
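To see what the resulting message looks like, the sketch below copies the logic of appendRemoteAddress and the message assembly into a standalone class. TimeoutMessageSketch, requestTimeoutMessage and loopback are hypothetical names, a plain StringBuilder replaces StringBuilderPool, and the addresses are sample values constructed without any DNS lookup.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical standalone copy of the timeout-message formatting.
final class TimeoutMessageSketch {

    static void appendRemoteAddress(StringBuilder sb, InetSocketAddress remoteAddress) {
        sb.append(remoteAddress.getHostName());
        if (!remoteAddress.isUnresolved()) {
            // Resolved addresses also carry the literal IP.
            sb.append('/').append(remoteAddress.getAddress().getHostAddress());
        }
        sb.append(':').append(remoteAddress.getPort());
    }

    static String requestTimeoutMessage(InetSocketAddress remoteAddress, long requestTimeoutMs) {
        StringBuilder sb = new StringBuilder("Request timeout to ");
        appendRemoteAddress(sb, remoteAddress);
        return sb.append(" after ").append(requestTimeoutMs).append(" ms").toString();
    }

    // Builds a resolved loopback address with a fixed host name, avoiding DNS.
    static InetSocketAddress loopback(int port) {
        try {
            InetAddress address = InetAddress.getByAddress("localhost", new byte[] {127, 0, 0, 1});
            return new InetSocketAddress(address, port);
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(requestTimeoutMessage(loopback(8080), 60_000L));
        System.out.println(requestTimeoutMessage(
                InetSocketAddress.createUnresolved("example.com", 443), 60_000L));
    }
}
```

A resolved address is rendered as host/ip:port, while an unresolved one is rendered as host:port, which is why the same client can log both forms.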
TimeoutsHolder
org/asynchttpclient/netty/timeout/TimeoutsHolder.java
public class TimeoutsHolder {

    private final Timeout requestTimeout;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final Timer nettyTimer;
    private final NettyRequestSender requestSender;
    private final long requestTimeoutMillisTime;
    private final int readTimeoutValue;
    private volatile Timeout readTimeout;
    private volatile NettyResponseFuture<?> nettyResponseFuture;
    private volatile InetSocketAddress remoteAddress;

    public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender,
                          AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
        this.nettyTimer = nettyTimer;
        this.nettyResponseFuture = nettyResponseFuture;
        this.requestSender = requestSender;
        this.remoteAddress = originalRemoteAddress;

        final Request targetRequest = nettyResponseFuture.getTargetRequest();

        final int readTimeoutInMs = targetRequest.getReadTimeout();
        this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;

        int requestTimeoutInMs = targetRequest.getRequestTimeout();
        if (requestTimeoutInMs == 0) {
            requestTimeoutInMs = config.getRequestTimeout();
        }

        if (requestTimeoutInMs != -1) {
            requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
            requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
        } else {
            requestTimeoutMillisTime = -1L;
            requestTimeout = null;
        }
    }

    //......
}
The TimeoutsHolder constructor resolves the effective timeouts: a per-request value of 0 falls back to the value from the config. If the resulting requestTimeoutInMs is not -1, the constructor records the absolute deadline in requestTimeoutMillisTime, creates a RequestTimeoutTimerTask and schedules it through newTimeout; otherwise no request timeout is scheduled.
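The fallback rules in the constructor are easy to misread, so here they are as two small functions. EffectiveTimeouts and its method names are hypothetical, and the current time is passed in as a parameter (instead of calling unpreciseMillisTime()) so that the result is deterministic.

```java
// Hypothetical restatement of the timeout resolution in the TimeoutsHolder constructor.
final class EffectiveTimeouts {

    // A per-request value of 0 means "not set": use the client-wide config value.
    static int effectiveReadTimeout(int requestLevelMs, int configLevelMs) {
        return requestLevelMs == 0 ? configLevelMs : requestLevelMs;
    }

    // Absolute deadline in milliseconds, or -1 when the request timeout is disabled.
    static long requestDeadline(int requestLevelMs, int configLevelMs, long nowMs) {
        int effectiveMs = requestLevelMs == 0 ? configLevelMs : requestLevelMs;
        return effectiveMs != -1 ? nowMs + effectiveMs : -1L;
    }
}
```

With a config value of 60000 ms and no per-request override, a request started at t = 1000 gets a deadline of 61000; a value of -1 at either level that ends up effective disables the request timeout, in which case only the read timeout guards the exchange.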
scheduleRequestTimeout
org/asynchttpclient/netty/request/NettyRequestSender.java
private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future,
                                                           AsyncHandler<T> asyncHandler,
                                                           Channel channel) {

    try {
        asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
        LOGGER.error("onConnectionPooled crashed", e);
        abort(channel, future, e);
        return future;
    }

    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
        // otherwise, bad luck, the channel was closed, see bellow
        scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }

    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);

    if (LOGGER.isDebugEnabled()) {
        HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
        LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }

    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);

    if (Channels.isChannelActive(channel)) {
        writeRequest(future, channel);
    } else {
        // bad luck, the channel was closed in-between
        // there's a very good chance onClose was already notified but the
        // future wasn't already registered
        handleUnexpectedClosedChannel(channel, future);
    }

    return future;
}

private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture,
                                    InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config,
            originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
}

public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {

    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();

    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
        return;

    try {
        if (asyncHandler instanceof TransferCompletionHandler) {
            configureTransferAdapter(asyncHandler, httpRequest);
        }

        boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue() && httpRequest.method() != HttpMethod.CONNECT
                && nettyRequest.getBody() != null;

        if (!future.isHeadersAlreadyWrittenOnContinue()) {
            try {
                asyncHandler.onRequestSend(nettyRequest);
            } catch (Exception e) {
                LOGGER.error("onRequestSend crashed", e);
                abort(channel, future, e);
                return;
            }

            // if the request has a body, we want to track progress
            if (writeBody) {
                // FIXME does this really work??? the promise is for the request without body!!!
                ChannelProgressivePromise promise = channel.newProgressivePromise();
                ChannelFuture f = channel.write(httpRequest, promise);
                f.addListener(new WriteProgressListener(future, true, 0L));
            } else {
                // we can just track write completion
                ChannelPromise promise = channel.newPromise();
                ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
                f.addListener(new WriteCompleteListener(future));
            }
        }

        if (writeBody)
            nettyRequest.getBody().write(channel, future);

        // don't bother scheduling read timeout if channel became invalid
        if (Channels.isChannelActive(channel)) {
            scheduleReadTimeout(future);
        }

    } catch (Exception e) {
        LOGGER.error("Can't write request", e);
        abort(channel, future, e);
    }
}
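NettyRequestSender's sendRequestWithOpenChannel method calls scheduleRequestTimeout when channelRemoteAddress is not null; that method touches the future and creates a TimeoutsHolder, whose constructor schedules the RequestTimeoutTimerTask. The writeRequest method, after nettyRequest.getBody().write(channel, future), checks whether the channel is still active and, if so, schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future).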
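Summary
AsyncHttpClient's TimeoutTimerTask is an abstract class that implements netty's TimerTask interface. It defines the expire method, which calls requestSender.abort, and the clean method, which marks the task as done and drops its reference to nettyResponseFuture. It has two concrete subclasses, RequestTimeoutTimerTask and ReadTimeoutTimerTask. AsyncHttpClient wraps these timers in a TimeoutsHolder. NettyRequestSender's sendRequestWithOpenChannel method triggers the scheduling of the RequestTimeoutTimerTask, while its writeRequest method schedules the ReadTimeoutTimerTask through scheduleReadTimeout(future) after nettyRequest.getBody().write(channel, future).
As the code shows, requestTimeoutMillisTime is the absolute deadline for the whole request (the time the holder was created plus the request timeout). The read timeout runs inside that window: once the request has been written, a ReadTimeoutTimerTask is only scheduled or rescheduled if readTimeoutValue would elapse before that deadline.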