java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java AbstractQueuedSynchronizer

一文带你深入理解Java AbstractQueuedSynchronizer

作者:单程车票

在并发编程中,锁是一种保证线程安全的方式,这篇文章主要为大家介绍了AbstractQueuedSynchronizer(AQS)的数据结构及实现原理,感兴趣的小伙伴可以了解一下

前言

在并发编程中,锁是一种保证线程安全的方式,Java 主要有两种锁机制,一种是 synchronized 关键字修饰的锁,通过 JVM 层面的指令码来控制锁(依赖于底层的操作系统);而另一种则是 JUC 包下的各类同步器如 ReentrantLock 可重入锁,那么这类同步器是怎么样实现锁机制的呢?

其实 ReentrantLock 这类的锁是基于 AbstractQueuedSynchronizer(文章后面都称为 AQS)实现的。那么为什么 AQS 能够实现锁机制呢?

本篇文章将会深入讲解 AQS 的数据结构及实现原理。

AQS 概述

AQS 是什么?

AQS 直译为抽象队列同步器,是用来构建锁和同步器的重量级基础框架。JUC包下的锁和同步器如 ReentrantLockSemaphoreReentrantReadWriteLockCountDownLatch等都是基于 AQS 实现的。

AQS 的原理

在并发场景下,多线程存在抢占共享资源的情况,那么必定存在抢占不到资源的线程需要进行排队等待,并且当资源释放时也需要唤醒这些线程进行资源争抢,所以 AQS 提供了一套线程等待阻塞以及线程唤醒的机制来实现多线程下线程安全。

AQS 通过 维护一个 int 类型的状态变量和一个 FIFO 的虚拟双向队列(CLH 队列锁的变体) 来实现线程等待和唤醒机制的。

原理大致为:当线程请求共享资源空闲时,AQS 会将当前线程设置为有效的工作线程并通过 CAS 的方式将状态变量设置为锁定状态;当线程获取共享资源失败时,AQS 会将线程及等待状态封装成一个 Node 节点,将其加入队列中;当共享资源被释放时,AQS 会唤醒队列中的下一个节点再次尝试获取共享资源。

下文将通过源码分析具体展示 AQS 是如何实现线程等待阻塞以及唤醒的。

如何自定义同步器

上面提到 ReentrantLockSemaphoreReentrantReadWriteLockCountDownLatch等同步器都是基于 AQS 实现的(其实就是继承 AQS),那么是不是只要继承 AQS 也可以实现自定义同步器呢?

浅看 AQS 源码,可以看到 AQS 是抽象类。其实 AQS 是基于模板模式设计的,也就是说 AQS 已经提供了一套线程等待阻塞以唤醒的实现,不同的同步器只需要继承 AQS 类并重写 AQS 指定的方法来定制自己的获取资源和释放资源逻辑即可。

那么有哪些方法是可以进行重写的呢?

通过源码会发现 AQS 的所有方法只有 5 个方法是可以重写的,其余要不是 private 就是 final 修饰的方法。

下面列出这五个方法:

// (独占式)尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// (独占式)尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// (共享式)尝试获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// (共享式)尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
// 判断当前线程是否正在独占式,只有用到condition才需要去实现它。
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

上面代码可以看到 AQS 分为独占式和共享式两种获取共享资源的方式:

需要实现独占式的同步器就去重写独占式的方法,需要实现共享式的同步器就去重写共享式的方法(注意这里重写的方法必须都是内部线程安全的,并且尽可能地简短)。

下面以独占式的 ReentrantLock 为例,看看 ReentrantLock 源码中是如何继承 AQS 并重写方法实现同步器的。

可以看到 ReentrantLock 通过内部类 Sync 继承 AQS 并在 Sync 中重写了独占式的 tryRelease() 方法,然后将独占式的 tryAcquire() 交给 Sync 的两个子类(也就是公平与非公平)去按照各自的逻辑实现。

基本上面提到同步器都是通过依赖内部类 Sync 继承 AQS 实现地同步器,所以需要自定义同步器的也可以仿照这样的方式创建。

AQS 的核心数据结构

前文说道 AQS 是基于 CLH 队列锁的变体 实现的,所以 AQS 的核心数据结构就是这个 CLH 队列锁的变体,在了解 AQS 的核心数据结构前,还需要先介绍一下 CLH 队列锁是什么?

