java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java线程池ThreadPoolExecutor

Java线程池ThreadPoolExecutor的使用及其原理详细解读

作者:外星喵

这篇文章主要介绍了Java线程池ThreadPoolExecutor的使用及其原理详细解读,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务,线程池线程都是后台线程,需要的朋友可以参考下

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

线程池的作用

关于Java线程池(ThreadPool)

jdk提供的线程池有哪些

在 JDK 1.5 之后推出了相关的 api,其提供的线程池有以下四种:

查看Executor源码会发现,Executor只是一个创建线程池的工具类,这四种方式创建的源码就会发现,都是利用 ThreadPoolExecutor 类实现的,真正的线程池接口是ExecutorService,其实现类为ThreadPoolExecutor。

Executors 部分源码:

public class Executors {
	public static ExecutorService newFixedThreadPool(int nThreads) {
        	return new ThreadPoolExecutor(nThreads, nThreads,
                             	         0L, TimeUnit.MILLISECONDS,
                            	          new LinkedBlockingQueue<Runnable>());
  	  }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ThreadPoolExecutor 最终的构造器源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

其它构造器如下:

// 构造器一
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        Executors.defaultThreadFactory(), defaultHandler);
    }
	// 构造器二
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
	// 构造器三
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

关于创建线程池

有一点是肯定的,线程池肯定是不是越大越好。 通常我们是需要根据这批任务执行的性质来确定的。

当然这些都是经验值,最好的方式还是根据实际情况测试得出最佳配置。

通常我们使用线程池可以通过如下方式去使用:

使用jdk提供的线程池

//创建一个可缓存的线程池
        ExecutorService cachedThreadPool= Executors.newCachedThreadPool();
        //通过execute方法执行接口
        cachedThreadPool.execute(()->{
            //Runnable to do something.
        });

创建自定义线程池

        ExecutorService threadPool = new ThreadPoolExecutor(// 自定义一个线程池
                1, // 核心线程数
                2, // 最大线程数
                60, // 超过核心线程数的额外线程存活时间
                TimeUnit.SECONDS, // 线程存活时间的时间单位
                new ArrayBlockingQueue<>(3) // 有界队列,容量是3个
                , Executors.defaultThreadFactory()    // 线程工厂
                , new ThreadPoolExecutor.AbortPolicy() //线程的拒绝策略
        );
        //执行一个任务
        threadPool.execute(()->{
            //线程执行的具体逻辑
            //Runnable to do something.
        });

线程池的任务队列

队列有三种通用策略:

线程池的任务拒绝策略

jdk提供了四种拒绝策略:

  1. AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。同时它也是线程池默认的拒绝策略。
  2. CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
  3. DiscardPolicy: 直接丢弃,其他啥都没有
  4. DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入

线程池如何执行

在这里插入图片描述

所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。

主要分三步进行:

  1. 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个线程开始一个新线程任务。对addWorke的调用以原子方式检查运行状态和workerCount,从而防止会增加当它不应该的时候,返回false。
  2. 如果任务可以成功排队,那么我们仍然需要仔细检查是否应该添加线程(因为自从上次检查以来已有的已经死了)或者自进入此方法后,池已关闭。所以我们重新检查状态,必要时回滚排队已停止,如果没有,则启动新线程。
  3. 如果无法将任务排队,则尝试添加新的线程。如果失败了,我们就知道我们已经关闭或者饱和了,所以拒绝这个任务。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

线程池的任务添加流程

在excute方法中调用了addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
     	// 外层循环,负责判断线程池状态,处理线程池状态变量加1操作
        retry:
        for (;;) {
		    // 状态总体相关值:运行状态 + 执行线程任务数量 
            int c = ctl.get();
			// 读取状态值 - 运行状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
			// 满足下面两大条件的,说明线程池不能接受任务了,直接返回false处理
			// 主要目的就是想说,只有线程池的状态为 RUNNING 状态时,线程池才会接收
			// 新的任务,增加新的Worker工作线程
			// 线程池的状态已经至少已经处于不能接收任务的状态了
            if (rs >= SHUTDOWN &&
			  //目的是检查线 程池是否处于关闭状态
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                       return false;
				// 内层循环,负责worker数量加1操作
				for (;;) {
				// 获取当前worker线程数量
					int wc = workerCountOf(c);
					if (wc >= CAPACITY ||
						// 如果线程池数量达到最大上限值CAPACITY
						// core为true时判断是否大于corePoolSize核心线程数量
						// core为false时判断是否大于maximumPoolSize最大设置的线程数量
						wc >= (core ? corePoolSize : maximumPoolSize))
						return false;
					// 调用CAS原子操作,目的是worker线程数量加1
					if (compareAndIncrementWorkerCount(c)) //
						break retry;
					c = ctl.get();  // Re-read ctl
					// CAS原子操作失败的话,则再次读取ctl值
					if (runStateOf(c) != rs)
					// 如果刚刚读取的c状态不等于先前读取的rs状态,则继续外层循环判断
						continue retry;
					// else CAS failed due to workerCount change; retry inner loop
					// 之所以会CAS操作失败,主要是由于多线程并发操作,导致workerCount
					// 工作线程数量改变而导致的,因此继续内层循环尝试操作
				}
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
			// 创建一个Worker工作线程对象,将任务firstTask,
			// 新创建的线程thread都封装到了Worker对象里面
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
				// 由于对工作线程集合workers的添加或者删除,
				// 涉及到线程安全问题,所以才加上锁且该锁为非公平锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
					// 获取锁成功后,执行临界区代码,首先检查获取当前线程池的状态rs
                    int rs = runStateOf(ctl.get());
					// 当线程池处于可接收任务状态
					// 或者是不可接收任务状态,但是有可能该任务等待队列中的任务
					// 满足这两种条件时,都可以添加新的工作线程
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                         workers.add(w);
						// 添加新的工作线程到工作线程集合workers,workers是set集合
                        int s = workers.size();
                        if (s > largestPoolSize) 
						// 变量记录了线程池在整个生命周期中曾经出现的最大线程个数
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { 
				// 往workers工作线程集合中添加成功后,则立马调用线程start方法启动起来
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
			// 如果启动线程失败的话,还得将刚刚添加成功的线程共集合中移除并且做线				// 程数量做减1操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }

线程池中任务的执行流程

runWorker 通过调用t.start()启动了线程,线程池真正核心执行任务的地方就在此

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
		// allow interrupts 允许中断
        w.unlock();
        boolean completedAbruptly = true;
        try {
			// 不断从等待队列blockingQueue中获取任务
			// 之前addWorker(null, false)这样的线程执行时,
			// 会通过getTask中再次获取任务并执行
            while (task != null || (task = getTask()) != null) {
                w.lock(); 
				// 上锁,并不是防止并发执行任务,
				// 而是为了防止shutdown()被调用时不终止正在运行的worker线程
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
					// task.run()执行前,由子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // 执行线程Runable的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
						// task.run()执行后,由子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

如何关闭线程池

有运行任务自然也有关闭任务。 通过查看ExecutorService接口,其实无非就是两个方法 shutdown()/shutdownNow()。

但他们有着重要的区别:

        threadPool.shutdown();
        threadPool.shutdownNow();

到此这篇关于Java线程池ThreadPoolExecutor的使用及其原理详细解读的文章就介绍到这了,更多相关Java线程池ThreadPoolExecutor内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文