java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java AQS 原理

Java AQS 原理与 ReentrantLock 实现方法

作者:科韵小栈

AQS 的作用是解决同步器的实现问题,它将复杂的同步器实现分解为简单的框架方法,开发者只需要实现少量特定的方法就能快速构建出可靠的同步器,这篇文章主要介绍Java AQS原理与ReentrantLock实现,需要的朋友可以参考下

一、AQS 简介

AbstractQueuedSynchronizer(简称 AQS)是 Java 并发包(java.util.concurrent)中最核心的基础组件之一,它为 Java 中的大多数同步类(如 ReentrantLock、Semaphore、CountDownLatch 等)提供了一个通用的框架。理解 AQS 的工作原理对于深入掌握 Java 并发编程至关重要。

AQS 的作用是解决同步器的实现问题,它将复杂的同步器实现分解为简单的框架方法,开发者只需要实现少量特定的方法就能快速构建出可靠的同步器。

二、AQS 核心设计

2.1 核心组成部分

AQS 主要由以下部分组成:

2.2 AQS 的工作原理

AQS 通过模板方法模式,将一些通用的同步操作封装在框架内部,而将特定同步器的特性(如资源是否可获取的判断)交给子类去实现。AQS 提供以下基本操作:

2.3 AQS 的关键方法

AQS 定义了一组需要子类实现的方法:

三、ReentrantLock 与 AQS 的关系

ReentrantLock 是基于 AQS 实现的可重入锁,它通过内部类 Sync(继承自 AQS)来实现锁的基本功能,并通过 FairSync 和 NonfairSync 两个子类分别实现公平锁和非公平锁。

3.1 ReentrantLock 的结构

public class ReentrantLock implements Lock {
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 实现锁的基本操作
    }
    // 公平锁实现
    static final class FairSync extends Sync { ... }
    // 非公平锁实现
    static final class NonfairSync extends Sync { ... }
}

3.2 ReentrantLock 如何使用 AQS 的 state

ReentrantLock 使用 AQS 的 state 字段来表示锁的持有次数:

四、AQS 关键流程分析

4.1 独占锁的获取流程

当线程调用 ReentrantLock.lock()方法时,实际上会执行以下流程:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 尝试快速添加到队列尾部
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 快速添加失败,进入完整的入队方法
    enq(node);
    return node;
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱是头节点,说明轮到当前节点尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 获取成功,把当前节点设为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断是否应该阻塞当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4.2 独占锁的释放流程

当线程调用 ReentrantLock.unlock()方法时,会执行以下流程:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    // 获取当前节点的等待状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 找到下一个需要唤醒的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾部向前查找需要唤醒的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒找到的节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

五、公平锁与非公平锁

ReentrantLock 支持公平锁和非公平锁两种模式:

5.1 非公平锁(默认)

非公平锁的 tryAcquire 实现:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 非公平锁直接尝试CAS获取锁,不检查队列
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

5.2 公平锁

公平锁的 tryAcquire 实现:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 公平锁会先调用hasQueuedPredecessors检查是否有前驱节点在等待
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁与非公平锁的主要区别在于获取锁时是否考虑等待队列。公平锁会检查是否有线程在等待队列中排队,而非公平锁则直接尝试获取,不考虑等待顺序。

六、自定义实现:简化版 ReentrantLock

为了更深入理解 AQS 原理,我们可以实现一个简化版的 ReentrantLock:

public class SimpleReentrantLock implements Lock {
    private final Sync sync;
    /**
     * 默认创建非公平锁
     */
    public SimpleReentrantLock() {
        sync = new NonfairSync();
    }
    /**
     * 根据参数创建公平锁或非公平锁
     */
    public SimpleReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    /**
     * 继承AQS的同步器实现
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        /**
         * 非公平的方式获取锁
         */
        final boolean unfairTryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取当前state状态
            int c = getState();
            // state为0表示锁未被持有
            if (c == 0) {
                // 使用CAS尝试将state从0设置为1
                if (compareAndSetState(0, acquires)) {
                    // 成功获取锁,设置当前持有锁的线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果当前线程就是持有锁的线程,实现可重入
            else if (current == getExclusiveOwnerThread()) {
                // 增加state值实现重入计数
                int nextC = c + acquires;
                // 检查溢出
                if (nextC < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                // 设置新的state值,这里不需要CAS因为当前线程已经持有锁
                setState(nextC);
                return true;
            }
            // 获取锁失败
            return false;
        }
        /**
         * 释放锁
         */
        @Override
        protected final boolean tryRelease(int releases) {
            // 检查当前线程是否是持有锁的线程
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            // 减少state值
            int c = getState() - releases;
            // 判断是否完全释放锁
            boolean free = (c == 0);
            if (free) {
                // 完全释放锁,清空持有锁的线程
                setExclusiveOwnerThread(null);
            }
            // 更新state值
            setState(c);
            return free;
        }
        /**
         * 判断当前线程是否持有锁
         */
        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        /**
         * 创建条件变量
         */
        Condition newCondition() {
            return new ConditionObject();
        }
        /**
         * 获取锁的持有计数
         */
        public int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
    }
    /**
     * 公平锁的实现
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 公平性体现:先检查队列中是否有前驱节点在等待
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextC = c + acquires;
                if (nextC < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextC);
                return true;
            }
            return false;
        }
    }
    /**
     * 非公平锁的实现
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        /**
         * 非公平锁的获取实现
         */
        @Override
        protected boolean tryAcquire(int acquires) {
            return unfairTryAcquire(acquires);
        }
    }
    // 实现Lock接口的方法
    @Override
    public void lock() {
        sync.acquire(1);
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override
    public boolean tryLock() {
        return sync.unfairTryAcquire(1);
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
    @Override
    public void unlock() {
        sync.release(1);
    }
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    /**
     * 查询当前锁是否被某个线程持有
     */
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
    /**
     * 查询当前线程是否持有锁
     */
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    /**
     * 获取当前锁的持有计数
     */
    public int getHoldCount() {
        return sync.getHoldCount();
    }
}

七、Condition 实现原理

AQS 提供了 ConditionObject 内部类,用于实现 Condition 接口,支持类似 wait/notify 的条件等待/通知机制:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加到条件队列
    Node node = addConditionWaiter();
    // 完全释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 循环检查节点是否已经转移到同步队列
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        // 检查中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 重新竞争锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

八、AQS 的应用场景

AQS 广泛应用于 Java 并发包中的各种同步器:

九、总结

AQS 是 Java 并发框架中最核心的基础组件,它通过以下机制实现了高效的线程同步:

理解 AQS 的工作原理,不仅有助于更好地使用 Java 并发包中的同步器,也能帮助我们在必要时实现自己的高效同步器。AQS 通过简洁的设计将复杂的同步器问题分解为少量的基本方法,使得开发者能够快速实现各种同步器。ReentrantLock 相比 synchronized 提供了更多的功能,如可中断、超时等待、公平性选择等。

到此这篇关于深入理解 Java AQS 原理与 ReentrantLock 实现的文章就介绍到这了,更多相关Java AQS 原理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文