java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > CountDownLatch解析

Java多线程中的CountDownLatch解析

作者:进击的猫

这篇文章主要介绍了Java多线程中的CountDownLatch解析,CountDownLatch是一个阻塞部分线程直到其他线程执行完成后唤醒的同步计数器,核心是其内部类Sync继承于AQS,同时也是利用的AQS的同步原理,也称之为闭锁,需要的朋友可以参考下

一、概念简介

CountDownLatch是一个阻塞部分线程直到其他线程执行完成后唤醒的同步计数器

核心是其内部类Sync继承于AQS,同时也是利用的AQS的同步原理,也称之为闭锁。

二、使用场景

当主线程进行执行时,利用构造方法初始化一个同步数state(AQS原理),主线程调用await方法进行阻塞主线程即谁调用谁阻塞,其它线程调用countDown方法会对计数器减1直到0,会精准唤醒被阻塞线程即被await方法阻塞的线程。

(1)用于多种数据源数据汇总;

(2)等待某一时间点才执行逻辑如加载缓存、加载配置等;

注意:为了程序的健壮性,尽量给出合适的时间,防止子线程中断导致线程无法唤醒的情况发生。

三、特点

(1)子线程调用countDown方法只会减1,不会阻塞线程;

(2)主线程调用await方法会导致其被阻塞,当计数器state被其他线程调用countDown方法减至0会唤醒被阻塞的线程;

(3)当主线程发生中断会抛出异常,导致无法唤醒主线程即无法达到屏障点。

CountDownLatch简单使用

 public static void main(String[] args) {
        System.out.println("main 线程开始执行!");
        CountDownLatch latch = new CountDownLatch(5);//初始化同步数
        for (int i = 0; i < 5; i++) {
            int threadId = i+1;
            new Thread(()->{
                System.out.println("线程"+threadId+"执行!");
                latch.countDown();
            }).start();//java8 lamda表达式
        }
        System.out.println("即将被阻塞!");
        try {
            latch.await();//阻塞主线程,等待子线程将state减至0被唤醒
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main 线程执行完毕!");
    }

四、CountDownLatch源码分析

(1)构造函数

 /**
  * CountDownLatch唯一的构造函数,实例化时只能使用指定同步数的构造方法
  */
  public CountDownLatch(int count) {
      if (count < 0) throw new IllegalArgumentException("count < 0");
      this.sync = new Sync(count);//利用内部类(继承AQS)对state进行设置初始化大小
  }

(2)await方法(核心)

CountDownLatch类:

public void await() throws InterruptedException {
    //核心成员变量sync调用AQS中的方法acquireSharedInterruptibly
    sync.acquireSharedInterruptibly(1);
}

AQS类:

  public final void acquireSharedInterruptibly(int arg)
              throws InterruptedException {
      if (Thread.interrupted())//判断是否有中断标志
          throw new InterruptedException();
          /**
          * 该方法是由子类重写,AQS强制其子类重写,否则报错
          * 根据if中的值判断是否需要阻塞操作 1代表不需要阻塞 -1代表需要阻塞
          */
      if (tryAcquireShared(arg) < 0)
          doAcquireSharedInterruptibly(arg);//调用AQS共享锁阻塞操作
  }

Sync类:

/**
 * 获取同步数并判断是否需要唤醒
 * 同步数state为0,则需要唤醒返回1即不需要阻塞
 * 同步数state不为1,则不需要唤醒,返回-1后的操作即阻塞
 */
 protected int tryAcquireShared(int acquires) {
     return (getState() == 0) ? 1 : -1;//获取AQS中的state进行返回是否需要进行阻塞操作
 }
//以共享锁的方式进行阻塞
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    /**
    * addWaiter方法主要是基于当前线程创建一个等待着并入队且会创建一个哨兵节点
    * addWaiter具体细节和其内部enq初始化队列方法请转入AQS分析
    */
    final Node node = addWaiter(Node.SHARED);//以共享锁创建一个等待者node
    boolean failed = true;
    try {
        for (;;) {//自旋,是否需要阻塞
            final Node p = node.predecessor();//当前线程的前继节点
            if (p == head) {//前继节点是否为头节点
                int r = tryAcquireShared(arg);//尝试获取共享锁即是否需要阻塞1和-1值
                if (r >= 0) {//当其大于等于时,r值只能时1或者-1,满足该条件时则说明不需要阻塞
                    setHeadAndPropagate(node, r);//设置新的头结点并释放共享锁
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            /**
            * shouldParkAfterFailedAcquire主要是改变前节点的等待信号量
            * parkAndCheckInterrupt在前者返回TRUE的情况下会直接调用LockSupport.park()进行阻塞
            * 上述两种方法在AQS分析中可找到详细解释
            */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();//上述两个条件满足则代表线程被中断过
        }
    } finally {
        if (failed)//出现异常且未执行for循环中改变该failed值
            cancelAcquire(node);//取消超时节点和当前节点取消唤醒,AQS原理分析中详细讲解
    }
}

(3)countDown方法(核心)

//用于子线程调用将同步数-1
public void countDown() {
    sync.releaseShared(1);//通过内部成员变量sync调用内部Sync类继承AQS中的释放方法
}

AQS类:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//AQS类中定义强制子类重写该方法,用于是否需要唤醒被阻塞的线程
        doReleaseShared();//满足判断条件则进行正常释放
        return true;//释放成功
    }
    return false;//不需要释放
}

Sync类:

/**
* 主要利用自旋锁的原理,对state值进行-1
*/
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();//获取state值
        if (c == 0)//还未开始自减,已为0则代表不能正常释放
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))//CAS对state值进行设置新的值
            return nextc == 0;//计数器是否为0,此状态为0代表可以正常释放
    }
}
/**
 * 释放共享锁
 */
private void doReleaseShared() {
    for (;;) {//自旋
        Node h = head;//头节点
        if (h != null && h != tail) {//代表可唤醒且不是尾结点
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {//头节点的等待状态为唤醒信号量
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//if中cas操作成功,则执行该唤醒方法,否则进行自旋或者结束
            }else if (ws == 0 &&//初始化但未被改变时
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//设置为无条件唤醒
                continue;// loop on failed CAS 该else if中CAS失败进行自旋
        }
        if (h == head)//loop if head changed 循环判定头结点是否发生变化,实际上是唤醒后会执行这里结束自旋
            break;
    }
}

AQS唤醒共享锁

/**
* (1)对信号量节点即前继节点等待值还原
* (2)对于node节点的后继节点不为null直接唤醒或从后往前找寻信号量最靠前的线程进行唤醒
*/
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;//该节点等待状态即头结点的信号量
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);//将该节点的状态值设置为0即初始值
    Node s = node.next;//获取唤醒节点即node的下一节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)//从后往前查找最靠前的信号量node
            if (t.waitStatus <= 0)//信号量或初始化值
                s = t;
    }
    if (s != null)//找到唤醒节点
        LockSupport.unpark(s.thread);对该线程进行唤醒
}

到此这篇关于Java多线程中的CountDownLatch解析的文章就介绍到这了,更多相关CountDownLatch解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文