Java中的线程池ThreadPoolExecutor细致讲解
作者:y_initiate
1. 线程池状态
- RUNNING:允许提交并处理任务
- SHUTDOWN: 不允许提交新的任务,但是会处理完已提交的任务
- STOP:不允许提交新的任务,也不会处理阻塞队列未执行的,并设置正在执行的线程的中断标志位
- TIDYING:所有任务执行完毕,池中工作的线程数为0,等待执行terminated()方法
- TERMINATED:terminated()方法执行完毕
线程池的shutdown()方法,将线程池由RUNNING转为SHUTDOWN状态
线程池的shutdownNow()方法,将线程池由RUNNING或SHUTDOWN转为STOP状态
SHUTDOWN和STOP状态最终都会变为TERMINATED
2. ThreadPoolExecutor构造函数
- public ThreadPoolExecutor(int corePoolSize, 线程池中核心线程数最大值
- int maximumPoolSize, 线程池中能拥有最多线程数
- long keepAliveTime, 空闲线程存活时间
- TimeUnit unit, keepAliveTime单位
- BlockingQueue workQueue, 用于缓存任务的阻塞队列
- ThreadFactory threadFactory, 创建线程的工厂
- RejectedExecutionHandler handler 拒绝策略
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(“export_data_pool_factory” + “-%d”).build(); ExecutorService pool = new ThreadPoolExecutor(5, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(500), factory, new ThreadPoolExecutor.CallerRunsPolicy());
3. 线程池工作原理
3.1 任务执行流程
注: execute()无返回值发生异常会抛出 submit()返回值为Feature,异常无感知需要通过返回的Feature获取异常信息
当调用线程池execute()或者submit() 方法向线程池提交一个任务时,线程池会做如下判断:
- 如果有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;
- 如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;(与第一条相同,此处另写原因见下面注释)
注:此处网上教程为『如果有空闲线程,则直接执行该任务; 如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务』 但是 ThreadPoolExecutor 的官方注释:“When a new task is submitted in method {@link #execute(Runnable)}, and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle”,经测试官方注释正确
package test; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.*; import java.util.*; import java.util.concurrent.*; public class Test1 { private final ExecutorService pool; { ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test_pool_factory" + "-%d").build(); pool = new ThreadPoolExecutor(3, 5, 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), factory, new ThreadPoolExecutor.CallerRunsPolicy()); } public void close() { this.pool.shutdown(); } public void runTask(int i) { pool.submit(new Runnable() { @Override public void run() { try { System.out.println("index:" + i + " id:" + Thread.currentThread().getId() + " poolSize: " + ((ThreadPoolExecutor)pool).getPoolSize() + " activeSize:" + ((ThreadPoolExecutor)pool).getActiveCount()); System.out.println("index:" + i + " id:" + Thread.currentThread().getId() + " name:" + Thread.currentThread().getName()); System.out.println("==============================================="); } catch (Exception e) { System.out.printf("ERROR: id:%s name:%s time:%s ERROR### \n", Thread.currentThread().getId(), Thread.currentThread().getName(), new Date()); } } }); } public static void main(String[] args) throws IOException, InterruptedException { Test1 t = new Test1(); for (int i = 0; i < 5; i++) { System.out.println("poolSize:" + ((ThreadPoolExecutor)t.pool).getPoolSize() + " activeSize:" + ((ThreadPoolExecutor)t.pool).getActiveCount()); t.runTask(i); Thread.sleep(100); } t.close(); } }
通过运行结果可以看到,任务执行完毕后poolSize已经有线程,并且都处于空闲状态,但当poolSize<corePoolSize时每次都会新建一个线程执行任务
- 如果没有空闲线程,且当前的线程数等于corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程;
- 如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于maximumPoolSize ,则创建新的线程执行任务;
- 如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务。
3.2 空闲线程释放策略
当线程空闲时间超过keepAliveTime时,如果线程池设置了allowCoreThreadTimeout参数为true(默认false)则直接释放掉该线程(它最终会收缩到0),如果没有设置则判断当前线程数 > corePoolSize,则该线程会被释放掉(它最终会收缩到 corePoolSize 的大小)。
3.3 任务队列(workQueue)
任务队列:决定了缓存任务的排队策略
- 有界队列
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool 使用了这个队列。
- ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
- 无界队列
- LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
- PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。
ThreadPoolExecutor线程池推荐了三种等待队列,SynchronousQueue 、LinkedBlockQueue和 ArrayBlockingQueue
3.4 threadFactory
threadFactory :指定创建线程的工厂。(可以不指定) 如果不指定线程工厂时,ThreadPoolExecutor 会使用ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 “pool-XXX-thread-” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。
3.5 handler 拒绝策略
handler :表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。(可以不指定)
- ThreadPoolExecutor.AbortPolicy():抛出RejectedExecutionException异常。默认策略
- ThreadPoolExecutor.CallerRunsPolicy():由向线程池提交任务的线程来执行该任务
- ThreadPoolExecutor.DiscardPolicy():抛弃当前的任务
- ThreadPoolExecutor.DiscardOldestPolicy():抛弃最旧的任务(最先提交而没有得到执行的任务)
4. 常用方法
除了修改创建线程池参数的修改allowCoreThreadTimeOut(boolean value),setKeepAliveTime(long timt, TimeUnit unit),setMaximumPoolSize(int maximumPoolSize),setCorePoolSize(int corePoolSize),setThreadFactory(ThreadFactory threadFactory),setRejectedExecutionHandler(RejectedExecutionHandler handler)外
- getCorePoolSize():返回线程池的核心线程数,返回在线程池的coreSize大小;
- getMaximumPoolSize():返回线程池的最大线程数,返回线程池的coreSize大小;
- getLargestPoolSize():记录了曾经出现的最大线程个数(水位线);
- getPoolSize():线程池中当前线程的数量;
- getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;
- prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize;
- prestartCoreThread():会启动一个核心线程(同上);
- allowCoreThreadTimeOut(true):允许核心线程在KeepAliveTime时间后,退出;
到此这篇关于Java中的线程池ThreadPoolExecutor细致讲解的文章就介绍到这了,更多相关Java的ThreadPoolExecutor内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!