CLH 队列锁是什么?

相信大家都听说过自旋锁吧,自旋锁是互斥锁的一种实现方式,通过 CAS 的方式获取锁和释放锁来实现互斥锁。但是自旋锁存在锁饥饿问题锁竞争激烈下性能较差问题

CLH 队列锁是对自旋锁的改进,有效地解决了自旋锁上面的两个问题,通过队列的方式可以防止锁饥饿问题,同时实现了锁状态去中心化,让每个线程可以在不同的状态变量下自旋,从而来减少 CPU 的性能开销。

CLH 队列锁可以看成是一个单向链表队列,将所有请求共享资源的线程封装成 节点(包含 线程标识 和 被锁定状态) 排列在队列中,如下图。

原理流程:

可以看到 CLH 队列锁通过队列实现了公平锁,先入队的线程会先获得共享资源,解决了锁饥饿问题;并且每个节点的被锁定状态只会影响到其后一个节点,实现了锁去中心化从而减少 CPU 的开销。

AQS 对 CLH 队列锁的改进

虽然 CLH 队列锁已经具有良好的性能了,但是因为存在自旋所以依旧存在 CPU 开销问题,并且 CLH 队列锁本身的功能单一,不能支持复杂的功能。

所以 AQS 对 CLH 队列锁进行了改进,使用 LockSupport 类将自旋改为阻塞线程操作(后续源码会具体介绍)来减少 CPU 的开销,扩展节点的状态以及显示的维护前驱和后续节点

AQS 使用内部类 Node 来实现 CLH 队列锁的变体,也就是 AQS 的核心数据结构。

下面看看 AQS 的内部类 Node 的源码:

static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;
    // 由于超时、中断或其他原因,线程被取消
    static final int CANCELLED =  1;
    // 当前节点的后继节点阻塞等待共享资源
    static final int SIGNAL    = -1;
    // 当前节点在条件队列
    static final int CONDITION = -2;
    // 当前节点的下一个acquireShared应无条件传播
    static final int PROPAGATE = -3;
    // 节点状态
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 节点的线程
    volatile Thread thread;
    // 下一个等待者(这个用于 Condition,这里不做过多说明)
    Node nextWaiter;
    // 是否是共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    // 获取前驱节点,为空则抛出异常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    // 各种构造器
    Node() {
    }
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

(说明:AQS 实际上有两种队列,一个是使用双向链表实现(利用 prevnext)的队列,一个是使用单向链表实现(利用 nextWaiter)的 Condition 队列,文章主要讲解 AQS 最为核心的双向链表队列,关于 Condition 的内容本文不做讲解,有兴趣的可以通过阅读源码)

从上面的源码中可以看到内部类 Node 中三个比较重要的属性:

waitStatus 节点状态

前驱节点和后继节点

除此之外还有一个常用的方法 predecessor() 用于获取当前节点的前驱节点,如果前驱节点为空则抛出异常。

AQS 的 CLH 队列锁变体如下图

图中有 headtail 两个变量分别指向队列头部和末尾节点,这是 AQS 类的属性(后续源码分析中介绍)。并且队列的头节点是哨兵节点(只用来占位,没有线程),其实就是 new Node() 初始化一个空节点(后续源码中介绍)。

AQS 源码分析

前面已经说明了 AQS 的核心数据结构,接下来将会通过源码去进一步的了解 AQS 是如何凭借 同步状态变量CLH 队列锁的变体 实现线程等待唤醒机制的。

1. 继承 AOS 类

先了解一下 AQS 继承的父类 AOS 为 AQS 提供了什么方法。

AQS 继承 AbstractOwnableSynchronizer 抽象类,AbstractOwnableSynchronizer 类中有exclusiveOwnerThread 属性,表示独占式下资源的所有者(持有共享资源的线程)。同时包含该属性的 get/set 方法用于设置当前独占资源的线程和获取当前独占资源的线程。因为继承关系也就意味着 AQS 可以调用这两个方法。

2. AQS 的属性

接下来了解一下 AQS 的属性,更好地去认识 AQS 的数据结构以及如何获取和设置同步状态变量。

