java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java锁机制ReentrantLock

java锁机制ReentrantLock源码实例分析

作者:沉迷学习的小伙

这篇文章主要为大家介绍了java锁机制ReentrantLock源码实例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

一:简述

ReentrantLock是java.util.concurrent包中提供的一种锁机制,它是一种可重入,互斥的锁,ReentrantLock还同时支持公平和非公平两种实现。本篇文章基于java8,对并发工具ReentrantLock的实现原理进行分析。

二:ReentrantLock类图

三:流程简图

四:源码分析

我们以lock()方法和unlock()方法为入口对ReentrantLock的源码进行分析。

lock()源码分析:

ReentrantLock在构造构造方法创建对象的时候会根据构造函数传递的fair参数 创建不同的Sync对象(默认是非公平锁的实现),reentrantLock.lock()会调用sync.lock()。在Sync类中的lock()方法是一个抽象方法,具体实现分别在FairSync(公平)和NonfairSync(非公平)中。

    public ReentrantLock() {
        //默认是非公平锁
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    public void lock() {
        sync.lock();
    }

非公平实现:

       final void lock() {
            // state表示锁重入次数 0代表无锁状态
            //尝试抢占锁一次  利用cas替换state标志 替换成功代表抢占锁成功
            if (compareAndSetState(0, 1))
		//抢占成功将抢占到锁的线程设置为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

公平锁实现:

	final void lock() {
            acquire(1);
        }

可以看出非公平锁在lock()方法开始会尝试cas抢占一次锁,也就是在这里会插队一次。然后都会调用acquire()方法。acquire()方法中调用了三个方法,tryAcquire(),addWaiter(),acquireQueued()三个方法,而这三个方法正是ReentrantLock加锁的核心方法,接下来我们会对这三个方法进行重点的分析。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))		
	//这里acquireQueued()是一步步将线程是否中断的标志传回来的 如果为true 那么代表线程被中断过 需要设置中断标志 交给客户端处理
	// 因为LockSupport.park()之后不会对中断进行响应 所以需要一步一步将中断标记传回来
            selfInterrupt();
        }

tryAcquire()方法

流程图:

tryAcquire()方法是抢占锁的方法,返回ture表示线程抢占到锁了,如果抢占到锁就什么都不做,直接执行同步代码,如果没有抢占到锁就需要将线程的信息保存起来,并且阻塞线程,也就是调用addWaiter()方法和acquireQueued()方法。

tryAcquire()也有公平和非公平锁两种实现。

公平锁实现:

	protected final boolean tryAcquire(int acquires) {
	    //获取当前线程
            final Thread current = Thread.currentThread();
	    //获取state的值
            int c = getState();
            if (c == 0) {
	    //如果state为0 代表现在是无锁状态 那么可以抢占锁
	    //公平锁这里多了一个判断 只有在AQS链表中不存在元素 才去尝试抢占锁 否则就去链表中排队
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
		//cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
	//判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

非公平锁实现:

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
    }
	final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
		//如果state为0 代表现在是无锁状态 非公平锁这里就直接尝试抢占锁
		// 公平锁这里多了一个判断 先去看AQS链表中有没有已经在等待的线程 有的话就不会尝试去抢占锁了,非公平锁这里没有这个判断,也就是这里允许插队(不公平)
                if (compareAndSetState(0, acquires)) {	
		//cas替换成功 那么就代表抢占锁成功了 将获取锁的线程设置为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
	   //判断获取锁的线程是否是当前线程 如果是的话增加重入次数 (即增加state的值)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //重新设置state的值    
                setState(nextc);
                return true;
            }
            return false;
        }

接下来分析addWaiter()方法

addWaiter()

addWaiter()方法的作用就是将没有获取到锁的线程封装成一个Node对象,然后存储在AbstractQueuedSynchronizer这个队列同步器中(为了偷懒简称AQS)。我们先看一下Node对象的结构。

    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {    // Used to establish initial head or SHARED marker
        }
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

通过Node对象的结构我们可以看出这是一个双向链表的结构,保存了prev和next引用,thread成员变量用来保存阻塞的线程引用,并且node是有状态的,分别是CANCELLED,SIGNAL,CONDITION,PROPAGATE,其中ReentrantLock涉及到的状态就是SIGNAL(等待被唤醒),CANCELLED(取消)(由于相关性不强,其他的状态在后续文章用到的时候在讲吧)。而AQS又保存了链表的头结点head和尾结点tail,所以实际上AQS存储阻塞线程的数据结构是一 个Node双向链表。addWaiter()方法的作用就是将阻塞线程封装成Node并且将其保存在AQS的链表结构中。

流程图:

源码分析:

