Java中的CountDownLatch原理深入解析
作者:我不是欧拉_
1. CountDownLatch是什么?
CountDownLatch是多线程控制的一种同步工具类,它被称为门阀、 计数器或者闭锁。这个工具经常用来用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。
当然利用ReentrantLock + Condition也可以实现线程之间通信,达到同样的效果
2. 类图
可以看出CountDownLatch只有一个内部类Sync,Sync继承AbstractQueuedSynchronizer
3. 实现原理
3.1 示例用法
// N个线程等待主线程 class Driver { // ... void main() throws InterruptedException { // 开始信号 CountDownLatch startSignal = new CountDownLatch(1); // 完成信号 CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads // 创建N个工作线程并开始运行 new Thread(new Worker(startSignal, doneSignal)).start(); // 做准备工作 doSomethingElse(); // don't let run yet // 准备完毕,唤醒工作线程 startSignal.countDown(); // let all threads proceed doSomethingElse(); // 等待工作线程结束 doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; // 构造方法创建工作线程 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { // 工作线程进入等待状态 startSignal.await(); // 工作线程工作 doWork(); // 完成工作后,countDown doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
// 主线程等到N个线程 class Driver2 { // ... void main() throws InterruptedException { // 完成信号 CountDownLatch doneSignal = new CountDownLatch(N); // 创建线程执行器 Executor e = ... for (int i = 0; i < N; ++i) // create and start threads // 创建并执行N个准备工作线程 e.execute(new WorkerRunnable(doneSignal, i)); // 主线程等到准备工作线程执行完毕 doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; // 构造方法 WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } // run public void run() { try { // 完成准备工作 doWork(i); // countDown doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } }
3.2 Sync
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // State即同步状态,在不同的实现中叫法不一样,只是为了方便理解 // 构造方法初始化计数器计数值(即同步状态值) Sync(int count) { setState(count); } // 获取计数值 int getCount() { return getState(); } // 共享模式获取 protected int tryAcquireShared(int acquires) { // 体现出只有计数值为0时,才能算获取成功 return (getState() == 0) ? 1 : -1; } // 共享模式释放 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 如果计数值已经为0,直接返回false,结束自旋 if (c == 0) return false; // 否则计数 - 1 int nextc = c-1; // 通过自旋 + CAS方式改变剩余计数 if (compareAndSetState(c, nextc)) // 如果计数为0返回true,否则返回false,结束自旋 // 返回true表示可以唤醒等待的线程 return nextc == 0; } } }
通过上面代码解析可知, CountDownLatch的实现方法都是在内部类Sync里面。
3.3 CountDownLatch
public class CountDownLatch { // 同步队列 private final Sync sync; // 构造方法初始化计数值 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 线程等待 public void await() throws InterruptedException { // 调用AQS的acquireSharedInterruptibly方法 // 即共享模式响应中断的获取 sync.acquireSharedInterruptibly(1); } // 计数 - 1 public void countDown() { sync.releaseShared(1); } }
3.3.1 await() 方法解析
// CountDownLatch public void await() throws InterruptedException { // 调用AQS的acquireSharedInterruptibly方法 sync.acquireSharedInterruptibly(1); } // 进入AQS public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 中断判断 if (Thread.interrupted()) throw new InterruptedException(); // 如果没有获取到同步状态,或者说计数值不为0 // 则调用doAcquireSharedInterruptibly方法,进入同步队列 // 如果计数值为0则执行后续业务逻辑 if (tryAcquireShared(arg) < 0) // 该方法的解析参考文章结尾的链接,此处不再赘述 doAcquireSharedInterruptibly(arg); } // CountDownLatch 中tryAcquireShared的实现 protected int tryAcquireShared(int acquires) { // 当计数为0时,线程才不会进入同步队列 return (getState() == 0) ? 1 : -1; }
通过上面代码可以知道,如果计数值为0,表示获取成功。这就是CountDownLatch的机制,尝试获取latch的线程只有当latch的值减到0的时候,才能获取成功。
3.3.2 countDown() 方法解析
// CountDownLatch public void countDown() { // 调用AQS的releaseShared方法 sync.releaseShared(1); } // 进入AQS public final boolean releaseShared(int arg) { // 共享模式释放 if (tryReleaseShared(arg)) { // 如果释放成功则唤醒等待的线程,并返回true // 具体唤醒逻辑不再赘述,参考AQS解析文章 doReleaseShared(); return true; } return false; } // CountDownLatch 中tryReleaseShared的实现 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; // 通过自旋 + CAS方式改变剩余计数 if (compareAndSetState(c, nextc)) // 如果计数为0返回true,否则返回false,结束自旋 // 返回true表示可以唤醒等待的线程 return nextc == 0; } }
3.3.3 CountDownLatch如何唤醒所有调用 await() 等待的线程呢?
当调用doReleaseShared()唤醒后继节点后,回到线程被挂起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 将当前线程加入同步队列的尾部 final Node node = addWaiter(Node.SHARED); try { // 自旋 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果前驱节点是头结点,则尝试获取同步状态 if (p == head) { // 当前节点尝试获取同步状态 int r = tryAcquireShared(arg); if (r >= 0) { // 如果获取成功,则设置当前节点为头结点 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 如果当前节点的前驱不是头结点,尝试挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
当头结点的后继节点被唤醒后,线程将从挂起的地方醒来,继续执行,因为没有return,所以进入下一次循环。
此时,获取同步状态成功,执行setHeadAndPropagate(node, r)。
// 如果执行这个函数,那么propagate一定等于1 private void setHeadAndPropagate(Node node, int propagate) { // 获取头结点 Node h = head; // 因为当前节点被唤醒,设置当前节点为头结点 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 获取当前节点的下一个节点 Node s = node.next; // 如果下一个节点为null或者节点为shared节点 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { // 自旋 for (;;) { Node h = head; // 如果队列存在排队的节点 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // CAS设置不成功则不断循环 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // CAS操作成功后释放后继节点,并唤醒线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 队列不存在排队的节点,直接结束自旋 if (h == head) // loop if head changed break; }
调用doReleaseShared方法唤醒后继节点,后继节点又回到线程被挂起的地方,也就是doAcquireSharedInterruptibly(int arg)方法中,实现循环唤醒所有await的线程。
此篇文章只解析了CountDownLatch的实现,它就是一个基于 AQS 的计数器,它内部的方法都是围绕 AQS 框架来实现的。
建议感兴趣的同学先去了解AQS原理,只要明白了AQS的实现原理,再来看CountDownLatch、Semaphore、ReentrantLock等实现原理就一目了然了。
到此这篇关于Java中的CountDownLatch原理深入解析的文章就介绍到这了,更多相关CountDownLatch原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!