查看源码:

// 序列号
private static final long serialVersionUID = 7373984972572414691L;
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
// Unsafe实例以及各种内存偏移量
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

这里主要关注 head 头结点、tail 尾结点、state 同步状态即可。

AQS 中通过 Getter/Setter 方法以及 CAS 的方式设置和获取 state 属性。

// 获取同步状态
protected final int getState() {
    return state;
}
// 设置同步状态
protected final void setState(int newState) {
    state = newState;
}
// CAS方式设置同步状态(当期待值expect等于当前同步状态时,将同步状态设置为update值)
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

3. AQS 如何实现线程等待机制

通过前文可以了解到 ReentrantLock 是如何继承 AQS 的,那么下面以 ReentrantLock 的非公平锁为例深入 AQS 源码中分析 AQS 的 核心方法 以及 AQS 如何结合 同步状态CLH 队列锁的变体 实现线程的等待机制的。

首先创建非公平锁的可重入锁 Lock lock = new ReentrantLock();,此时 AQS 的 state 同步状态为0,表示没有线程占用共享资源。

lock.lock() 开始看 AQS 如何实现多线程抢占式下的线程等待阻塞

进入非公平锁的 lock()方法,查看源码:

可以看到调用 compareAndSetState() 用于以 CAS 的方式(原子操作)设置同步状态 state

AQS 核心方法 acquire()

AQS 实现线程获取资源以及线程等待阻塞的核心方法,查看源码:

可以看到这里共有三个 AQS 核心方法:tryAcquire()addWaiter()acquireQueued()

下面根据 if 语句从左到右执行的顺序依次进入这三个方法。

tryAcquire() 方法:自定义获取共享资源的逻辑

这个方法如果前文看的仔细的话,会发现它是需要子类重写的方法,也就是由子类自己定义抢占式获取资源的逻辑。

进入非公平锁实现的 tryAcquire() 方法

发现该方法调用了 nonfairTryAcquire() 方法,查看源码:

// 这里调用的是 nonfairTryAcquire(1),也就是acquires为 1
final boolean nonfairTryAcquire(int acquires) {
    // 获取当前线程
    final Thread current = Thread.currentThread();
    // 获取同步状态
    int c = getState();
    // 同步状态为0,说明共享资源空闲可以直接抢占
    if (c == 0) {
        // 再次尝试抢占
        if (compareAndSetState(0, acquires)) {
            // 成功则将当前线程设置为共享资源所有者
            setExclusiveOwnerThread(current);
            // 返回 true
            return true;
        }
    }
    // 当前共享资源还在被占用,判断当前线程是不是共享资源所有者
    else if (current == getExclusiveOwnerThread()) {
        // 增加同步状态(这里其实也就是 ReentrantLock 是可重入锁的原因,如果是自己线程持有的锁,可以再次加锁,也就是可重入)
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新同步状态
        setState(nextc);
        // 返回 true
        return true;
    }
    // 如果上面情况都不是则返回 false
    return false;
}

这个方法其实体现的主要是 ReentrantLock 可重入锁的逻辑,执行流程如下:

addWaiter() 方法:将当前线程结合模式封装成节点加入队列

该方法是 AQS 中的方法,查看源码:

private Node addWaiter(Node mode) {
    // 以当前线程以及 mode(这里是独占式)为参数构建节点
    Node node = new Node(Thread.currentThread(), mode);
    // 获取队列末尾节点
    Node pred = tail;
    // 判断末尾节点是否为空
    if (pred != null) {
        // 末尾不为空时,将当前节点链接到莫尾结点后面,并且将当前节点设置为末尾节点
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            // 返回当前节点
            return node;
        }
    }
    // 末尾节点为空,则进入 enq 方法
    enq(node);
    return node;
}

执行流程:

enq() 方法:初始化队列(加入哨兵节点为头结点)并加入当前节点

该方法是 AQS 中的方法,查看源码:

