使用ThreadPoolExecutor之高效处理并发任务
作者:xindoo
ThreadPoolExecutor是Java concurrent中用于管理线程池的类,它是Executor框架的一个实现。线程池是一种提高应用程序性能和可靠性的技术,它将多个任务分配给多个线程执行,从而实现并发处理。
ThreadPoolExecutor提供了一种灵活的方式来管理线程池,可以控制线程池的大小、阻塞队列的大小、线程池的状态、线程的创建和销毁等。
使用ThreadPoolExecutor可以帮助我们避免线程创建和销毁的开销,提高应用程序的性能和可伸缩性。
具体来说,它有以下这些优点:
- 复用线程:线程池可以重用已经创建的线程,避免了线程创建和销毁的开销。
- 控制线程数量:线程池可以控制线程的数量,避免了线程数量过多或过少的问题。
- 提高响应速度:线程池可以提高应用程序的响应速度,因为线程可以立即执行任务,而不需要等待线程创建和启动。
- 提高可伸缩性:线程池可以提高应用程序的可伸缩性,因为它可以自动调整线程的数量,以适应不同的工作负载。
- 提高可靠性:线程池可以提高应用程序的可靠性,因为它可以避免线程崩溃或死锁的问题,从而保证应用程序的稳定性。
总之,ThreadPoolExecutor在多线程开发中是绕不开的一个类,用的好它可以显著提升代码的性能,用不好就有可能代理一些其他的问题,比如我曾经见过因错误设置阻塞队列大小,导致严重的业务故障。
这也是阿里巴巴Java开发手册中不允许用Executors去创建线程池的原因。
如何使用
接下来,我们通过一个完整的代码示例来看下ThreadPoolExecutor具体如何使用:
import java.util.concurrent.*; public class ThreadPoolExecutorExample { public static void main(String[] args) { int corePoolSize = 2; int maximumPoolSize = 4; long keepAliveTime = 10; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); ThreadFactory threadFactory = Executors.defaultThreadFactory(); RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler ); for (int i = 1; i <= 10; i++) { executor.execute(new Task(i)); } executor.shutdown(); } static class Task implements Runnable { private int taskId; public Task(int taskId) { this.taskId = taskId; } @Override public void run() { System.out.println("Task #" + taskId + " is running on " + Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task #" + taskId + " is completed on " + Thread.currentThread().getName()); } } }
上面代码也很简单,就是构造了一个线程,让其多线程打印一些信息。从上面代码来看,它构造方法一共有7个参数,其中keepAliveTime和timeUnit其实是搭配使用的。
我们来看下具体每个参数的含义和作用:
- corePoolSize:线程池的核心线程数。当有新的任务提交到线程池时,如果当前线程池中的线程数小于corePoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数大于或等于corePoolSize,那么线程池会将任务添加到阻塞队列中等待执行。默认值为1。
- maximumPoolSize:线程池的最大线程数。如果阻塞队列已满,且当前线程池中的线程数小于maximumPoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数已经达到maximumPoolSize,那么线程池会根据拒绝策略来处理新的任务。默认值为Integer.MAX_VALUE。
- keepAliveTime和timeUnit:线程的空闲时间。当线程池中的线程数大于corePoolSize时,如果一个线程在keepAliveTime后没有执行任何任务,那么该线程将被终止。默认值为0,表示线程池中的所有线程都是长期存活的。
- workQueue:阻塞队列。当线程池中的线程数已经达到corePoolSize时,新的任务将被添加到阻塞队列中等待执行。ThreadPoolExecutor提供了多种类型的阻塞队列,包括SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue等。默认值为LinkedBlockingQueue。
- threadFactory:线程工厂。用于创建新的线程。如果没有指定线程工厂,ThreadPoolExecutor将使用默认的线程工厂来创建线程。默认值为DefaultThreadFactory。
- handler:拒绝策略。当阻塞队列已满,且当前线程池中的线程数已经达到maximumPoolSize时,ThreadPoolExecutor会根据指定的拒绝策略来处理新的任务。ThreadPoolExecutor提供了多种拒绝策略,包括AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy和DiscardPolicy等。默认值为AbortPolicy。
在不同的应用场景下,我们可能需要调整这些参数,以提高应用程序的性能和可伸缩性。
例如,可以通过增加corePoolSize和maximumPoolSize来提高线程池的并发能力,或者通过增加阻塞队列的大小来提高线程池的缓冲能力。同时,也可以通过指定不同的拒绝策略来处理新的任务,以避免任务丢失或应用程序崩溃的问题。
线程池状态
ThreadPoolExecutor在实际使用中有四种状态,分别是RUNNING、SHUTDOWN、STOP和TERMINATED。
- RUNNING:线程池正在运行。在这个状态下,线程池可以接受新的任务,并且会将任务添加到阻塞队列中等待执行,或者直接创建新的线程来执行任务。
- SHUTDOWN: 线程池正在关闭。在这个状态下,线程池不会接受新的任务,但会将阻塞队列中的任务继续执行完毕。如果有新的任务提交到线程池,那么线程池会拒绝这些任务并抛出RejectedExecutionException异常。
- STOP: 线程池已经停止。在这个状态下,线程池不会接受新的任务,并且会中断正在执行的任务。如果有新的任务提交到线程池,那么线程池会拒绝这些任务并抛出RejectedExecutionException异常。
- TERMINATED: 线程池已经终止。在这个状态下,线程池中的所有任务都已经执行完毕,并且所有的线程都已经被销毁。如果需要重新使用线程池,那么需要重新创建一个新的线程池。
通过合理地控制线程池的状态,可以避免任务丢失或线程池崩溃的问题,并且可以提高应用程序的性能和可靠性。
例如,在应用程序关闭时,可以先将线程池的状态设置为SHUTDOWN,等待线程池中的所有任务执行完毕后再将状态设置为STOP,最终将线程池状态设置为TERMINATED,以确保线程池中的所有任务都能够得到执行。
线程池执行任务的过程
ThreadPoolExecutor如何执行任务涉及到任务的提交、执行、取消和完成等多个方面。
- 任务的提交:可以通过execute()方法将任务提交到线程池中,execute()方法会将任务添加到阻塞队列中等待执行。也可以通过submit()方法将任务提交到线程池中,submit()方法会返回一个Future对象,可以用于获取任务的执行结果。
- 任务的执行:线程池会从阻塞队列中取出任务,并将任务分配给空闲的线程执行。如果当前线程池中的线程数小于corePoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数已经达到corePoolSize,那么线程池会将任务添加到阻塞队列中等待执行。如果阻塞队列已满,且当前线程池中的线程数小于maximumPoolSize,那么线程池会创建新的线程来执行任务。如果当前线程池中的线程数已经达到maximumPoolSize,那么线程池会根据拒绝策略来处理新的任务。
- 任务的取消:可以通过cancel()方法将任务从阻塞队列中移除,如果任务还没有开始执行,那么任务将被取消。如果任务已经在执行,那么可以通过interrupt()方法中断任务的执行。
- 任务的完成:可以通过Future对象来获取任务的执行结果,也可以通过isDone()方法来判断任务是否已经执行完毕。当任务执行完毕后,线程池会将任务从阻塞队列中移除,并将线程返回到线程池中等待下一个任务的执行。
通过合理地控制任务的提交、执行、取消和完成等方面的内容,可以提高线程池的性能和可靠性,避免任务丢失或线程池崩溃的问题。
例如,在提交任务时,可以根据任务的类型和优先级来选择合适的阻塞队列,以确保任务能够得到及时执行。在取消任务时,可以先使用isCancelled()方法来判断任务是否已经被取消,以避免重复取消任务的问题。
在获取任务的执行结果时,可以使用get()方法来等待任务的执行结果,或者使用get(timeout, unit)方法来设置超时时间,以避免任务执行时间过长导致线程池阻塞的问题。
阻塞队列
ThreadPoolExecutor提供了多种类型的阻塞队列,用于存储等待执行的任务。不同类型的阻塞队列有不同的特点和适用场景。
接下来介绍ThreadPoolExecutor的三种常用阻塞队列:
- SynchronousQueue:同步队列。SynchronousQueue是一个没有容量的阻塞队列,它的作用是将任务直接交给线程来执行,而不是先将任务存储在队列中等待执行。如果当前没有空闲的线程来执行任务,那么SynchronousQueue会阻塞任务的提交,直到有线程空闲为止。SynchronousQueue适用于任务执行时间短、任务量大的场景,可以避免任务在队列中等待的时间,提高线程池的响应速度。
- LinkedBlockingQueue:链表阻塞队列。LinkedBlockingQueue是一个有容量的阻塞队列,它的作用是将任务存储在队列中等待执行。如果队列已满,那么新的任务将被阻塞,直到队列中有空闲位置为止。LinkedBlockingQueue适用于任务执行时间长、任务量大的场景,可以避免任务在线程池中等待执行的时间过长,提高线程池的缓冲能力。
- ArrayBlockingQueue:数组阻塞队列。ArrayBlockingQueue是一个有容量的阻塞队列,它的作用和LinkedBlockingQueue类似,但是它是一个基于数组的队列,而不是基于链表的队列。ArrayBlockingQueue适用于任务执行时间长、任务量大的场景,可以避免任务在线程池中等待执行的时间过长,提高线程池的缓冲能力。与LinkedBlockingQueue相比,ArrayBlockingQueue的吞吐量更高,但是它的性能可能会受到数组大小的限制。
通过合理地选择不同类型的阻塞队列,可以根据应用程序的需求来提高线程池的性能和可靠性。
例如,对于任务执行时间短、任务量大的场景,可以选择SynchronousQueue来避免任务在队列中等待的时间;对于任务执行时间长、任务量大的场景,可以选择LinkedBlockingQueue或ArrayBlockingQueue来提高线程池的缓冲能力。同时,也可以通过调整阻塞队列的大小来控制线程池的缓冲能力,以适应不同的工作负载。
拒绝策略
ThreadPoolExecutor的拒绝策略用于处理新的任务提交到线程池时,如果线程池已经达到最大线程数和阻塞队列已满的情况下,线程池应该如何处理这些新的任务。
ThreadPoolExecutor提供了四种常用的拒绝策略:
- AbortPolicy:抛出RejectedExecutionException异常。这是ThreadPoolExecutor的默认拒绝策略,如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被拒绝并抛出异常。
- CallerRunsPolicy: 由提交任务的线程来执行任务。如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被提交到线程池的调用线程中执行。这种策略可以避免任务丢失,但是会降低线程池的吞吐量。
- DiscardOldestPolicy:丢弃最老的任务。如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被丢弃,而不是抛出异常或者执行任务。这种策略可以避免线程池阻塞,但是可能会丢失一些重要的任务。
- DiscardPolicy:直接丢弃任务。如果线程池已经达到最大线程数和阻塞队列已满的情况下,新的任务将被直接丢弃,而不是抛出异常或者执行任务。这种策略可以避免线程池阻塞,但是会丢失所有的任务。
通过合理地选择不同类型的拒绝策略,可以根据应用程序的需求来处理新的任务提交到线程池时的情况。
例如,对于重要的任务,可以选择CallerRunsPolicy来确保任务能够得到执行;对于不重要的任务,可以选择DiscardPolicy来避免线程池阻塞。同时,也可以通过实现RejectedExecutionHandler接口来自定义拒绝策略,例如我们可以丢下任务前将其记录下:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class DiscardOldestWithKafkaRejectedExecutionHandler implements RejectedExecutionHandler { private String kafkaTopic; public DiscardOldestWithKafkaRejectedExecutionHandler(String kafkaTopic) { this.kafkaTopic = kafkaTopic; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 获取最早的未执行任务 Runnable earliestTask = executor.getQueue().peek(); if (earliestTask != null) { // 将最早的未执行任务发到kafka里 KafkaProducer.send(kafkaTopic, earliestTask.toString()); } // 将当前任务提交到线程池 executor.execute(r); } }
在这个示例中,我们实现了一个DiscardOldestWithKafkaRejectedExecutionHandler,它继承了RejectedExecutionHandler接口,并重写了rejectedExecution()方法。它的作用是当线程池中的任务队列和线程池都满了,并且新的任务被拒绝时,这个方法会将最早未运行的任务拿出来,丢弃到kafka中,然后提交当前的任务。
线程池的监控和调优
线程池稳定和高效的运行也是非常重要的,要想它稳定高效运行,就离不开监控和调优。
下面给出一些监控和调优ThreadPoolExecutor的方法论:
- 监控指标: 可以通过ThreadPoolExecutor提供的一些监控指标来了解线程池的状态和性能,例如线程池大小、活跃线程数、任务队列大小、已完成任务数、拒绝任务数等。可以通过调用ThreadPoolExecutor的getPoolSize()、getActiveCount()、getQueueSize()、getCompletedTaskCount()、getRejectedExecutionCount()等方法来获取这些监控指标,以便及时发现和解决线程池的问题。
- 调优策略: 可以通过调整线程池的参数来优化线程池的性能和可靠性,例如调整corePoolSize和maximumPoolSize来提高线程池的并发能力,调整keepAliveTime来控制线程的空闲时间,调整阻塞队列的大小来提高线程池的缓冲能力,选择合适的拒绝策略来处理新的任务提交时的情况等。同时,也可以通过监控指标来实时调整线程池的参数,以适应不同的工作负载。
- 线程池的优化: 可以通过线程池的优化来提高线程池的性能和可靠性,例如使用线程池前先评估任务的类型和优先级,选择合适的阻塞队列和拒绝策略,避免任务的等待和丢失;使用线程池时避免过度提交任务,控制任务的数量和质量;使用线程池时避免长时间的空闲或者过度使用线程池,以避免资源浪费和线程池崩溃的问题。
通过合理地监控和调优ThreadPoolExecutor,可以提高线程池的性能和可靠性,避免任务丢失或线程池崩溃的问题,并且可以提高应用程序的性能和可靠性。
最佳实践
使用ThreadPoolExecutor的最佳实践涉及到线程池的参数设置、任务处理、异常处理等多个方面。
下面介绍一些使用ThreadPoolExecutor的最佳实践:
- 线程池参数设置:在创建ThreadPoolExecutor时,需要根据应用程序的需求来设置线程池的参数,例如corePoolSize、maximumPoolSize、keepAliveTime、阻塞队列类型和大小、拒绝策略等。可以根据任务的类型和优先级来选择合适的阻塞队列和拒绝策略,以避免任务的等待和丢失。
- 任务处理:在提交任务时,需要根据任务的类型和优先级来选择合适的提交方式,例如使用execute()方法或submit()方法提交任务。同时,还需要注意任务的异常处理,可以通过实现UncaughtExceptionHandler接口来自定义异常处理方式,以避免任务异常导致线程池崩溃的问题。
- 线程池的关闭:在关闭线程池时,需要注意线程池的状态和任务的处理。可以通过调用shutdown()方法或shutdownNow()方法来关闭线程池,前者会等待所有任务执行完毕再关闭线程池,后者会立即关闭线程池并中断所有任务的执行。同时,还需要注意线程池的状态,可以通过isShutdown()方法和isTerminated()方法来判断线程池是否已经关闭。
- 线程池的监控和调优:在使用ThreadPoolExecutor时,需要及时监控线程池的状态和性能,以便及时发现和解决线程池的问题。可以通过监控指标和调优策略来优化线程池的性能和可靠性,例如调整线程池的参数、选择合适的阻塞队列和拒绝策略、避免任务的等待和丢失等。
通过遵循ThreadPoolExecutor的最佳实践,可以提高线程池的性能和可靠性,避免任务丢失或线程池崩溃的问题,并且可以提高应用程序的性能和可靠性。
总结
ThreadPoolExecutor是Java中用于管理线程池的一个类,它能够创建和管理线程池,以提高应用程序的性能和可靠性。它具有灵活的线程池管理、高效的任务处理和可靠的异常处理等特点。尤其适用于任务量大、执行时间长、任务类型多样的应用场景,例如Web服务器、数据库连接池、文件处理等。通过合理地设置线程池参数、处理任务和异常、监控和调优线程池,可以提高应用程序的性能和可靠性,避免任务丢失或线程池崩溃的问题。
为了充分利用ThreadPoolExecutor提高应用程序的性能和可靠性,我们可以采取以下措施:
- 合理设置线程池参数,例如corePoolSize(核心线程数)、maximumPoolSize(最大线程数)、keepAliveTime(线程空闲时间)、阻塞队列类型和大小、拒绝策略等,以适应不同的工作负载和应用场景。
- 处理任务和异常,例如选择合适的任务提交方式(如execute或submit),实现UncaughtExceptionHandler接口来自定义异常处理方式,以避免任务异常导致线程池崩溃的问题。
- 监控和调优线程池,例如及时监控线程池的状态和性能、调整线程池的参数、选择合适的阻塞队列和拒绝策略、避免任务的等待和丢失等,以提高线程池的性能和可靠性。
通过以上措施,我们可以充分发挥ThreadPoolExecutor的优势,提高应用程序的性能和可靠性,避免任务丢失或线程池崩溃的问题。同时,这也有助于我们在面临大量任务、长时间执行和多种任务类型的应用场景时,更好地应对挑战,实现高效、稳定的应用程序运行。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。