private Node addWaiter(Node mode) {
	//将没有获得锁的线程封装成一个node 
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
	//如果AQS尾结点不为null 代表AQS链表已经初始化 尝试将构建好的节点添加到链表的尾部
        if (pred != null) {
            node.prev = pred;
	    //cas替换AQS的尾结点 
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
	//没有初始化调用enq()方法
        enq(node);
        return node;
    }
private Node enq(final Node node) {
	    //自旋
        for (;;) {
            Node t = tail;
	    //尾结点为空 说明AQS链表还没有初始化 那么进行初始化
            if (t == null) { // Must initialize
	    //cas 将AQS的head节点 初始化 成功初始化head之后,将尾结点也初始化
            //注意 这里我们可以看到head节点是不存储线程信息的 也就是说head节点相当于是一个虚拟节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
		//尾结点不为空 那么直接添加到链表的尾部即可
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

接下来分析acquireQueued()

acquireQueued()

acquireQueued()方法的作用就是利用Locksupport.park()方法将AQS链表中存储的线程阻塞起来。

流程图:

源码分析:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
	    //进入自旋
            for (;;) {
		//获取当前节点的前一个节点
                final Node p = node.predecessor();
		// 如果前一个节点是head 而且再次尝试获取锁成功,将节点从AQS队列中去除 并替换head 同时返回中断标志
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //注意 只有抢占到锁才会跳出这个for循环
                    return interrupted;
                }
                //去除状态为CANCELLED的节点 并且阻塞线程 线程被阻塞在这
                //注意 线程被唤醒之后继续执行这个for循环 尝试抢占锁 没有抢占到的话又会阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                //如果失败 将node状态修改为CANCELLED
                cancelAcquire(node);
        }
    }

在acquireQueued()方法中有两个方法比较重要shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法。

shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
	    //如果节点是SIGNAL状态 不需要处理 直接返回
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
	   //如果节点状态>0 说明节点是取消状态 这种状态的节点需要被清除 用do while循环顺便清除一下前面的连续的、状态为取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
	    //正常的情况下 利用cas将前一个节点的状态替换为 SIGNAL状态 也就是-1
	    //注意 这样队列中节点的状态 除了最后一个都是-1 包括head节点
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
  //挂起当前线程 并且返回中断标志  LockSupport.park(thread) 会调用UNSAFE.park()方法将线程阻塞起来(是一个native方法)
        LockSupport.park(this);
        return Thread.interrupted();
    }

到这里lock()方法也就分析完了 接下来我们分析unlock()方法

unlock()方法源码分析:

流程图:

reentrantLock的unlock()方法会调用sync的release()方法。

public void unlock() {
	    //每次调用unlock 将state减1
        sync.release(1);
}

release()方法有两个方法比较重要,tryRelease()方法和unparkSuccessor(),tryRelease()方法计算state的值,看线程是否已经彻底释放锁(这是因为ReentrantLock是支持重入的),如果已经彻底释放锁那么需要调用unparkSuccessor()方法来唤醒线程,否则不需要唤醒线程。

public final boolean release(int arg) {
	//只有tryRelease返回true 说明已经释放锁 需要将阻塞的线程唤醒 否则不需要唤醒别的线程
        if (tryRelease(arg)) {
            Node h = head;
	//如果头结点不为空 而且状态不为0 代表同步队列已经初始化 且存在需要唤醒的node
	//注意 同步队列的头结点相当于是一个虚拟节点  这一点我们可以在构建节点的代码中很清楚的知道
	//并且在shouldParkAfterFailedAcquire方法中 会把head节点的状态修改为-1
	//如果head的状态为0 那么代表队列中没有需要被唤醒的元素 直接返回true
            if (h != null && h.waitStatus != 0)
		//唤醒头结点的下一个节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease()

protected final boolean tryRelease(int releases) {
	     //减少重入次数
            int c = getState() - releases;
	    //如果获取锁的线程不是当前线程 抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
	    //如果state为0 说明已经彻底释放了锁 返回true
            if (c == 0) {
                free = true;
		//将获取锁的线程设置为null
                setExclusiveOwnerThread(null);
            }
	    //重置state的值
            setState(c);
	    //如果释放了锁 就返回true 否则返回false
            return free;
        }

unparkSuccessor()

private void unparkSuccessor(Node node) {
        //获取头结点的状态 将头结点状态设置为0 代表现在正在有线程被唤醒 如果head状态为0 就不会进入这个方法了
        int ws = node.waitStatus;
        if (ws < 0)
            //将头结点状态设置为0
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
	//唤醒头结点的下一个状态不是cancelled的节点 (因为头结点是不存储阻塞线程的)
        Node s = node.next;
	//当前节点是null 或者是cancelled状态
        if (s == null || s.waitStatus > 0) {
            s = null;
	 //从aqs链表的尾部开始遍历 找到离头结点最近的 不为空的 状态不是cancelled的节点 赋值给s 这里为什么从尾结点开始遍历而是头结点 应该是因为添加结点的时候是先初始化结点的prev的, 从尾结点开始遍历 不会出现prve没有赋值的情况 
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
	    //调用LockSupport.unpark()唤醒指定的线程
            LockSupport.unpark(s.thread);
    }	

五:总结

最后总结一下,有以下几点需要注意:

1.ReentrantLock有公平锁和非公平锁两种实现,其实这两种实现的差别只有两点,第一是在lock()方法开始的时候非公平锁会尝试cas抢占锁插一次队, 第二是在tryAcquire()方法发现state为0的时候,非公平锁会抢占一次锁,而公平锁会判断AQS链表中是否存在等待的线程,没有等待的线程才会去抢占锁。

2.AQS存储阻塞线程的数据结构是一个双向链表的结构,而且它是遵循先进先出的,因为它是从头结点的下一个节点开始唤醒,而添加新的节点的时候是添加到链表的尾部,所以AQS同时也是一个队列的数据结构。

3.线程被唤醒之后会继续执行acquireQueued()方法,因为它是阻塞在acquireQueued()方法的for循环中的,唤醒后尝试去获取锁,获取成功就会将节点从AQS中删除并跳出for循环,否则又会继续阻塞。(获取锁失败的原因就是因为有人插队啊。。也就是非公平锁导致的)。

以上就是java锁机制ReentrantLock源码实例分析的详细内容,更多关于java锁机制ReentrantLock的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文