java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java多线程 锁

Java多线程之锁的强化学习

作者:刘架构

Java多线程的锁都是基于对象的,Java中的每一个对象都可以作为一个锁。这篇文章主要来通过一下示例为大家强化一下锁的相关知识的掌握,希望对大家有所帮助

首先强调一点:Java多线程的锁都是基于对象的,Java中的每一个对象都可以作为一个锁。同时,类锁也是对象锁,类是Class对象

Java8锁

核心思想

关键字在实例方法上,锁为当前实例

关键字在静态方法上,锁为当前Class对象

关键字在代码块上,锁为括号里面的对象

在进行线程执行顺序的时候,如果添加了线程睡眠,那么就要看锁的对象是谁,同一把锁 / 非同一把锁是不一样的

Synchronized

synchronized 是Java提供的关键字,用来保证原子性的

synchronized的作用域如下

先看一段简单的代码

public class SynchronizedTest {
    public static void main(String[] args) {
        test1();
        test2();
    }

    // 使用synchronized修饰的方法
    public synchronized static void test1() {
        System.out.println("SynchronizedTest.test1");
    }

    // 使用synchronized修饰的代码块
    public static void test2() {
        synchronized (SynchronizedTest.class) {
            System.out.println("SynchronizedTest.test2");
        }
    }
}

执行之后,对其进行执行javap -v命令反编译

// 省略啰嗦的代码
public class cn.zq.sync.SynchronizedTest
  minor version: 0
  major version: 52
  flags: ACC_PUBLIC, ACC_SUPER
{
  // 源码
  public cn.zq.sync.SynchronizedTest();
    descriptor: ()V
    flags: ACC_PUBLIC
  // main 方法
  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC

  // synchronized 修饰的静态方法 test1()
  public static synchronized void test1();
    descriptor: ()V
    // 在这里我们可以看到 flags 中有一个 ACC_SYNCHRONIZED
    // 这个就是一个标记符这是 保证原子性的关键
    // 当方法调用的时候,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标记符是否被设置
    // 如果设置了,线程将先获取 monitor,获取成功之后才会执行方法体,方法执行之后,释放monitor
    // 在方法执行期间,其他任何线程都无法在获得一个 monitor 对象,本质上没区别。
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=2, locals=0, args_size=0
         0: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
         3: ldc           #5                  // String SynchronizedTest.test1
         5: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
         8: return
      LineNumberTable:
        line 17: 0
        line 18: 8

  // 代码块使用的 synchronized
  public static void test2();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=2, args_size=0
         0: ldc           #7                  // class cn/zq/sync/SynchronizedTest
         2: dup
         3: astore_0
         // 这个 monitorenter 是一个指令
         // 每个对象都有一个监视器锁(monitor),当monitor被占用的时候就会处于锁定状态
         // 线程执行monitorenter的时候,尝试获取monitor的锁。过程如下
         // 1.任何monitor进入数为0,则线程进入并设置为1,此线程就是monitor的拥有者
         // 2.如果线程已经占用,当前线程再次进入的时候,会将monitor的次数+1
         // 3.如何其他的线程已经占用了monitor,则线程进阻塞状态,直到monitor的进入数为0
         // 4.此时其他线程才能获取当前代码块的执行权
         4: monitorenter
         5: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
         8: ldc           #8                  // String SynchronizedTest.test2
        10: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        13: aload_0
        // 执行monitorexit这条指令的线程必须是拥有monitor的
        // 执行的之后,monitor的进入数-1.如果为0,那么线程就退出 monitor,不再是此代码块的执行者
        // 此时再由其他的线程获得所有权
        // 其实 wait/notify 等方法也依赖于monitor对象,
        // 所以只有在同步方法或者同步代码块中才可以使用,否则会报错 java.lang.IllegalMonitorstateException 异常
        14: monitorexit
        15: goto          23
        18: astore_1
        19: aload_0
        20: monitorexit
        21: aload_1
        22: athrow
        23: return
      Exception table:
         from    to  target type
             5    15    18   any
            18    21    18   any
      LineNumberTable:
        line 21: 0
        line 22: 5
        line 23: 13
        line 24: 23
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 18
          locals = [ class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4
}
SourceFile: "SynchronizedTest.java"

总结:

使用synchronized修饰的同步方法

使用synchronized修饰的代码块

这个时候我们会发现synchronized是可重入锁,其实现原理就是monitor的个数增加和减少

同时wait / notify方法的执行也会依赖 monitor,所以wait和notify方法必须放在同步代码块中,否则会报错 java.lang.IllegalMonitorstateException

因为方法区域很大,所以设置一个标记,现在执行完判断之后,就全部锁起来,而代码块不确定大小,就需要细化monitor的范围

ReentrantLock

ReentrantLock是Lock接口的一个实现类

在ReentrantLock内部有一个抽象静态内部类Sync

其中一个是 NonfairSync(非公平锁),另外一个是 FairSync (公平锁),二者都实现了此抽象内部类Sync,ReentrantLock默认使用的是 非公平锁 ,我们看一下源码:

public class ReentrantLock implements Lock, java.io.Serializable {

    // 锁的类型
    private final Sync sync;
    
    // 抽象静态类Sync继承了AbstractQueueSynchroniser [这个在下面进行解释]
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        // 抽象加锁方法
        abstract void lock();

        // 不公平的 tryLock 也就是不公平的尝试获取
        final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程对象
            final Thread current = Thread.currentThread();
            // 获取线程的状态
            int c = getState();
            // 根据线程的不同状态执行不同的逻辑
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 获取独占模式的线程的当前锁的状态
            else if (current == getExclusiveOwnerThread()) {
                // 获取新的层级大小
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 设置锁的状态
                setState(nextc);
                return true;
            }
            return false;
        }

        // 尝试释放方法
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        // 返回当前线程是不是独占的
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // 返回 ConditionObject 对象
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // 获得独占的线程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // 获得独占线程的状态
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        // 判断是否是加锁的
        final boolean isLocked() {
            return getState() != 0;
        }

        // 序列化
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); 
        }
    }

    // 非公平锁继承了Sync
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // 加锁操作
        final void lock() {
            // 判断是不是第一次加锁 底层调用 Unsafe的compareAndSwapInt()方法
            if (compareAndSetState(0, 1))
                // 设置为独占锁
                setExclusiveOwnerThread(Thread.currentThread());
            // 如果不是第一次加锁,则调用 acquire 方法在加一层锁
            else
                acquire(1);
        }

        // 返回尝试加锁是否成功
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    // 公平锁
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        // 加锁操作,直接设置为1
        final void lock() {
            acquire(1);
        }

        // 尝试加锁
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

