Java AQS中闭锁CountDownLatch的使用
作者:飞奔的小付
一. 简介
CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。
二. 使用
构造器
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
常用方法
// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行 public void await() throws InterruptedException { }; // 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; // 会将 count 减 1,直至为 0 public void countDown() { sync.releaseShared(1); }
三. 应用场景
CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。
CountDownLatch的两种使用场景:
- 让多个线程等待
- 让单个线程等待
场景1 让多个线程等待:模拟并发,让并发线程一起执行
public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 5; i++) { new Thread(() -> { try { //等待 countDownLatch.await(); String parter = "【" + Thread.currentThread().getName() + "】"; System.out.println(parter + "开始执行……"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } Thread.sleep(2000); countDownLatch.countDown(); }
for循环中等待阻塞,直到执行countdown方法。
场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并。
很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。
public static void main(String[] args) throws Exception { CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { final int index = i; new Thread(() -> { try { Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(1000)); System.out.println(Thread.currentThread().getName() + " finish task" + index); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } // 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。 countDownLatch.await(); System.out.println("主线程:在所有任务运行完成后,进行结果汇总"); }
四. 底层原理
底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark线程;这一步是由最后一个执行countdown方法的线程执行的。
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
构造方法
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); } protected final void setState(int newState) { state = newState; }
阻塞
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //arg为1,不为0 ,返回-1,这里小于0 if (tryAcquireShared(arg) < 0) //入队阻塞 doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
入队阻塞
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //入队,创建节点 使用共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //获取当前节点的前躯节点 final Node p = node.predecessor(); //如果节点为head节点 if (p == head) { //阻塞动作比较重,通常会再尝试获取资源,没有获取到返回负数 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //判断是否可以阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
countDown方法减一
public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //尝试释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //减到0的时候返回true,进行唤醒 return nextc == 0; } }
唤醒逻辑
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //wa为-1时,将其状态设置为0,并且唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //ws状态小于0就将其设置为0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //s不为空就调用unpark LockSupport.unpark(s.thread); }
五. CountDownLatch与Thread.join的区别
- CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
- CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
- 而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。
到此这篇关于Java AQS中闭锁CountDownLatch的使用的文章就介绍到这了,更多相关Java CountDownLatch内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!