java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > ReentrantLock的实现原理

关于ReentrantLock的实现原理解读

作者:盛夏温暖流年

这篇文章主要介绍了关于ReentrantLock的实现原理,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

ReentrantLock 简介

ReentrantLock 实现了 Lock 接口,是一种可重入的独占锁。

相比于 synchronized 同步锁,ReentrantLock 更加灵活,拥有更加强大的功能,比如可以实现公平锁机制。

首先,先来了解一下什么是公平锁机制。

ReentrantLock 的公平锁机制

我们知道,ReentrantLock 分为公平锁非公平锁,可以通过构造方法来指定具体类型:

//默认非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
//公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

公平锁

在多个线程竞争获取锁时,公平锁倾向于将访问权授予等待时间最长的线程。

也就是说,公平锁相当于有一个线程等待队列,先进入队列的线程会先获得锁,按照 "FIFO(先进先出)" 的原则,对于每一个等待线程都是公平的。

非公平锁

非公平锁是抢占模式,线程不会关注队列中是否存在其他线程,也不会遵守先来后到的原则,直接尝试获取锁。

接下来进入正题,一起分析下 ReentrantLock 的底层是如何实现的。

ReentrantLock 的底层实现

ReentrantLock 实现的前提是 AbstractQueuedSynchronizer(抽象队列同步器),简称 AQS,是 java.util.concurrent 的核心,

常用的线程并发类 CountDownLatch、CyclicBarrier、Semaphore、ReentrantLock 等都包括了一个继承自 AQS 抽象类的内部类。

同步标志位 state

AQS 内部维护了一个同步标志位 state,用来实现同步加锁控制:

private volatile int state;

同步标志位 state 的初始值为 0,线程每加一次锁,state 就会加 1,也就是说,已经获得锁的线程再次加锁,state 值会再次加 1。

可以看出,state 实际上表示的是已获得锁的线程进行加锁操作的次数。

CLH 队列

除了 state 同步标志位外,AQS 内部还使用一个 FIFO 的队列(也叫 CLH 队列)来表示排队等待锁的线程,当线程争抢锁失败后会封装成 Node 节点加入 CLH 队列中去。

Node 的代码实现:

static final class Node {
      // 标识当前节点在共享模式
      static final Node SHARED = new Node();
      // 标识当前节点在独占模式
      static final Node EXCLUSIVE = null;
      static final int CANCELLED =  1;
      static final int SIGNAL    = -1;
      static final int CONDITION = -2;
      static final int PROPAGATE = -3;
    volatile int waitStatus;
      //前驱节点
      volatile Node prev;
      //后驱节点
      volatile Node next;
      //当前线程
      volatile Thread thread;
      //存储在condition队列中的后继节点
      Node nextWaiter;
      //是否为共享锁
      final boolean isShared() {
            return nextWaiter == SHARED;
      }
      final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
      }
      Node() {}
      Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
      }
      Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
      }
}

分析代码可知, 每个 Node 节点都有两个指针,分别指向直接后继节点和直接前驱节点。

Node 节点的变化过程

当出现锁竞争以及释放锁的时候,AQS 同步队列中的 Node 节点会发生变化,如下图所示:

  

线程封装成 Node 节点追加到队列末尾,设置当前节点的 prev 节点和 next 节点的指向;通过 CAS 将 tail 重新指向新的尾部节点,即当前插入的 Node 节点;

head 节点表示获取锁成功的节点,当头结点释放锁后,会唤醒后继节点,如果后继节点获得锁成功,就会把自己设置为头结点,节点的变化过程如下:

修改 head 节点指向下一个获得锁的节点;新的获得锁的节点,将 prev 的指针指向 null;

和设置 tail 的重新指向不同,设置 head 节点不需要用 CAS,是因为设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证。

只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可。

除了前驱和后继节点,Node 类中还包括了 SHARED 和 EXCLUSIVE 节点,它们起到了什么作用呢?这就不得不介绍一下 AQS 的两种资源共享模式了。

AQS 的资源共享模式

AQS 通过 EXCLUSIVE 和 SHARED 两个变量来定义独占模式共享模式

独占模式

独占模式是最常用的模式,使用范围很广,比如 ReentrantLock 的加锁和释放锁就是使用独占模式实现的。

独占模式中的核心加锁方法是 acquire()

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这里首先调用 tryAcquire() 方法尝试获取锁,也就是尝试通过 CAS 修改 state 为 1,如果发现锁已经被当前线程占用,就执行重入,也就是给 state+1;

如果锁被其他线程占有,那么当前线程执行 tryAcquire 返回失败,则会执行 addWaiter() 方法在等待队列中添加一个独占式节点,addWaiter() 方法实现如下:

    private Node addWaiter(Node mode) {
        //创建一个节点,此处mode是独占式的
        Node node = new Node(mode);
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                // 如果tail节点非空,就将新节点的前节点设置为tail节点,并将tail指向新节点
                node.setPrevRelaxed(oldTail);
                //CAS将tail更新为新节点
                if (compareAndSetTail(oldTail, node)) {
                    //把原tail的next设为当前节点
                    oldTail.next = node;
                    return node;
                }
            } else {
                //还没有初始化,就调用initializeSyncQueue()方法初始化
                initializeSyncQueue();
            }
        }
    }

写入队列后,需要挂起当前线程,代码如下:

/**
 * 已经入队的线程尝试获取锁
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //标记是否成功获取锁
    try {
        boolean interrupted = false; //标记线程是否被中断过
        for (;;) {
            final Node p = node.predecessor(); //获取前驱节点
            //如果前驱是head,即该结点是第二位,有资格去尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 获取成功,将当前节点设置为head节点
                p.next = null; // 原head节点出队
                failed = false; //获取成功
                return interrupted; //返回是否被中断过
            }
            // 判断获取失败后是否可以挂起,若可以则挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 线程若被中断,设置interrupted为true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

再看下 shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 都做了哪些事:

/**
 * 判断当前线程获取锁失败之后是否需要挂起.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驱节点的状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱节点状态为signal,返回true
        return true;
    // 前驱节点状态为CANCELLED
    if (ws > 0) {
        // 从队尾向前寻找第一个状态不为CANCELLED的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点的状态设置为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
 * 挂起当前线程,返回线程中断状态并重置
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

通过以上代码可以看出,线程入队后能够挂起的前提是,它的前驱节点的状态为 SIGNAL,这意味着当前一个节点获取锁并且出队后,需要把后面的节点进行唤醒。

加锁说完了再说解锁,解锁的方法相比来说更加简单,核心方法是 release():

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

代码流程:先尝试释放锁,若释放成功,那么查看头结点的状态是否为 SIGNAL,如果是,则唤醒头结点的下个节点关联的线程,如果释放失败就返回 false 表示解锁失败。

其中的 tryRelease() 方法实现如下,详细流程见注释说明:

/**
 * 释放当前线程占用的锁
 * @param releases
 * @return 是否释放成功
 */
protected final boolean tryRelease(int releases) {
    // 计算释放后state值
    int c = getState() - releases;
    // 如果不是当前线程占用锁,那么抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 锁被重入次数为0,表示释放成功
        free = true;
        // 清空独占线程
        setExclusiveOwnerThread(null);
    }
    // 更新state值
    setState(c);
    return free;
}

共享模式

共享模式和独占模式最大的区别在于,共享模式具有传播的特性。

共享模式获取锁的方法为 acquireShared,相比于独占模式,共享模式的加锁多了一个步骤,即自己拿到资源后,还会去唤醒后继队友;

而共享模式释放锁的方法为 releaseShared,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,会唤醒等待队列里的其他线程来获取资源。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文