详解Java如何实现一个BlockingQueue
作者:Shawn_Shawn
前几篇文章,主要讨论了一下关于互斥的相关内容,例如synchronized、Lock、CAS、原子类,累加器等。未来的几篇文章将讨论线程同步,例如condition、信号量、CountDownLatch、CyclicBarrier等。
线程同步与线程互斥的区别
互斥与同步是多线程需要解决的两大核心问题。互斥通过互斥锁来解决问题,同步则是通过同步工具来解决。
互斥是一种间接制约关系,是指系统中的某些共享资源,一次只允许一个线程访问。当一个线程正在访问该临界资源时,其它线程必须等待。
同步,又称直接制约关系,是指多个线程(或进程)为了合作完成任务,必须严格按照规定的某种先后次序来运行。
如何实现一个BlockingQueue
如果有线程需要在某些“条件”满足后才接着后续操作,要如何实现?
例如:设计一个阻塞队列,当队列元素为空的时候,不允许读取线程读取,当队列元素满的时候,不允许写入线程写入。
要实现这样的功能,需要一个完整的等待-通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。
在java中可以通过这么几种方式来实现
synchronized + loop
synchronized + wait + notifyAll
Condition
Semaphore
其中1不常用,2和3都可以称之为条件变量,4为信号量,本篇文章主要讲解条件变量的实现方式。
Synchronized + Look实现BlockingQueue
public class BlockingQueueWithLoop implements Queue { private int[] elements; private int head; private int tail; private volatile int size; // 队列元素个数 public BlockingQueueWithLoop() { this(10); } public BlockingQueueWithLoop(int capacity) { this.elements = new int[capacity]; this.head = 0; this.tail = 0; this.size = 0; } @Override public void put(int e) throws InterruptedException { while (size == elements.length) {} // 使用自旋锁,等待队列不满 synchronized (this) { if(size == elements.length){ // 双重检索 return; } elements[tail] = e; tail++; if (tail == elements.length) { tail = 0; } size++; } } @Override public int take() throws InterruptedException { while (true) { while (size <= 0) {} // 使用自旋锁,等待队列不为空 synchronized (this) { // 队列不为空,需要加锁 if (size > 0) { // 双重检索 int e = elements[head]; head++; if (head == elements.length) { head = 0; } size--; return e; } } } } @Override public synchronized int size() { return size; } @Override public synchronized boolean isEmpty() { return size == 0; } }
我们通过使用自旋来等待队列满足(为空,为满)的条件。
需要注意的是,当自旋检测到队列不满足条件之后,为了保证后续操作线程安全,我们需要对其进行加锁。
在加锁之后,我们需要再次检查队列是否满足条件(为空?为满)。这有点类似线程安全单例类中的双重检测。这样做的原因是,多个线程有可能同时执行put()/take()
函数,并且同时检测到队列不满足条件,于是,它们依次获取锁然后从队列中操作数据,如果不在获取锁之后重新检测,那么就有可能导致数组访问越界或者其他未知问题。
当然,我们也无法将自旋逻辑放如synchronized代码块中,如果这样做的话,那么可能会导致死锁。
条件变量实现BlockingQueue
自旋并不会让线程进入阻塞状态。如果线程将一直执行while循环,白白浪费CPU资源,甚至会让CPU使用率达到100%。
为了减少对CPU资源的浪费,我们可以在while循环中调用sleep()函数,让线程睡眠一小段时间。但这样会导致性能下降,如果sleep一段时间,不能立刻获取到队列的状态,导致响应不及时。
所以我们需要另外一套方案来实现阻塞队列,那就是通过条件变量来解决浪费CPU资源和响应不及时这两个问题。
java对于条件变量的实现有两种:
- Object.wait()/Object.notify()/Object.notifyAll()
- ReentrantLock.Condition
Object.wait()/notifyAll()/notify()
首先java内置的条件变量,是使用Object类上的wait/notify/notifyAll方法实现的。
public class Object { public final void wait() throws InterruptedException; public final native void wait(long timeoutMillis) throws InterruptedException; public final void wait(long timeoutMillis, int nanos) throws InterruptedException public final native void notify(); public final native void notifyAll(); }
说明:
- 线程调用
wait()
,线程状态会进入WAITING
状态。 - 线程调用
wait(long timeoutMillis)
,线程状态会进入TIMED_WAITING
状态,等待时间超过了预设的超时时间。 - 其余线程调用
notify()/notifyAll()
唤醒此线程。 - 线程被中断,调用
wait()/wait(long timeout)
会抛出InterruptedException
异常。
public class BlockingQueueWithSync implements Queue { private int[] elements; private int head; private int tail; private volatile int size; // 队列元素个数 public BlockingQueueWithSync() { this(10); } public BlockingQueueWithSync(int capacity) { this.elements = new int[capacity]; this.head = 0; this.tail = 0; this.size = 0; } @Override public synchronized void put(int e) throws InterruptedException { // 当队列满的时候阻塞 while (size == elements.length) { this.wait(); } elements[tail] = e; tail++; if (tail == elements.length) { tail = 0; } size++; // 通知其他线程有数据了 this.notifyAll(); } @Override public synchronized int take() throws InterruptedException { // 当队列空的时候阻塞 while (isEmpty()) { this.wait(); } int e = elements[head]; if (++head == elements.length) { head = 0; } --size; // 通知其他线程,暂无数据 this.notifyAll(); return e; } @Override public synchronized int size() { return size; } @Override public synchronized boolean isEmpty() { return size == 0; } }
如图所示,wait()
和notify()
的工作流程如下:
需要注意的是:
wait()
和notify()
都是Object类里的方法,原则上是可以单独调用的,但是会配合状态联合调用- 在调用
wait()
和notify()
的时候,需要加锁,因为状态的检查和业务逻辑执行构成一组复合操作,如果不加锁就会出现线程不安全的问题。 - 当状态不满足条件的时候,会调用wait方法,进入等待队列等待被唤醒,此时需要释放锁,否则其他线程将无法获取到锁,也就无法更新状态。
- 当等待中的线程被唤醒时,必须再次竞争获取到锁的机会,需要再次检查状态是否满足条件。
- while循环是为了避免线程被假唤醒。
wait()
和notify()
实现原理如下:
// ObjectMonitor.cpp void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {} void ObjectMonitor::notify(TRAPS) {} void ObjectMonitor::notifyAll(TRAPS) {}
当某个线程调用wait()
函数时,线程会将自己放入_WaitSet
中,并释放持有的锁,调用park()
阻塞自己。
当其他线程调用notify()
- 如果
_EntryList
或者_cxq
不为空时,那么它会从_WaitSet
取出一个线程放入_EntryList
中,让其排队等待锁。 - 如果
_EntryList
或者_cxq
均为空时,那么它会从_WaitSet
取出一个线程直接调用这个线程的unpark()
方法取消其阻塞状态,让其去竞争锁。
当调用了wait()
的线程再次获取到锁的时候,会从wait()
中返回,继续检查状态是否满足条件,如果不满足则继续执行上述两步,如果满足了,则执行业务逻辑。
notify()
和notifyAll()
的区别在于notifyAll()
会将_WaitSet
中所有线程取出来放入_EntryList
中,让他们一起竞争锁。
ReentrantLock+Condition
public class BlockingQueueWithCondition implements Queue { private int[] elements; private int head; private int tail; private volatile int size; // 队列元素个数 private final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); public BlockingQueueWithCondition() { this(10); } public BlockingQueueWithCondition(int capacity) { this.elements = new int[capacity]; this.head = 0; this.tail = 0; this.size = 0; } @Override public void put(int e) throws InterruptedException { lock.lockInterruptibly(); try { while (size == elements.length) { notFull.await(); } elements[tail] = e; tail++; if (tail == elements.length) { tail = 0; } size++; notEmpty.signalAll(); } finally { lock.unlock(); } } @Override public int take() throws InterruptedException { lock.lockInterruptibly(); try { while (isEmpty()) { notEmpty.await(); } int e = elements[head]; if (++head == elements.length) { head = 0; } --size; notFull.signalAll(); return e; } finally { lock.unlock(); } } @Override public int size() { try { lock.lock(); return size; } finally { lock.unlock(); } } @Override public boolean isEmpty() { try { lock.lock(); return size == 0; } finally { lock.unlock(); } } }
Condition
是java SDK里提供的一种条件变量实现方式,其原理与Object#wait()
以及Object#notify()
类似
// java.util.concurrent.locks.Condition public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
Condition
里的awaitXXX(XXX)
方法基本等同于Object#wait()
,但是比Object#wait()
提供了更多的等待形式。例如:
Condition#awaitUninterruptibly()
,表示此方法执行中,不可以被中断。Condition#awaitNanos(long nanosTimeout)
,表示等待超过nanosTimeout纳秒时,函数返回,返回值为等待时间。Condition#await(long time, TimeUnit unit)
,跟awaitNanos
类似。Condition#awaitUntil(Date deadline)
,表示等待到某个时间点deadline,函数返回,返回值如果为false则表示已经超时,返回值如果为true,则表示线程被中断或者被唤醒。
Condition
里的signalXXX()
方法基本等同于Object#notify()/notifyAll()
。
Condition实现原理如下:
Condition
是一个接口,其实现类为java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
,是AQS的一个内部类,侧面说明条件变量是需要相关的锁操作的。
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; // ... } static final class Node { volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; // 用于Condition }
通过 firstWaiter
和 lastWaiter
构建的队列称为等待队列,用来存储调用了await()
函数的线程
AQS也包含一个队列,通过head
和tail
构建的同步队列,用于存储等待锁的线程。
一个 Node 可以同时加入等待队列和同步队列。
如上图所示,Lock中的同步队列是双向链表,由于双向链表的操作复杂性,增加虚拟头节点可以有效简化操作。Condition中的等待队列是单向链表,就没有必要增加虚拟头节点的必要了。
await()
源代码如下:
public final void await() throws InterruptedException { // 检测到中断,抛异常 if (Thread.interrupted()) throw new InterruptedException(); // 将线程包裹为Node添加到Condition等待队列尾部 Node node = addConditionWaiter(); // 将state修改为0,返回释放前锁的状态 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 被意外唤醒的话需要再次挂起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 接收到 signal,返回前需要再排队等待锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 加入条件等待队列 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 加入链表末尾 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } // 意外唤醒 final boolean isOnSyncQueue(Node node) { // 进入同步队列时,waitStatus为0,且prev指向前驱节点 // 之后节点可能被取消,状态变为CANCELLED if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 存在后继节点,肯定在同步队列中 if (node.next != null) return true; // 兜底,从tail查找,确保node已经被加入同步队列 return findNodeFromTail(node); }
signal()
源代码如下:
public final void signal() { // 必须保证持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 唤醒队首线程 doSignal(first); } private void doSignal(Node first) { do { // 将first移出队列 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && // 唤醒线程 (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // 节点状态不为CONDITION,说明已经被取消了,不进行唤醒 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; // 将节点加入到同步队列,返回之前的队尾节点 Node p = enq(node); int ws = p.waitStatus; // 如果设置前驱节点的状态失败(如前驱已被取消)则直接唤醒线程 // 唤醒后的线程会在 `await` 中执行 `acquireQueued` 直到抢锁成功 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
以上就是详解Java如何实现一个BlockingQueue的详细内容,更多关于Java BlockingQueue的资料请关注脚本之家其它相关文章!