Java中的阻塞队列BlockingQueue使用详解
作者:皓亮君
这篇文章主要介绍了Java中的阻塞队列BlockingQueue使用详解,阻塞队列是一种线程安全的数据结构,用于在多线程环境下进行数据交换,它提供了一种阻塞的机制,当队列为空时,消费者线程将被阻塞,直到队列中有数据可供消费,需要的朋友可以参考下
1.BlockingQueue 简介
BlockingQuene是一个阻塞队列接口,当BlockingQueue操作无法立即响应时,有四种处理方式:
- 抛出异常;
- 返回特定的值,根据操作不同,可能是null或者false中的一个;
- 无限制的阻塞当前线程,直到操作可以成功为止;
- 根据阻塞超时设置来进行阻塞;
BlockingQueue的核心和未响应处理方式的对应形式如下:
方式 | 抛出异常 | 返回特定值 | 无限阻塞 | 超时 |
插入 | add(e) | offer (e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
查询 | element() | peek() |
2.ArrayBlockingQueue(有界队列)
ArrayBlockingQueue是基于数组实现的有界BlockingQueue,该队列满足先入先出(FIFO)的特性,当队列满时,存数据的操作会被阻塞;队列空的时候,取数据的操作会被阻塞。
/** * @Author Dominick Li * @CreateTime 2022/3/6 20:03 * @Description 消息生产者 **/ public class Product implements Runnable { private BlockingQueue<String> bq; /** * 多少秒生产一条任务 */ private int period; private Random r = new Random(); /** * 生产者名称 */ private String name; Product(BlockingQueue<String> bq, int period, String name) { this.bq = bq; this.period=period; this.name=name; } @Override public void run() { try { while (true){ Thread.sleep(period); String product=String.valueOf(r.nextInt(100)); //如果队列满了则阻塞 bq.put(product); System.out.println("生产者["+this.name+"]生产"+product+",当前队列中产品为:"+bq); } }catch (Exception e){ e.printStackTrace(); } } } /** * @Author Dominick Li * @CreateTime 2022/3/6 20:11 * @Description 消费者 **/ public class Cusumer implements Runnable { private BlockingQueue<String> bq; /** * 多少秒获取一条任务 */ private int period; /** * 消费者名称 */ private String name; Cusumer(BlockingQueue<String> bq, int period, String name) { this.bq = bq; this.period=period; this.name=name; } @Override public void run() { try { while (true){ Thread.sleep(period); String value=bq.take(); System.out.println("消费者["+this.name+"]消费"+value+",当前队列中产品为:"+bq); } }catch (Exception e){ e.printStackTrace(); } } } public class Test { public static void main(String[] args) { BlockingQueue blockingQueue = new ArrayBlockingQueue(5); ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(new Product(blockingQueue, 1000, "生产者")); pool.execute(new Cusumer(blockingQueue, 5000, "消费者001")); pool.execute(new Cusumer(blockingQueue, 5000, "消费者002")); pool.shutdown(); } }
运行效果如下
3.LinkedBlockingQueue(双锁线程安全队列)
与ArrayBlockingQueue相比,LinkedBlockingQueue的重入锁被分成了两份,分别对应存值和取值,这种实现方法被称为双锁队列算法,这样的好处是读写操作的lock操作由两个锁控制,因此可以同时进程读操作和写操作,这也是LinkedBlockingQueue吞吐量超出ArrayBlockingQueue的主要原因,但是使用两个锁比一个锁复杂很多,需要考虑各种死锁的状态。 使用方法和ArrayBlockingQueue一致
public class Test { public static void main(String[] args) { LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(5); ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(new Product(linkedBlockingQueue, 1000, "生产者")); pool.execute(new Cusumer(linkedBlockingQueue, 5000, "消费者1")); pool.shutdown(); } }
4.PriorityBlockingQueue(优先级队列)
优先级阻塞队列ProiorityBlockQueue不是FIFO(先入先出)队列,它要求使用者提供一个Comparetor比较器,或者队列内部元素实现Comparable接口,队头元素会是整个队列里的最小元素.
PriorityBlockQueue是用数组实现的最小堆结构,利用的原理是: 在数组实现的完全二叉树中根节点的下标为子节点的下标除以2,长度是不定的,会随着数据的增长而逐步扩容
public class PriorityProduct implements Comparable<PriorityProduct> { /** * 任务的优先级 */ private int priority; private String productName; public PriorityProduct(int priority, String productName) { this.priority = priority; this.productName = productName; } @Override public int compareTo(PriorityProduct o) { if (o == null) return -1; if (o == this) return 0; return o.priority - this.priority; } @Override public String toString(){ return "{priority="+priority+",name="+this.productName; } } public class PriorityBlockQueueProduct implements Runnable { private PriorityBlockingQueue<PriorityProduct> bq; /** * 多少秒生产一条任务 */ private int period; private Random r = new Random(); public PriorityBlockQueueProduct(PriorityBlockingQueue<PriorityProduct> bq, int period) { this.bq = bq; this.period = period; } @Override public void run() { try { while (true) { Thread.sleep(period); if(bq.size()>10){ //限制大小 continue; } //随机生成优先级5以内的 PriorityProduct priorityProduct = new PriorityProduct(r.nextInt(5), "test"); //如果队列满了则阻塞 bq.put(priorityProduct); //System.out.println("生产者商品[" +priorityProduct + "],当前队列中产品为:" + bq); } } catch (Exception e) { e.printStackTrace(); } } } public class PriorityBlockQueueCusumer implements Runnable { private PriorityBlockingQueue<PriorityProduct> bq; /** * 多少秒消费一条任务 */ private int period; public PriorityBlockQueueCusumer(PriorityBlockingQueue<PriorityProduct> bq, int period) { this.bq = bq; this.period = period; } @Override public void run() { try { while (true) { Thread.sleep(period); //如果队列满了则阻塞 PriorityProduct priorityProduct=bq.take(); System.out.println("消费产品[" +priorityProduct + "],当前队列中产品为:" + bq); } } catch (Exception e) { e.printStackTrace(); } } } public class PriorityTest { public static void main(String[] args) { PriorityBlockingQueue<PriorityProduct> priorityProducts=new PriorityBlockingQueue<>(); ExecutorService executorService= Executors.newFixedThreadPool(2); executorService.execute(new PriorityBlockQueueProduct(priorityProducts,100)); executorService.execute(new PriorityBlockQueueCusumer(priorityProducts,1000)); } }
运行结果如下,可以查看消费者在消费的时候只会消费任务队列中优先级最高的任务
到此这篇关于Java中的阻塞队列BlockingQueue使用详解的文章就介绍到这了,更多相关Java阻塞队列BlockingQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!