Java基于阻塞队列实现生产者消费者模型示例详解
作者:兜里有颗棉花糖
这篇文章主要介绍了Java基于阻塞队列实现生产者消费者模型,阻塞队列的特点就是阻塞两个字,阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度降下来,感兴趣的朋友可以参考下
一、阻塞式队列
什么是阻塞式队列(有两点):
- 第一点:当队列满的时候,如果此时入队列的话就会出现阻塞,直到其它线程从队列中取走元素为止。
- 第二点:当队列为空的时候,如果继续出队列,此时就会出现阻塞,一直阻塞到其它线程往队列中添加元素为止。
二、生产者消费者模型
什么是生产者消费者模型
生产者消费者模型是常见的多线程编程模型,可以用来解决生产者和消费者之间的数据交互问题。
阻塞队列的最主要的一个目的之一就是实现生产者消费者模型(基于阻塞队列实现),生产者消费主模型是处理多线程问题的一种方式。
生产消费者模型的优势
生产者消费主模型的优势:针对分布式系统有两个优势,一个是解耦合(耦合我们可以理解为依赖程度)、另一个是削峰填谷。
- 解耦合:生产者和消费主之间通过缓冲区进行解耦合,而不会对彼此产生直接的依赖,我们通过引入生产者消费者模型(即阻塞队列)就可以达到解耦合的效果,但是付出的代价就是效率有所降低。
- 削峰填谷:服务器接收到的来自用户端的请求数量可能会因为一些突发时间而暴增,此时服务器面临的压力就非常大了。我们要知道一台服务器承担的上限是一样的,不同的服务器所能承担的上限又是不同的。(机器的硬件资源(CPU、内存、硬盘、网络带宽等等)是有限的,而服务器每处理一个请求都需要消耗一定的资源,请求足够多直到机器的硬件资源招架不住的时候服务器也就挂了)通过引入生产消费者模型(即阻塞队列)就可以起到一个缓冲的作用,其中阻塞队列就承担了服务器的一部分压力,然后当峰值消退的时候,服务器接收到的请求就相对较少了,此时服务器由于阻塞队列的原因依然可以按照既定的顺序处理请求。
阻塞队列只是一个数据结构,如果我们把这个数据结构单独实现称了一个服务器程序,并且使用单独的主机或者主机群来进行部署的话,此时阻塞式队列就进化成了消息队列。而在Java标准库中已经实现了阻塞队列,并且实现了三种阻塞队列的实现方式:
三、生产者消费者举例代码
生产消费者模型代码如下(基于阻塞式队列):
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; // 生产消费者模型——阻塞队列 public class Demo20 { public static void main(String[] args) { // 创建一个阻塞队列来作为交易场所 BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); Thread t1 = new Thread(() -> { int count = 0; while(true) { try { queue.put(count); System.out.println("生产元素:" + count); count++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { while(true) { try { Integer n = queue.take(); System.out.println("消费元素:" + n); } catch (InterruptedException e) { e.printStackTrace(); } } } }); t1.start(); t2.start(); } }
代码运行结果如下:
四、基于阻塞式队列实现生产者消费者模型
现在,我们自己来基于循环队列来实现阻塞式队列。注意我们这里实现的阻塞队列是基于数组、基于循环队列的阻塞队列。
我们在实现阻塞队列的时候有以下几点需要注意:
- 线程安全问题:需要给put方法和take()方法进行加锁操作。
- 经过加锁之后还需要考虑到内存可见性问题,这里就涉及到volatile关键字的使用。
- 阻塞状态以及阻塞状态的解除时机要把握好(即wait()方法和notify()方法的使用)。
- wait()方法不一定是被notify()方法唤醒的,还有可能是被interrupt()方法唤醒的:如果interrupt方法是按照try catch的形式来进行编写的,一旦interrupt方法唤醒wait方法,接着执行完catch之后,代码并不会结束而是继续往后执行,此时就会出现覆盖元素的问题。(解决方法,使用while循环不断等待和检查条件。如果不使用 while 循环在状态被满足之前不断地等待和检查条件,就有可能在 wait 方法返回之后仍然不能安全地进行操作,这可能导致程序出现异常和错误。)强烈建议使用wait方法的时候搭配while循环来判定条件
代码如下:
class MyBlockQueue { // 使用string类型的数组来保存元素,我们假设这里只存string private String[] items = new String[1000]; //head表示指向队列的头部 volatile private int head = 0; volatile private int tail = 0; volatile private int size = 0; // size表示元素个数 private Object locker = new Object(); public void put(String elem) throws InterruptedException { synchronized(locker) { while(size >= items.length) { //队列已满 locker.wait(); //return; } items[tail] = elem; tail++; if(tail >= items.length) { tail = 0; } //tail++和下面的if判断可以替换成tail = (tail + 1) % (items.length) //但是站在CPU的角度来看,其实还是简单的if判断比较快 size++; locker.notify(); // 用来唤醒队列为空的阻塞情况 } } //出队列 public String take() throws InterruptedException { synchronized(locker) { while(size == 0) { locker.wait(); } String elem = items[head]; head++; if(head >= items.length) { head = 0; } size--; //使用notify来唤醒队列阻塞满的情况 locker.notify(); return elem; } } } public class Demo21 { public static void main(String[] args) { // 创建两个线程分别表示消费者和生产者 MyBlockQueue queue = new MyBlockQueue(); Thread t1 = new Thread(() -> { int count = 0; while(true) { try { queue.put(count + ""); System.out.println("生产元素: " + count); count++; } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { try { String count = queue.take(); System.out.println("消费元素: " + count); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
以上就是Java基于阻塞队列实现生产者消费者模型示例详解的详细内容,更多关于Java阻塞队列的资料请关注脚本之家其它相关文章!