Lock接口

public interface Lock {
    // 加锁
    void lock();
    // 不断加锁
    void lockInterruptibly() throws InterruptedException;
    // 尝试加锁
    boolean tryLock();
    // 尝试加锁,具有超时时间
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 释放锁
    void unlock();
    // Condition 对象
    Condition newCondition();
}

Condition接口

public interface Condition {
    // 等待
    void await() throws InterruptedException;
    // 超时等待
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    // 超时纳秒等待
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    // 可中断等待
    void awaitUninterruptibly();
    // 等待死亡
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // 指定唤醒
    void signal();
    // 唤醒所有
    void signalAll();
}

为什么官方提供的是非公平锁,因为如果是公平锁,假如一个线程需要执行很久,那执行效率会大大降低

ReentrantLock的其他方法

public class ReentrantLock implements Lock, java.io.Serializable {

    // 锁的类型
    private final Sync sync;

    // 默认是非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    // 有参构造,可以设置锁的类型
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    // 加锁
    public void lock() {
        sync.lock();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    // 解锁 调用release() 因为是重入锁,所以需要减少重入的层数
    public void unlock() {
        sync.release(1);
    }

    // 返回Condition对象 ,用来执行线程的唤醒等待等操作
    public Condition newCondition() {
        return sync.newCondition();
    }

    // 获取锁的层数
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    // 是否加锁
    public boolean isLocked() {
        return sync.isLocked();
    }
    // 是否是公平锁
    public final boolean isFair() {
        return sync instanceof FairSync;
    }
    // 获取独占锁
    protected Thread getOwner() {
        return sync.getOwner();
    }
    // 查询是否有任何线程正在等待获取此锁
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    // 查询给定线程是否正在等待获取此锁
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }
    // 获取队列的长度
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    // 返回一个包含可能正在等待获取该锁的线程的集合
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    // 判断是否等待
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    // 获得等待队列的长度
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    // 获取正在等待的线程集合
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    // toString()
    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ?
                                   "[Unlocked]" :
                                   "[Locked by thread " + o.getName() + "]");
    }
}

总结:

1.ReentrantLock是独占锁

2.ReentrantLock是可重入锁

3.底层使用AbstractQueuedSynchronizer实现

4.synchronized 和 ReentrantLock的区别

AQS

AQS:AbstractQueueSynchronizer => 抽象队列同步器

