Java多线程中的Executor框架解析
作者:得过且过的勇者y
前言
Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
Executor框架的组成:
- 任务(Runnable/Callable):任务通过Runnable接口或Callable接口进行定义。Runnable接口或Callable接口实现类都可以被 ThreadPoolExecutor执行
- 任务的执行(Executor):任务执行机制的核心接口 Executor,以及继承自Executor接口的ExecutorService接口。ThreadPoolExecutor实现了ExecutorService接口
- 异步的计算结果(Future):Future接口以及Future接口的实现类FutureTask类都可以代表异步计算的结果。当我们把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutor执行后就会返回一个Future对象
一、Executor接口
线程池简化了线程的管理工作, 并且JUC提供了一种灵活的线程池实现来作为Executor框架的一部分。
在Java类库中,任务执行的主要抽象不是Thread而是Executor,Executor只定义了execute一个方法,是最顶层的接口。
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ void execute(Runnable command);
execute方法接受一个Runnable参数,这个方法定义为在未来的某个时间执行传入的方法,方法的运行可以在一个新的线程,在线程池或者在调用的线程中,该方法无法接收到线程的执行结果(当然Runnable接口本身也没有返回值)。
虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
**Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。**如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式就是使用Executor。
二、ExecutorService接口
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它的run方法执行任务后不能返回一个值或者抛出一个受检查的异常。许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象,它认为主入口点(call)将返回一个值,并可能抛出一个异常。
因此引入了ExecutorService,它继承自Executor,并且在其基础上增加了很多功能,可以生成用于跟踪一个或多个异步任务进度的Future的方法,以解决Executor的局限性。
Future表示一个任务的生命周期,并且提供了响应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
在Future规范中包含的隐含意义是任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成状态。
get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已完成那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了一场,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。
定义了以下方法:
- void shutdown():启动有序关机,其中执行先前提交的任务,但不接受新任务。如果调用已经关闭,则没有额外的效果。不会阻塞等待先前提交的任务执行完成
- List<Runnable> shutdownNow():尝试停止所有正在执行的任务,停止对等待任务的处理,并返回等待执行的任务列表。不会阻塞等待正在执行的任务终止只是尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,因此任何未能响应中断的任务可能永远不会终止
- boolean isShutdown():如果此执行器已关闭(调用了关闭方法),则返回true
- boolean isTerminated():如果关闭后所有任务都已完成,则返回true。注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真
- boolean awaitTermination(long timeout, TimeUnit unit):阻塞直到所有任务在关机请求后完成执行,或者超时发生,或者当前线程被中断,以先发生的为准
- <T> Future<T> submit(Callablet task):提交一个带返回值的任务以供执行,并返回表示该任务的挂起结果的Future。Future的get方法将在成功完成任务时返回任务的结果
- <T> Future<T> submit(Runnable task, T result):提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法将在成功完成时返回给定的结果
- Future<?> submit(Runnable task):提交可运行任务以供执行,并返回表示该任务的Future。Future的get方法将在成功完成时返回null
- <T> Listfuture<t> invokeAll(Collection? extends Callable<T> tasks):执行给定的任务,并在所有任务完成时返回保存其状态和结果的future列表。每个Future对象的isDone方法都会返回true。请注意,已完成的任务可以正常终止,也可以抛出异常终止。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的
- <T> T invokeAny(Collection? extends Callable<T> tasks):执行给定的任务,如果有成功完成的任务,则返回成功完成的任务的结果(即不抛出异常)。在正常或异常返回时,未完成的任务将被取消。如果在执行此操作时修改了给定的集合,则此方法的结果是未定义的。
三、ThreadPoolExecutor类
ThreadPoolExecutor实现了ExecutorService(实际上是继承了AbstractExecutorService),为了在广泛的上下文中发挥作用,该类提供了许多可调参数和可扩展性挂钩:
- corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
- maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
- keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程(救急线程)会在多长时间内被销毁
- unit:keepAliveTime的单位
- workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种
- threadFactory:线程工厂,用于创建线程,一般用默认即可
- handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务
1、状态
- RUNNING(-1<<29):接受新任务并且处理排队任务
- SHUTDOWN(0<<29):不接受新任务但是处理排队任务
- STOP(1<<29):不接受新任务也不处理排队任务,同时中断处理中的任务
- TYDING(2<<29):所有任务被终止,工作线程数为0之后进入该状态,并且会执行terminated()钩子函数
- TERMINATED(3<<29):terminated()钩子函数执行完毕后
状态单调地随时间增加,但不需要达到每个状态。
- RUNNING->SHUTDOWN:调用shutdown()
- RUNNING/SHUTDOWN->STOP:调用shutdownNow()
- STOP->TYDING:当队列和池都为空时
- TIDYING -> TERMINATED:terminated()钩子函数执行完毕后
ThreadPoolExecutor中对于状态的记录保存在一个AtomicInteger类型的变量中,其中高三位就是用于记录线程池的状态,而低的29位用于记录线程数量。
2、Worker
ThreadPoolExecutor中定义了一个私有静态类Worker,其继承自AbstractQueuedSynchronizer类,并实现了Runnable接口。其中维护了线程实例(Thread)、任务实例(Runnable)、线程任务计数器(long)变量。
这个类适当地扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。这可以防止中断,这些中断旨在唤醒等待任务的工作线程,而不是中断正在运行的任务。我们实现了一个简单的不可重入互斥锁,而不是使用ReentrantLock,因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获得锁。此外,为了在线程实际开始运行任务之前抑制中断,我们将锁状态初始化为负值,并在启动时(在runWorker中)清除它。
3、扩展
该类还定义了三个protected类型的钩子函数:
- beforeExecute:线程池任务运行前执行
- afterExecute:线程池任务运行后执行
- terminated:线程池退出后执行
这几个方法在ThreadPoolExecutor中为空实现。
四、ForkJoinPool类
Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork就是把一个大任务切分为若干个子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。
1、工作窃取算法
ForkJoinPool是运行ForkJoinTasks的ExecutorService。**ForkJoinPool与其他类型的ExecutorService的区别主要在于使用了工作窃取。工作窃取算法是指某个线程从其他队列里窃取任务来执行。**那么为什么要使用工作窃取算法呢?**假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不干扰的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。**比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而这时它们会访问同一个队列,所以为了减少窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
2、Fork/Join的设计
想要设计一个Fork/Join框架,需要完成两个步骤:
- 分割任务:需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小
- 执行任务并合并结果:分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情:
- ForkJoinTask(抽象类):我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction(抽象类):用于没有返回结果的任务
- RecursiveTask(抽象类):用于有返回结果的任务
- ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个线程。
public class CountTask extends RecursiveTask<Integer> { private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 如果任务足够小就计算任务 boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任务大于阈值,就分裂成两个子任务计算 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 执行子任务 leftTask.fork(); rightTask.fork(); // 等待子任务执行完,并得到其结果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任务 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1, 4); // 执行一个任务 Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } } }
RecursiveTask需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小就必须分割成两个子任务,每个子任务在调用fork方法时又会进入compute方法。最后使用join方法等待子任务执行完成并得到其结果。
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法来获取异常。
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException,如果任务没有完成或者没有抛出异常则返回null。
3、执行原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。
pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后再调用ForkJoinPool的signalWork反复噶唤醒或创建一个工作线程来执行任务。
五、ScheduledThreadPool类
ScheduledThreadPoolExecutor主要用来在给定的延迟后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor使用任务队列DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask的time变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask的squenceNumber变量小的先执行)。
1、ScheduledExecutorService
ScheduledThreadPool类继承自ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。
ScheduledExecutorService接口定义了几个方法:
- ScheduleFuture<?> schedule(Runnable command, long delay, TimeUnit unit):提交在给定延迟后启用的一次性任务
- ScheduleFuture schedule(Callable command, long delay, TimeUnit unit):提交在给定延迟之后启用的带返回值的一次性任务
- ScheduleFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):提交一个周期性任务,任务开始时进行计时(如果任务执行时间过长甚至超过period时间,会导致任务连续执行)
- ScheduleFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):提交一个周期性任务,任务执行完成之后才进行计时
ScheduledFuture继承自Delayed接口和Future接口,自己本身没有定义新的方法。
Delayed接口是一个混合风格的接口,用于标记应该在给定延迟后执行的对象。这个接口定义了一个getDelay方法,用于返回剩余的延迟时间。此外这个接口继承了Comparable接口,意味着此接口的实现必须定义一个compareTo方法,该方法提供与其getDelay方法一致的排序
2、比较Timer
Timer对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是
Timer只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程
在TimerTask中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机,即计划任务将不再运行。ScheduledThreadExecutor不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行
六、Executors类
Executors是Java中用于创建线程池的工厂类,它提供了一系列的静态工厂方法,用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性,使得线程池的创建变得非常简单。Executors工厂类提供的线程池有以下几种类型:
- newCachedThreadPool():CachedThreadPool的corePoolSize 被设置为0,maximumPoolSize被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新的线程。极端情况下,这样会导致耗尽 cpu和内存资源
- newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中包含指定数量的线程。线程数量是固定的,不会自动扩展,即没有救急线程
- newSingleThreadExecutor():创建一个单线程的线程池。这个线程池中只包含一个线程,用于串行执行任务。适用于需要按顺序执行任务的场景
- newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,用于定时执行任务。线程数量固定,不会自动扩展。适用于定时执行任务的场景
- newSingleThreadScheduledExecutor():创建一个单线程的定时执行线程池。只包含一个线程,用于串行定时执行任务
- newWorkStealingPool(int parallelism):该线程池维护足够的线程以支持给定的并行级别,并且可以使用多个队列来减少争用。并行性级别对应于积极参与或可用参与任务处理的最大线程数。实际的线程数可以动态地增加和减少。工作窃取池不能保证所提交任务的执行顺序
除此之外还提供了创建ThreadFactory实例,将Runnable实例转换为Callable实例等方法。
到此这篇关于Java多线程中的Executor框架解析的文章就介绍到这了,更多相关Java的Executor框架内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!