java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java的AQS

Java并发框架中的AQS详细解析

作者:Smallc0de

这篇文章主要介绍了Java并发框架中的AQS详细解析,之前说锁的升级的时候,说到了自旋锁会空转几次尝试等待获取资源,其实这一系列的动作是有一个规范的这个规范叫做同步发生器AbstractQueuedSynchronizer ,简称AQS,需要的朋友可以参考下

前言

之前说锁的升级的时候,说到了自旋锁会空转几次尝试等待获取资源,其实这一系列的动作是有一个规范的这个规范叫做同步发生器AbstractQueuedSynchronizer ,简称AQS。

同步发生器是用来构建锁用的,可以说是Java中同步组件的基础,在Java JDK的JUC包中:java.util.concurrent。

我们常用的ReentrantLock,Semaphore等等用的都是这样一个架构,可以说是这些Lock工具的一个基础。

AQS基本思想

AQS是一个队列管理器,通过内置的FIFO同步队列去管理线程争夺资源的。

FIFO就是先进先出(Frist In First Out)的缩写。

其实现核心是用到了CHL同步队列,其核心思想就是:把每一个线程看作一个节点Node,然后给这些节点上加上前驱指针和后继指针,这样一来就可以用指针把这些线程节点(Thread Node)连接起来形成一个双向链表,或者说双向队列。

除此之外,还有一个同步器节点(Synchronized Node),用来管理这些线程节点。

同步器节点也有两个指针,第一个指针指向队列首节点,第二个指针指向队列尾节点。

因此同步器节点是事实上的头节点Head,下图就是一个完整的CHL同步队列示意图。

在这里插入图片描述

注:CHL是人名简称没啥具体意义。

AQS操作同步队列

同步队列有了,谁能拿到资源则是由一个状态变量(state)来确定的。

当state==0时,表示当前资源没有线程占用;当state>=1时,表示当前资源已经被占用了,其他线程必须等待资源释放。

假设有一个线程要使用资源,首先先会去检查state变量获取结果,如果state==0说明该线程可以使用请求资源,不需要排队,直接取出线程节点去执行;如果state>=1,说明该资源已经被前面的线程拿走了,就必须要排队。

在这里插入图片描述

既然有个这个概念,所以说每一个线程节点都会做这样的事情:获取锁和释放锁。

每个在队列里面的县城节点,都会不断地自旋,每次自旋结束都会尝试获取锁,如果获取不到那么继续自旋。

由于是FIFO先进先出这种公平模式,因此线程头节点总会第一个获取到锁,以此类推。

这里所有的节点都在[ 尝试获取锁 – 自旋 ] 这种状态不断地重复。但是由于使用FIFO模式,只有头节点的自旋是有意义的,其他的就是在空转。

在这里插入图片描述

这样做有什么好处呢?假设我们把所有没有获取到资源的线程都挂起,这就必然要经过用户线程和核心(系统)线程之间的切换,这种切换是非常耗时的。

由于CPU执行的会很快,所以预期就是自旋几次以后,就可以拿到想要的锁,以规避线程之间的切换。

AQS 的用法

上面说过AbstractQueuedSynchronizer是一个框架,它能干什么用还得祭出官方文档一探究竟,官方文档很长,我们截取两句最重要的:

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. 这句话是对AbstractQueuedSynchronizer定义,翻译过来就是说:为那些想要依赖于FIFO等待队列的阻塞锁和相关的同步器(semaphores, events, 等等) 提供一个实施框架。 Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. 这句话是用法的概括:其子类应该被定义为非共有的内部帮助器(助手类),用于实现外部类的属性同步。

说白了AbstractQueuedSynchronizer就是Java给开发人员提供一个获取锁和释放锁的模板,用来处理synchronized封锁粒度过大的问题。

它的主要功能方法如下:

Modifier and TypeMethodDescription
voidacquire(int arg)Acquires in exclusive mode, ignoring interrupts. 独占模式获取对象,忽略中断。
voidacquireShared(int arg)Acquires in shared mode, ignoring interrupts. 共享模式获取对象,忽略中断。
booleanrelease(int arg)Releases in exclusive mode.以独占模式释放对象。
booleanreleaseShared(int arg)Releases in shared mode.以共享模式释放对象。
protected booleantryAcquire(int arg)Attempts to acquire in exclusive mode.试图以独占模式获取锁,这个就是自旋的方法,一直试探
protected inttryAcquireShared(int arg)Attempts to acquire in shared mode. 试图以共享模式获取锁。

