一文探究ArrayBlockQueue函数及应用场景
作者:好学的康达姆机器人
开篇语
队列在生活中随处可见,医院缴费需要排队、做核酸需要排队、汽车等红绿灯需要排队等等。
队列是一个按照先来到就排在前面,后来到排在后面的数据结构,并且出队的时候也是按照先来到先出队。使用数组和链表进行实现。通常用于协调任务的执行和数据的交换。
介绍
ArrayBlockingQueue 是一个有界阻塞队列,有界指的是队列存在一个最大容量;阻塞指的是如果队列已经满了,想要往队列继续添加元素的话,那么这个操作将会被暂停,直到队列中有空位才会继续完成添加操作。如果队列已经为空,想要从队列中获取元素,那么这个操作将会被暂停,直接队列中存在元素才会继续完成获取操作。
它具有线程安全、性能好、公平锁选项的特点:
- 线程安全:使用锁和条件变量实现线程安全,无需额外的同步措施。
- 阻塞操作:当队列满时,插入操作阻塞;当队列空时,删除操作阻塞。这有助于避免忙等待和减少无意义的资源消耗。
- 公平锁选项:支持是否使用公平锁。避免锁饥饿。
- 高性能:基于数组实现,内存连续分配,访问性能较高。
但是同时也存在不灵活、无法支撑高并发的缺点
- 有界性:队列的容量固定,不可动态改变。因此在创建时分配多大容量将成为关键,分配过多会造成资源浪费,分配过少会造成竞争激烈。
- 锁竞争:在高并发情况下,锁竞争可能会导致性能下降。
实现原理
ArrayBlockingQueue 内部使用数组作为元素的存储结构。
执行存取操作时,都必须先获取锁,才可以执行存取操作,这就保证ArrayBlockingQueue 是线程安全。
ArrayBlockingQueue 通过两个 Condition 条件队列,一个 notFull 条件,一个 notEmpty 条件。在对队列进行插入元素操作时,判断当前队列已经满,则通过 notFull 条件将线程阻塞,直到其他线程通知该线程队列可以继续插入元素。在对队列进行移除元素操作时,判断当前队列已经空,则通过 notEmpty 条件阻塞线程,直到其他线程通过该线程可以继续获取元素。
这样保证线程的存取操作不会出现错误。避免队列在满时,丢弃插入的元素;也避免在队列空时取到一个 null 值。
构造函数
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
构造函数中,需要指定队列的容量和是否使用公平锁。并且创建了两个 Condition 条件队列,分别命名为 notEmpty 和 notFull,这两个条件队列是实现阻塞的关键。
通过构造函数我们可以知道为什么它叫有界:因为创建数组时,需要指定数组的容量,并且数组容量不能在运行中动态扩大。所以队列的容量是有边界的,不是无限扩张的。
插入函数
public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
- 获取锁
- 判断当前队列是否已经满了
- 如果队列1已经满了,调用 notFull 条件队列的 await() 方法,将该线程阻塞,暂停该线程的插入操作。避免内部溢出的问题。
- 如果没有满,则直接调用入队函数 enqueue 插入到队列末尾。
- 解锁
获取函数
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
- 获取锁
- 判断当前队列是否为空
- 如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
- 如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
- 释放锁
入队函数
private void enqueue(E e) { final Object[] items = this.items; items[putIndex] = e; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
将元素插入到队列的尾部,在完成插入操作之后会调用 notEmpty 对象的 signal 方法,告诉 notEmpty 阻塞队列,现在队列中已经有元素,之前因为队列没有元素而被阻塞的线程,现在可以来获取元素了。
内部维护一个 putIndex,用于表示下一个将要插入元素的坐标。当 putIndex 等于数组长度时,将会重置为 0。putIndex 是一个从 0 - length 循环使用的坐标。
维护一个 count 变量,用于表示队列中存在多少元素,在存入的时候增加,在取出的时候减少。
出队函数
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return e; }
将队列的第一个元素移除,并返回给客户端。在完成移除操作之后会调用 notFull 对象的 signal 方法,告诉 notFull 阻塞队列,现在队列中已经有空位了,之前因为队列没有空位而被阻塞的线程,现在可以继续插入元素。
内部维护一个 takeIndex,用于表示下一个可以获取元素的坐标。当 takeIndex 等于数组长度时,将会重置为 0。takeIndex 是一个从 0 至数组长度之间循环使用的坐标。
应用场景
适用场景
ArrayBlockingQueue 适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
- 线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 ArrayBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 ArrayBlockingQueue 中取出下一个任务执行。
- 生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,ArrayBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
实际应用场景
- Apache Tomcat Apache Tomcat 是一个流行的 Java Web 应用服务器,它使用 ArrayBlockingQueue 来实现内部的请求队列。当请求到达 Tomcat 时,它们被放入一个 ArrayBlockingQueue 中,并由工作线程从队列中取出并处理请求。
- Netty Netty 是一个高性能的网络编程框架,它使用 ArrayBlockingQueue 来实现内部的事件队列。当有新的网络事件到达时,它们被放入一个 ArrayBlockingQueue 中,并由 IO 线程从队列中取出并处理事件。
总结
ArrayBlockingQueue 是一个固定容量,并且采用阻塞方式的队列。内部采用锁和条件队列保证了线程安全性。支持公平锁选项。但是因为采用阻塞机制且容量有限,无法很好满足高并发需求。
以上就是一文探究ArrayBlockQueue函数及应用场景的详细内容,更多关于ArrayBlockQueue函数应用场景的资料请关注脚本之家其它相关文章!