CountDownLatch基于AQS阻塞工具用法详解
作者:程序员札记
这篇文章主要为大家介绍了CountDownLatch基于AQS阻塞工具用法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
CountDownLatch解决了什么问题
CountDownLatch是基于AQS的阻塞工具,阻塞一个或者多个线程,直到所有的线程都执行完成。
当一个任务运算量比较大的时候,需要拆分为各种子任务,必须要所有子任务完成后才能汇总为总任务。
使用并发模拟的时候可以使用CountDownLatch.也可以设置超时等待时间,
CountDownLatch 用法
package com.conrrentcy.juc; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CountDownLatchExample { private static final Logger log = LoggerFactory.getLogger(CountDownLatchExample.class); //线程数量 private static final int THREAD_NUM = 10; // CountdownLatch阻塞模拟 public static void main(String[] args) throws InterruptedException { // 创建线程池 用于执行线程 ExecutorService executorService = Executors.newCachedThreadPool(); //创建countDownLatch final CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM); long startTime = System.currentTimeMillis(); //循环创建线程 for (int i = 0; i < THREAD_NUM; i++) { final int a = i; executorService.execute(() -> { try { test(a); } catch (Exception e) { log.error("Exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); long endTime = System.currentTimeMillis(); log.info("执行完毕,{}-{}",startTime,endTime); executorService.shutdown(); } private static void test(int num) throws InterruptedException { Thread.sleep(100); log.info("{}-{}", num,System.currentTimeMillis()); Thread.sleep(100); } }
阻塞所有线程执行完成后再执行
CountDownLatch源码解析
CountDownLatch源码中的方法和属性并不多,下面我们来一一解析。
1.AQS框架以及构造方法
//当前对象中私有阻塞工具 private final Sync sync; // 模板方法模式重写AQS工具 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // 共享阻塞AQS Sync(int count) { setState(count); } // 获取当前还剩多少资源可以使用 int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } //构造方法创建一个锁对象 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
2. countDown()方法解析
该方法用于线程执行完毕后减计统计数量,
// 该方法时释放一个共享锁。当所有锁都被释放完成后主线程就能继续执行了。 public void countDown() { sync.releaseShared(1); }
3.await()方法解析
//拦截主线程的方法。主线程在这里等待条件达成后继续执行。 public void await() throws InterruptedException { //在这里阻塞线程的执行 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //这里判断是否还有可以共享的资源 // 如果有则返回-1 否则返回 1,重写AQS的方法参见(1.AQS框架以及构造方法) if (tryAcquireShared(arg) < 0) // 有资源则运行阻塞自旋等待所有线程执行完毕 doAcquireSharedInterruptibly(arg); // 无资源可用就让线程继续执行 } // 带延迟的减少数据拦截方法 // 返回的结果是没有跑完全部线程就继续执行下一步了。 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { //线程如果被中断则抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 表示如果线程被执行完了直接返回成功,如果没有执行完则看等待时间来决定是否要继续执行。 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
再看doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
下面是具体的流程
CountDownLatch 总结
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。 在分散计算统一合成结果,按某个流程加载资源的方面有着非诚好用的效果。CountDownLatch是不能够重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例
下一篇我们讲解像蓄水池一样功能的Semphore,更多关于CountDownLatch AQS用法的资料请关注脚本之家其它相关文章!