AQS定义了一套多线程访问共享资源的同步器框架,很多同步器的实现都依赖AQS。如ReentrantLock、Semaphore、CountDownLatch …

首先看一下AQS队列的框架

它维护了一个volatile int state (代表共享资源)和一个FIFO线程等待队列(多线程争抢资源被阻塞的时候会先进进入此队列),这里的volatile是核心。在下个部分进行讲解~

state的访问方式有三种

AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程可以执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore、CountdownLatch)

不同的自定义同步器争用共享资源的方式也不同。自定义的同步器在实现的时候只需要实现共享资源的获取和释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队)AQS在顶层已经实现好了。

自定义同步器时需要实现以下方法即可

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁,然后将state+1,此后其他线程在调用tryAcquire()就会失败,直到A线程unlock()到state为0为止,其他线程才有机会获取该锁。当前在A释放锁之前,A线程是可以重复获取此锁的(state)会累加。这就是可重入,但是获取多少次,就要释放多少次。

再和CountdownLock为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程的个数一致)。这N个子线程是并行执行的,每个子线程执行完之后countDown一次。state会CAS-1。等到所有的子线程都执行完后(即state=0),会upark()主调用线程,然后主调用线程就会从await()函数返回,继续剩余动作

一般来说,自定义同步器要么是独占方法,要么是共享方式,也只需要实现tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一组即可,但是AQS也支持自定义同步器同时实现独占锁和共享锁两种方式,如:ReentrantReadWriteLock

AQS的源码

AbstractQueueSynchronizer 继承了 AbstractOwnableSynchronizer

AbstractOwnableSynchronizer类

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    // 独占模式当前的拥有者
    private transient Thread exclusiveOwnerThread;

    // 设置独占模式当前的拥有者
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    // 得到独占模式当前的拥有者
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractQueueSynchronizer类

public abstract class AbstractQueuedSynchronizer    extends AbstractOwnableSynchronizer implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() { }

    // AbstractQueueSynchronizer 中的静态内部类 Node 节点
    static final class Node {

        // 指示节点正在以共享模式等待的标记
        static final Node SHARED = new Node();

        // 指示节点正在以独占模式等待的标记
        static final Node EXCLUSIVE = null;

        // 表示线程已经取消
        static final int CANCELLED =  1;

        // 表示线程之后需要释放
        static final int SIGNAL    = -1;

        // 表示线程正在等待条件
        static final int CONDITION = -2;

        // 指示下一个 acquireShared 应该无条件传播
        static final int PROPAGATE = -3;

        // 状态标记
        volatile int waitStatus;

        // 队列的前一个节点
        volatile Node prev;

        // 队列的后一个节点
        volatile Node next;

        // 线程
        volatile Thread thread;

        // 下一个正在等待的节点
        Node nextWaiter;

        // 判断是否时共享的
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        // 返回上一个节点,不能为null,为null抛出空指针异常
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        // 构造
        Node() {    // Used to establish initial head or SHARED marker
        }

        // 有参构造,用来添加线程的队列
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        // 有参构造,根据等待条件使用
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    // 头节点
    private transient volatile Node head;

    // 尾节点
    private transient volatile Node tail;
    // 状态
    private volatile int state;

    // 获取当前的状态
    protected final int getState() {
        return state;
    }

    //设置当前的状态
    protected final void setState(int newState) {
        state = newState;
    }

    // 比较设置当前的状态
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    // 纳秒数,使之更快的旋转
    static final long spinForTimeoutThreshold = 1000L;

    // 将节点插入队列
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    // 加一个等待节点
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    // 设置头节点
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    // 如果存在后继节点,就唤醒
    private void unparkSuccessor(Node node) {
        // 获得节点的状态
        int ws = node.waitStatus;
        // 如果为负数,就执行比较并设置方法设置状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 唤醒后面的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    // 共享模式的释放动作,并且向后继节点发出信号
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    // 设置队列的头,并检查后继者能否在共享模式下等待,如果可以,就是否传播设置为>0或者propagate状态
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    // 取消正在进行的尝试
    private void cancelAcquire(Node node) {
        // 节点为null,直接返回
        if (node == null)
            return;

        node.thread = null;

        // 跳过已经取消的前一个节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    // 还有好多方法... 其实本质就是基于 队列的判断和操作,AQS提供了独占锁和共享锁的设计
    // 在AQS中,使用到了Unsafe类,所以AQS其实就是基于CAS算法的,
    // AQS的一些方法就是直接调用 Unsafe 的方法 如下所示

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    // 比较并设置头
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    // 比较并设置尾
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    // 比较并设置状态
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    // 比较并设置下一个节点
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    // 除此之外 AQS 还有一个实现了Condition的类 如下
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;

        // 条件队列的第一个节点
        private transient Node firstWaiter;

        // 条件队列的最后一个节点
        private transient Node lastWaiter;

        public ConditionObject() { }

        // 在等待队列中添加一个新的节点
        private Node addConditionWaiter() {
            // 获取最后一个节点
            Node t = lastWaiter;
            // 如果最后一个节点被取消了,就清除它
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
              return node;
        }

        // 删除并转移节点直到它没有取消或者不为null
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        // 删除所有的节点
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        // 取消节点的连接
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // 将等待最长的线程,唤醒
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        // 唤醒所有的等待线程
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        // 实现不间断的条件等待
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        // 模式意味着在退出等待时重新中断
        private static final int REINTERRUPT =  1;

        // 模式的含义是在退出等待时抛出InterruptedException异常
        private static final int THROW_IE    = -1;

        // 检查中断,如果在信号通知之前被中断,则返回THROW_IE;
        // 如果在信号通知之后,则返回REINTERRUPT;如果未被中断,则返回 0
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
        }

        // 抛出InterruptedException,重新中断当前线程,
        // 或不执行任何操作,具体取决于模式。
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        // 实现不可中断的条件等待
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        // 纳秒级别的等待
        public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

        // 绝对定时等待
        public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // 超时等待
        public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // 判断是不是独占的
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        // 返回是否有正在等待的
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        // 获得等待队列的长度
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        // 获取所有正在等待的线程集合
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }
}