说明:共享模式下,当一个线程获取了锁,其他线程依然可读取信息。独占模式下,线程独占了锁,不许其他线程使用。

源码解析AQS

上面已经AQS原理和常用方法说完了,总要有一个地方体现吧。我们可以打开一个方法看看Java中AQS获取锁是不是和我们说的原理一致。首先进入acquire()方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

进入以后先判断tryAcquire(arg)这里面只有一个抛异常不多说了。然后调用acquireQueued()方法,进入。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //空转,自旋
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { //如果当前是头节点,继续尝试获取锁,这也就是为啥只有头节点才有意义
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
		//如果不是继续等待
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

发现这个方法已经要求传入的参数就是Node,其实就是把要等待的Node继续等待,那么返回上去,看看addWaiter()又写了啥。

private Node addWaiter(Node mode) {
	//把当前线程改造称为Node,
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

进入就发现用构建Node用的就是当前线程Thread.currentThread(),这就是线程节点的由来。

实现一个锁类

在Java的JUC包中同样为了这样一个模板提供了一个实现接口就是Lock接口,我们常用的ReentrantLock就是实现了这个接口构建出来的,不妨按照这样一个模板自己构建一个加深理解。

首先看下Lock接口里面都有什么方法:

Modifier and TypeMethodDescription
voidlock()Acquires the lock. 获取锁。
voidlockInterruptibly()Acquires the lock unless the current thread is interrupted. 获取锁,除非当前线程被中断。
ConditionnewCondition()Returns a new Condition instance that is bound to this Lock instance. 返回一个绑定Condition实例的锁。
booleantryLock()Acquires the lock only if it is free at the time of invocation. 当调用时锁为空闲,才能获取锁。尝试获取锁。
booleantryLock(long time, TimeUnit unit)Acquires the lock if it is free within the given waiting time and the current thread has not been interrupted.在给定时间内处于空闲状态且当前线程没有被中断时,才能获取锁。尝试获取锁+超时时间。
voidunlock()Releases the lock. 释放锁。

按照上面的模板,我们可以自己构建一个锁工具:

public class MyLock implements Lock {
    private Helper helper = new Helper();
    //按照官方文档所说,构建内部帮助类
    private class Helper extends AbstractQueuedSynchronizer {
        //构建尝试获取锁的方法
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            if (0 == state) {
                //利用cas的原理修改state
                if (compareAndSetState(0, arg)) {
                    //设置当前线程拥有资源
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
            }//同一个线程获取被锁住的资源时,直接分配给这个线程,实现可重入性
            else if(getExclusiveOwnerThread()==Thread.currentThread()){ //删除这个else if条件,就会变为一个不可重入锁
                setState(getState()+arg);
                return true;
            }
            return false;
        }
        //构建尝试释放锁的方法
        @Override
        protected boolean tryRelease(int arg) {
            //arg是传递进来的state的期望值
            int state = getState() - arg;
            //判断释放后状态是否为0
            if (0 == state) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            //因为需要修改的线程就是当前占有锁的线程,所以此时直接重置是没有线程安全问题的,也就是当前线程独占了资源state
            setState(state);
            return false;
        }
        public Condition newConditionObject(){
            return new ConditionObject();
        }
    }
    //加锁方法
    @Override
    public void lock() {
        helper.acquire(1); //AbstractQueuedSynchronizer原生方法
    }
    //锁中断
    @Override
    public void lockInterruptibly() throws InterruptedException {
        helper.acquireInterruptibly(1); //AbstractQueuedSynchronizer原生方法
    }
	//尝试获取锁
    @Override
    public boolean tryLock() {
        return helper.tryAcquire(1); //使用自己实现的tryAcquire()方法
    }
    //尝试加锁
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return helper.tryAcquireNanos(1,unit.toNanos(time)); //AbstractQueuedSynchronizer原生方法
    }
	//释放锁
    @Override
    public void unlock() {
        helper.tryRelease(1);  //使用自己实现的tryRelease()方法
    }
	//条件
    @Override
    public Condition newCondition() {
        return helper.newConditionObject(); //AbstractQueuedSynchronizer原生方法
    }
}

总结

到此AQS的内容告一段落。这篇博客主要讲了AQS的设计思想,以及操作同步队列的方式,同时完成了一个简单的锁帮助类,希望能够帮助大家更好的理解锁这一个同步机制,以及由AQS架构为基础的各种锁工具的内部原理。

到此这篇关于Java并发框架中的AQS详细解析的文章就介绍到这了,更多相关Java的AQS内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文