Java 阻塞队列和线程池原理分析
作者:高、远
【1】阻塞队列
一、什么是阻塞队列?
① 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
② 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在Android开发中阻塞队列也是常见的 —— Handler机制中的MessageQueue就是优先级阻塞队列
二、阻塞队列有什么用?
解耦 在生产者和消费者之间解除了耦合
平衡两者性能差异 平衡了生产者消费者之间的性能差异
三、阻塞队列的简单实用
①常见的阻塞队列主要有那些?
常见的阻塞队列主要有一下7中:
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue
SynchronousQueue
LinkedTransferQueue
LinkedBlockingDeque
②阻塞队列常见的几种处理方式(并非所有方式都阻塞)
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
- 其中只有put和take方法时阻塞的
- 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queuefull”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
- -返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
- 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
- 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
③阻塞队列简单使用
- 三个线程添加数据
- 三个线程消费数据
public class MyBlockingQueue { static ArrayBlockingQueue<String> abq = new ArrayBlockingQueue(3); public static void main(String[] args) { // 生产者线程 for (int i = 0; i < 3; i++) { new Thread(() -> producer(), "producerThread" + i).start(); } // 消费者线程 for (int i = 0; i < 3; i++) { new Thread(() -> consumer(), "consumerThread" + i).start(); } } private static void consumer() { while (true) { try { String msg = abq.take(); System.out.println(Thread.currentThread().getName() + " ->receive msg:" + msg); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void producer() { for (int i = 0; i < 100; i++) { try { abq.put("[" + i + "]"); System.out.println(Thread.currentThread().getName() + " ->send msg:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
执行结果:
producerThread1 ->send msg:0
producerThread2 ->send msg:0
producerThread0 ->send msg:0
consumerThread1 ->receive msg:[0]
producerThread1 ->send msg:1
consumerThread2 ->receive msg:[0]
producerThread1 ->send msg:2
producerThread2 ->send msg:1
consumerThread1 ->receive msg:[0]
consumerThread0 ->receive msg:[1]
...
【2】Java 线程池
一、我们为什么需要Java 线程池?使用它的好处是什么?
①降低资源消耗。
通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
②提高响应速度。
通常我们在Java程序中执行一个任务的到结果分为以下步骤:
1.创建线程 ——> 2.执行任务 ——> 3.销毁线程
当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
③提高线程的可管理性。
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
二、Java中主要提供了哪几种线程的线程池?
Java中主要提供了一下4中线程池:
1、newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)
2、newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重)
3、newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
4、newScheduledThreadPool:适用于执行延时或者周期性任务。
三、线程类的继承关系
ThreadPoolExecutor
的类关系Executor
是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。ExecutorService
接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;AbstractExecutorService
抽象类实现了ExecutorService接口中的大部分方法;ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务。ScheduledExecutorService
接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;ScheduledThreadPoolExecutor
是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
Executor——>ExecutorService——>AbstractExecutorService——>ThreadPoolExecutor
二中常用的几种线程池都是源自ThreadPoolExecutor,所以我们来分析一下这个类
四、ThreadPoolExecutor参数的含义 corePoolSize
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
TimeUnit
keepAliveTime的时间单位
workQueue
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。
一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。
RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
五、线程池工作流程(机制)
1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。
4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
六、关于两种提交方法的比较
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。