总结:

1.AQS为我们提供了很多实现。AQS内部有两个内部类,ConditionObject和Node节点

2.和开头说的一样,其维护了一个state和一个队列,也提供了独占和共享的实现

3.总结一下流程

4.release() 是独占模式下线程共享资源的底层入口,它会释放指定量的资源,如果彻底释放了(state = 0)

5.如果获取锁的线程在release时异常了,没有unpark队列中的其他结点,这时队列中的其他结点会怎么办?是不是没法再被唤醒了?

这时,队列中等待锁的线程将永远处于park状态,无法再被唤醒!

6.获取锁的线程在什么情形下会release抛出异常呢 ?

7.acquireShared()的流程

8.releaseShared()

释放掉资源之后,唤醒和后继

7.不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

volatile

volatile是Java提供的关键字,是轻量级的同步机制 JSR133提出,Java5增强了语义

volatile关键字有三个重要的特点

提到volatile,就要提到JMM - 什么是JMM

JMM:Java Memory Model

本身就是一种抽象的概念,并不真实存在,它描述的是一组规范和规则,通过这种规则定义了程序的各个变量(包括实例字段、静态字段、和构造数组对象的元素)的访问方式

JMM关于同步的规定

happens-before 规则

前一个操作对下一个操作是完全可见的,如果下一个操作对下下一个操作完全可见,那么前一个操作也对下下个操作可见

重排序

JVM对指令的执行,会进行优化重新排序,可以发生在编译重排序、CPU重排序

什么是内存屏障?

内存屏障分为2种

内存屏障的作用

编译器生成字节码的时候,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。编译器选择了一个比较保守的JMM内存屏障插入策略,这样就可以保证在任何处理器平台,任何程序中都有正确的volatile语义

原子性

如何解决?

有序性

1.计算机在执行程序的时候,为了提高性能,编译器和处理器通常会对指令重排序,一般分为3种-

2.指令重排序

多线程环境种线程交替执行,由于编译器优化重排序的存在,两个线程中使用的变量能否保证一致性是无法确定的,结果无法预测此时使用volatile禁用指令重排序,就可以解决这个问题

volatile的使用

单例设计模式中的 安全的双重检查锁

volatile的底层实现

根据JMM,所有线程拿到的都是主内存的副本,然后存储到各自线程的空间,当某一线程修改之后,立即修改主内存,然后主内存通知其他线程修改

Java代码 instance = new Singleton();//instance 是 volatile 变量 汇编代码:0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp); 有 volatile 变量修饰的共享变量进行写操作的时候会多第二行汇编代码,通过查 IA-32 架构软件开发者手册可知,lock 前缀的指令在多核处理器下会引发了两件事情。将当前处理器缓存行的数据会写回到系统内存。这个写回内存的操作会引起在其他 CPU 里缓存了该内存地址的数据无效。

如果对声明了volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。

自旋锁 ,自旋锁的其他种类

CAS 自旋锁

我们可以实现通过手写代码完成CAS自旋锁

