AsyncHttpClient ChannelPool线程池频道池源码流程解析
这篇文章主要为大家介绍了AsyncHttpClient ChannelPool线程池频道池源码流程解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
public interface ChannelPool { /** * Add a channel to the pool * * @param channel an I/O channel * @param partitionKey a key used to retrieve the cached channel * @return true if added. */ boolean offer(Channel channel, Object partitionKey); /** * Remove the channel associated with the uri. * * @param partitionKey the partition used when invoking offer * @return the channel associated with the uri */ Channel poll(Object partitionKey); /** * Remove all channels from the cache. A channel might have been associated * with several uri. * * @param channel a channel * @return the true if the channel has been removed */ boolean removeAll(Channel channel); /** * Return true if a channel can be cached. A implementation can decide based * on some rules to allow caching Calling this method is equivalent of * checking the returned value of {@link ChannelPool#offer(Channel, Object)} * * @return true if a channel can be cached. */ boolean isOpen(); /** * Destroy all channels that has been cached by this instance. */ void destroy(); /** * Flush partitions based on a predicate * * @param predicate the predicate */ void flushPartitions(Predicate<Object> predicate); /** * @return The number of idle channels per host. */ Map<String, Long> getIdleChannelCountPerHost(); }
public enum NoopChannelPool implements ChannelPool { INSTANCE; @Override public boolean offer(Channel channel, Object partitionKey) { return false; } @Override public Channel poll(Object partitionKey) { return null; } @Override public boolean removeAll(Channel channel) { return false; } @Override public boolean isOpen() { return true; } @Override public void destroy() { } @Override public void flushPartitions(Predicate<Object> predicate) { } @Override public Map<String, Long> getIdleChannelCountPerHost() { return Collections.emptyMap(); } }
/** * A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap} */ public final class DefaultChannelPool implements ChannelPool { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class); private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>(); private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Timer nettyTimer; private final int connectionTtl; private final boolean connectionTtlEnabled; private final int maxIdleTime; private final boolean maxIdleTimeEnabled; private final long cleanerPeriod; private final PoolLeaseStrategy poolLeaseStrategy; public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) { this(config.getPooledConnectionIdleTimeout(), config.getConnectionTtl(), hashedWheelTimer, config.getConnectionPoolCleanerPeriod()); } public DefaultChannelPool(int maxIdleTime, int connectionTtl, Timer nettyTimer, int cleanerPeriod) { this(maxIdleTime, connectionTtl, PoolLeaseStrategy.LIFO, nettyTimer, cleanerPeriod); } public DefaultChannelPool(int maxIdleTime, int connectionTtl, PoolLeaseStrategy poolLeaseStrategy, Timer nettyTimer, int cleanerPeriod) { this.maxIdleTime = maxIdleTime; this.connectionTtl = connectionTtl; connectionTtlEnabled = connectionTtl > 0; channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null; this.nettyTimer = nettyTimer; maxIdleTimeEnabled = maxIdleTime > 0; this.poolLeaseStrategy = poolLeaseStrategy; this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE)); if (connectionTtlEnabled || maxIdleTimeEnabled) scheduleNewIdleChannelDetector(new IdleChannelDetector()); } //...... }
public boolean offer(Channel channel, Object partitionKey) { if (isClosed.get()) return false; long now = unpreciseMillisTime(); if (isTtlExpired(channel, now)) return false; boolean offered = offer0(channel, partitionKey, now); if (connectionTtlEnabled && offered) { registerChannelCreation(channel, partitionKey, now); } return offered; } private boolean isTtlExpired(Channel channel, long now) { if (!connectionTtlEnabled) return false; ChannelCreation creation = channelId2Creation.get(; return creation != null && now - creation.creationTime >= connectionTtl; } private boolean offer0(Channel channel, Object partitionKey, long now) { ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey); if (partition == null) { partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>()); } return partition.offerFirst(new IdleChannel(channel, now)); } private void registerChannelCreation(Channel channel, Object partitionKey, long now) { ChannelId id =; if (!channelId2Creation.containsKey(id)) { channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey)); } }
/** * {@inheritDoc} */ public Channel poll(Object partitionKey) { IdleChannel idleChannel = null; ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey); if (partition != null) { while (idleChannel == null) { idleChannel =; if (idleChannel == null) // pool is empty break; else if (!Channels.isChannelActive( { idleChannel = null; LOGGER.trace("Channel is inactive, probably remotely closed!"); } else if (!idleChannel.takeOwnership()) { idleChannel = null; LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!"); } } } return idleChannel != null ? : null; }
/** * {@inheritDoc} */ public boolean removeAll(Channel channel) { ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove( : null; return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE)); }
/** * {@inheritDoc} */ public boolean isOpen() { return !isClosed.get(); }
/** * {@inheritDoc} */ public void destroy() { if (isClosed.getAndSet(true)) return; partitions.clear(); if (connectionTtlEnabled) { channelId2Creation.clear(); } }
public void flushPartitions(Predicate<Object> predicate) { for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) { Object partitionKey = partitionsEntry.getKey(); if (predicate.test(partitionKey)) flushPartition(partitionKey, partitionsEntry.getValue()); } } private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) { if (partition != null) { partitions.remove(partitionKey); for (IdleChannel idleChannel : partition) close(; } } private void close(Channel channel) { // FIXME pity to have to do this here Channels.setDiscard(channel); if (connectionTtlEnabled) { channelId2Creation.remove(; } Channels.silentlyCloseChannel(channel); }
public Map<String, Long> getIdleChannelCountPerHost() { return partitions .values() .stream() .flatMap(ConcurrentLinkedDeque::stream) .map(idle -> idle.getChannel().remoteAddress()) .filter(a -> a.getClass() == InetSocketAddress.class) .map(a -> (InetSocketAddress) a) .map(InetSocketAddress::getHostName) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); }
public enum PoolLeaseStrategy { LIFO { public <E> E lease(Deque<E> d) { return d.pollFirst(); } }, FIFO { public <E> E lease(Deque<E> d) { return d.pollLast(); } }; abstract <E> E lease(Deque<E> d); }
private final class IdleChannelDetector implements TimerTask { private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) { return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime; } private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) { // lazy create List<IdleChannel> idleTimeoutChannels = null; for (IdleChannel idleChannel : partition) { boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now); boolean isRemotelyClosed = !Channels.isChannelActive(; boolean isTtlExpired = isTtlExpired(, now); if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) { LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}",, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired); if (idleTimeoutChannels == null) idleTimeoutChannels = new ArrayList<>(1); idleTimeoutChannels.add(idleChannel); } } return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList(); } private List<IdleChannel> closeChannels(List<IdleChannel> candidates) { // lazy create, only if we hit a non-closeable channel List<IdleChannel> closedChannels = null; for (int i = 0; i < candidates.size(); i++) { // We call takeOwnership here to avoid closing a channel that has just been taken out // of the pool, otherwise we risk closing an active connection. IdleChannel idleChannel = candidates.get(i); if (idleChannel.takeOwnership()) { LOGGER.debug("Closing Idle Channel {}",; close(; if (closedChannels != null) { closedChannels.add(idleChannel); } } else if (closedChannels == null) { // first non closeable to be skipped, copy all // previously skipped closeable channels closedChannels = new ArrayList<>(candidates.size()); for (int j = 0; j < i; j++) closedChannels.add(candidates.get(j)); } } return closedChannels != null ? closedChannels : candidates; } public void run(Timeout timeout) { if (isClosed.get()) return; if (LOGGER.isDebugEnabled()) for (Object key : partitions.keySet()) { int size = partitions.get(key).size(); if (size > 0) { LOGGER.debug("Entry count for : {} : {}", key, size); } } long start = unpreciseMillisTime(); int closedCount = 0; int totalCount = 0; for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) { // store in intermediate unsynchronized lists to minimize // the impact on the ConcurrentLinkedDeque if (LOGGER.isDebugEnabled()) totalCount += partition.size(); List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start)); if (!closedChannels.isEmpty()) { if (connectionTtlEnabled) { for (IdleChannel closedChannel : closedChannels) channelId2Creation.remove(; } partition.removeAll(closedChannels); closedCount += closedChannels.size(); } } if (LOGGER.isDebugEnabled()) { long duration = unpreciseMillisTime() - start; if (closedCount > 0) { LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration); } } scheduleNewIdleChannelDetector(timeout.task()); } }
以上就是AsyncHttpClient ChannelPool的详细内容,更多关于AsyncHttpClient ChannelPool的资料请关注脚本之家其它相关文章!