private Node enq(final Node node) {
    // 循环直到将 node 加入队列为止
    for (;;) {
        Node t = tail;
        // 判断末尾节点是否为空
        if (t == null) {
            // 为空,说明需要设置哨兵节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 不为空,直接加入末尾,并设置当前节点为末尾节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

执行流程:

这里也就是前文数据结构图中为什么头节点是哨兵节点的原因,如图:

acquireQueued() 方法:队列中轮询抢占共享资源,抢占不成功则进入阻塞等待唤醒

addWaiter() 执行完加入队列后会将节点返回作为参数执行 acquireQueued(),查看源码:

// node 为当前加入队列的节点,arg 为 1
final boolean acquireQueued(final Node node, int arg) {
    // 标志,用来判断是否执行后面的cancelAcquire方法
    // 也就是中途是否存在异常中断,有则使用cancelAcquire整理队列
    boolean failed = true;
    try {
        // 中断标志
        boolean interrupted = false;
        // 循环
        for (;;) {
            // 获取当前节点的前置节点
            final Node p = node.predecessor();
            // 判断前置节点是否是头结点(当前节点是否是队列第一个等待线程)以及再次尝试获取资源
            if (p == head && tryAcquire(arg)) {
                // 满足条件说明当前节点已经抢占到资源,可以从队列中移除
                // 所以这里会将当前节点设置为头结点(setHead方法操作是先将节点线程以及前驱节点置空)
                setHead(node);
                // 并且将原本的头结点置空,通过辅助GC回收
                p.next = null; // help GC
                // 标志置为false,表示没有异常
                failed = false;
                // 返回中断标志
                return interrupted;
            }
            // 上述条件不满足,则执行shouldParkAfterFailedAcquire和parkAndCheckInterrupt
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 整理队列,将队列中中断的进程移除
            cancelAcquire(node);
    }
}

这里需要分情况讨论:

shouldParkAfterFailedAcquire() 方法:更改前置节点的状态为 SIGNAL

该方法是 AQS 中的方法,查看源码:

// 传参 pred 为当前节点的前置节点;node 为 当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前置节点的状态
    int ws = pred.waitStatus;
    // 判断是不是 Node.SIGNAL(-1)
    if (ws == Node.SIGNAL)
        // 状态为-1表示线程已经准备好了(阻塞等待了),就等资源释放进行争抢了
        // 返回 true 执行 parkAndCheckInterrupt() 方法(这玩意就是阻塞线程)
        return true;
    // 判断状态是否大于0,表示被中断
    if (ws > 0) {
        //前置节点为被中断,意味着这个节点没用了,需要再往前找,找到不是被中断的状态为止
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        // 找到后将新的前置节点与当前节点链接起来
        pred.next = node;
    } else {
        // CAS 方式将前置节点的状态设置为 Node.SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回 false 直接跳出
    return false;
}

这个方法就是当循环无法抢占到资源时,便会去更新当前线程节点的前置节点状态,更新完状态之后等待下一次循环如果还是抢占不到资源,就会去执行 parkAndCheckInterrupt() ,使用 LockSupport 把当前线程阻塞等待唤醒。

parkAndCheckInterrupt() 方法:阻塞当前线程

该方法是 AQS 中的方法,查看源码:

private final boolean parkAndCheckInterrupt() {
    // 调用 LockSupport 类进行阻塞当前线程
    LockSupport.park(this);
    // 返回当前线程是否中断,同时清除中断标志位
    return Thread.interrupted();
}

这里也就是前文提到的 AQS 对 CLH 队列锁的改进,不再通过自旋锁的方式轮询前一个节点的状态,而是尝试两次抢占后还是抢占不到就进入阻塞状态,等待资源释放后唤醒,这样做可以减少 CPU 开销。

该方法返回的是当前线程是否被中断的结果,被中断则返回 true 使得前面的 acquireQueued() 方法将中断标志置为 true,在抢占到资源后会根据中断标志最终进入语句执行 selfInterrupt() 方法将当前线程中断。

cancelAcquire 方法:当前线程出现异常时整理队列移除当前线程节点

该方法是 AQS 中的方法,查看源码:

// 参数为当前节点
private void cancelAcquire(Node node) {
    // 当前节点为空直接返回
    if (node == null)
        return;
    // 将当前节点的线程置空
    node.thread = null;
    // 这一步是找到当前节点的前驱节点
    Node pred = node.prev;
    // 如果前驱节点状态也为 CANCELLED 则循环找前驱节点的前驱节点直到状态不为 CANCELLED
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 获取前驱节点的下一个节点
    Node predNext = pred.next;
    // 将当前节点设置为 CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 判断当前节点是否是末尾节点,是则只需要将末尾节点设置成获取到的前驱节点即可
    // 相当于移除掉了前驱节点后的所有节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 并将前驱节点的下一节点置空
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 满足三个条件
        // 1. 前驱节点不是头结点
        // 2. 前驱节点的状态为 SIGNAL 或者 前驱节点状态小于0并且可以设置为 SIGNAL
        // 3. 前驱节点的线程不为空
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 保存当前节点的后继节点
            Node next = node.next;
            // 后继节点不为空并且状态不为 CANCELLED
            if (next != null && next.waitStatus <= 0)
                // 将前驱节点与后继节点链接起来,移除中间的节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 以上条件不满足则唤醒当前线程的下一个节点
            unparkSuccessor(node);
        }
        // 辅助GC回收
        node.next = node; // help GC
    }
}

执行过程:

当前线程不能正常退出时执行该方法,也就是出现异常时执行该方法

先判断该线程节点是否为空,为空则直接返回;不为空则将当前节点的线程置空

通过循环的方式找到当前节点的前置节点(这个节点状态不能为 CANCELLED),并记录前置节点的下一节点

将当前节点状态置为 CANCELLED,分情况讨论当前节点的位置

执行流程图

4. AQS 如何实现线程唤醒机制

假设当前 AQS 的 state 同步状态为 1,表示当前有线程占用共享资源。

lock.unlock() 开始看 AQS 如何实现多线程抢占式下的线程唤醒

深入源码,调用了 ReentrantLockunlock() 方法,查看源码:

进入 release() 方法,也就是 AQS 的 release() 方法,查看源码:

可以看到 AQS 的 release() 先调用 ReentrantLock 重写的 tryRelease() 进行判断。

tryRelease() 方法:自定义释放资源逻辑

查看 ReentrantLock 中的源码:

protected final boolean tryRelease(int releases) {
    // 传入releases 为 1, getState() 为 1
    // 所以 c = 1 - 1 = 0
    int c = getState() - releases;
    // 判断当前线程是否是共享资源所有者,不是则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 判断c是否等于0,共享资源是否空闲
    if (c == 0) {
        // 更新free标志
        free = true;
        // 将共享资源所有者置空
        setExclusiveOwnerThread(null);
    }
    // 更新state
    setState(c);
    // 返回free标志
    return free;
}

这里是可重入锁 ReentrantLock 自定义的释放资源逻辑,过程较为易懂,就是将 当前的同步状态变量−参数releases当前的同步状态变量 - 参数releases当前的同步状态变量−参数releases 得到新的同步状态变量。如果为 0 说明共享资源已被当前线程释放返回 true。

返回 release 方法体,进入 if 语句,判断头结点不为空并且状态不为 0,表示队列存在并且头结点的后继节点已经准备好获取共享资源了,进入 unparkSuccessor() 方法。

unparkSuccessor 方法:唤醒后继节点的线程

private void unparkSuccessor(Node node) {
    // 获取当前结点的状态
    int ws = node.waitStatus;
    // 判断是否小于0,重新将状态置为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 获取当前结点的后继节点
    Node s = node.next;
    // 如果后继节点为空或者后继节点状态为 CANCELLED
    if (s == null || s.waitStatus > 0) {
        // 后继节点置空
        s = null;
        // 通过从后向前遍历找到离当前节点最近的后一个节点并且该节点状态小于0
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                // 如果状态小于0说明已经准备好获取资源了,替换掉原先的后继节点
                s = t;
    }
    // 后继节点不为空
    if (s != null)
        // 唤醒该后继节点的线程
        LockSupport.unpark(s.thread);
}

该方法就是获取传入节点的后继节点,并且保证该节点的状态小于0即已经准备好获取共享资源了,通过 LockSupportunpark() 方法唤醒后继节点的线程。

到这里就是 AQS 实现线程唤醒的全部过程了,AQS 的线程唤醒机制是通过上一个节点来唤醒当前节点的线程

以上就是一文带你深入理解Java AbstractQueuedSynchronizer的详细内容,更多关于Java AbstractQueuedSynchronizer的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文