CAS包括三个操作数

如果内存位置的值与期望值匹配,那么处理器会自动将该位置的值设置为新值,否则不做改变。无论是哪种情况,都会在CAS指令之前返回该位置的值。

public class Demo {
    volatile static int count = 0;

    public static void request() throws Exception {
        TimeUnit.MILLISECONDS.sleep(5);
        // 表示期望值
        int expectedCount;
        while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) {
        }
    }

    public static synchronized boolean compareAndSwap(int expectedCount, int newValue) {
        if (expectedCount == getCount()) {
            count = newValue;
            return true;
        }
        return false;
    }

    public static int getCount() {
        return count;
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        int threadSize = 100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i = 0; i < threadSize; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 10; j++) {
                        request();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("count :" + count + " 耗时:" + (end - start));
    }
}

上述是我们自己书写的CAS自旋锁,但是JDK已经提供了响应的方法

Java提供了 CAS 的支持,在 sun.misc.Unsafe 类中,如下

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

参数说明

CAS的实现原理

CAS通过调用JNI的代码实现,JNI:Java Native Interface ,允许Java调用其他语言

而CompareAndSwapXxx系列的方法就是借助“C语言”CPU底层指令实现的

以常用的 Inter x86来说,最后映射到CPU的指令为“cmpxchg”,这个是一个原子指令,CPU执行此命令的时候,实现比较并替换的操作

cmpxchg 如何保证多核心下的线程安全

系统底层进行CAS操作的时候,会判断当前操作系统是否为多核心,如果是,就给“总线”加锁,只有一个线程对总线加锁,保证只有一个线程进行操作,加锁之后会执行CAS操作,也就是说CAS的原子性是平台级别的

CAS这么强,有没有什么问题?

高并发情况下,CAS会一直重试,会损耗性能

CAS的ABA问题

CAS需要在操作值得时候检查下值有没有变化,如果没有发生变化就更新,但是如果原来一个值为A,经过一轮的操作之后,变成了B,然后又是一轮的操作,又变成了A,此时这个位置有没有发生改变?改变了的,因为不是一直是A,这就是ABA问题

如何解决ABA问题?

解决ABA问题就是给值增加一个修改版本号,每次值的变化,都会修改它的版本号,CAS在操作的时候都会去对比此版本号。

下面给出一个ABA的案例

public class CasAbaDemo {
    public static AtomicInteger a = new AtomicInteger(1);

    public static void main(String[] args) {
        Thread main = new Thread(() -> {
            System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get());
            try {
                int executedNum = a.get();
                int newNum = executedNum + 1;
                TimeUnit.SECONDS.sleep(3);
                boolean isCasSuccess = a.compareAndSet(executedNum, newNum);
                System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主线程");

        Thread thread = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                a.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get());
                a.decrementAndGet();
                System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "干扰线程");

        main.start();
        thread.start();
    }
}

Java中ABA解决办法(AtomicStampedReference)

AtomicStampedReference 主要包含一个引用对象以及一个自动更新的整数 “stamp”的pair对象来解决ABA问题

public class AtomicStampedReference<V> {

    private static class Pair<T> {
        // 数据引用
        final T reference;
        // 版本号
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

    private volatile Pair<V> pair;

    /**
     * 期望引用
     * @param expectedReference the expected value of the reference
     * 新值引用
     * @param newReference the new value for the reference
     * 期望引用的版本号
     * @param expectedStamp the expected value of the stamp
     * 新值的版本号
     * @param newStamp the new value for the stamp
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            // 期望引用与当前引用一致
            expectedReference == current.reference &&
            // 期望版本与当前版本一致
            expectedStamp == current.stamp &&
            // 数据一致
            ((newReference == current.reference &&
              newStamp == current.stamp) 
             ||
             // 数据不一致
             casPair(current, Pair.of(newReference, newStamp)));
    }   
}

修改之后完成ABA问题

public class CasAbaDemo02 {
    public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1);

    public static void main(String[] args) {
        Thread main = new Thread(() -> {
            System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference());
            try {
                Integer executedReference = a.getReference();
                Integer newReference = executedReference + 1;
                Integer expectStamp = a.getStamp();
                Integer newStamp = expectStamp + 1;
                TimeUnit.SECONDS.sleep(3);
                boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp);
                System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主线程");

        Thread thread = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference());
                a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1);
                System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "干扰线程");

        main.start();
        thread.start();
    }
}

到此这篇关于Java多线程之锁的强化学习的文章就介绍到这了,更多相关Java多线程 锁内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文