Java并发线程池实例分析讲解
作者:飞奔的小付
一.为什么要用线程池
先来看个简单的例子
1.直接new Thread的情况:
public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); final List<Integer> list = new ArrayList<>(); final Random random = new Random(); for (int i = 0; i < 100000; i++) { Thread thread = new Thread() { @Override public void run() { list.add(random.nextInt()); } }; thread.start(); thread.join(); } System.out.println("执行时间:" + (System.currentTimeMillis() - start)); System.out.println("执行大小:" + list.size()); }
执行时间:6437
执行大小:100000
2.使用线程池时
public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); final List<Integer> list = new ArrayList<>(); final Random random = new Random(); ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 100000; i++) { executorService.execute(()->{ list.add(random.nextInt()); }); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("执行时间:" + (System.currentTimeMillis() - start)); System.out.println("执行大小:" + list.size()); }
执行时间:82
执行大小:100000
从执行时间可以看出来,使用线程池的效率要远远超过直接new Thread。
二.线程池的好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
三.原理解析
四.4种线程池
1.newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特点:newCachedThreadPool会创建一个可缓存线程池,如果当前线程池的长度超过了处理的需要时,可以灵活的回收空闲的线程,当需要增加时,它可以灵活的添加新的线程,而不会对线程池的长度作任何限制。
因为其最大线程数是Integer.MAX_VALUE,若新建的线程数多了,会超过机器的可用内存而OOM,但是因为其不是无界队列,所以在OOM之前一般会CPU 100%。
2.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
该方法会创建一个固定长度的线程池,控制最大并发数,超出的线程会在队列中等待,因为线程的数量是固定的,但是阻塞队列是无界的,如果请求数较多时,会造成阻塞队列越来越长,超出可用内存 进而OOM,所以要根据系统资源设置线程池的大小。Runtime.getRuntime().availableProcessors()
3.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
会创建一个单一的线程,前一个任务执行完毕才会执行下一个线程,FIFO,保证顺序执行。但是高并发下不太适用
4.newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
创建一个固定长度的线程池,而且支持定时的以及周期性的任务执行,所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
阿里规范中不推荐使用以上线程池,推荐使用自定义的线程池,当然如果你的项目中的数量级比较小的话那到没什么影响。
自定义线程池:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10),new MonkeyRejectedExecutionHandler());
执行优先级 : 核心线程>非核心线程>队列
提交优先级 : 核心线程>队列>非核心线程
五.线程池处理流程
流程图:
六.源码分析
流程图
ThreadPoolExecutor的execute方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1.判断线程数是否小于核心线程数,如果是则使用入参任务通过addWorker方法创建一个新的线程,如果能完成新线程创建execute方法结束,成功提交任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2.在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shtdown了),非运行状态下当然是需要reject; // offer和add方法差不多,add方法就是调用的offer,只不过比offer多抛出一个异常 throw new IllegalStateException("Queue full") if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); //3.判断当前工作线程池数是否为0,如果是创建一个null任务,任务在堵塞队列存在了就会从队列中取出这样做的意义是保证线程池在running状态必须有一个任务在执行 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //4.如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject.拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到; else if (!addWorker(command, false)) reject(command); } }
再进入到addWork方法
private boolean addWorker(Runnable firstTask, boolean core) { // goto写法 重试 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) //线程状态非运行并且非shutdown状态任务为空,队列非空就不能新增线程了 return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //当前线程达到了最大阈值 就不再新增线程了 return false; if (compareAndIncrementWorkerCount(c)) //ctl+1工作线程池数量+1如果成功 就跳出死循环 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //进来的状态和此时的状态发生改变重头开始重试 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//内部类封装了线程和任务 通过threadfactory创建线程 //毎一个worker就是一个线程数 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //重新获取线程状态 int rs = runStateOf(ctl.get()); // 状态小于shutdown 就是running状态 或者 为shutdown并且firstTask为空是从队列中处理 任务那就可以放到集合中 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程还没start就是alive就直接异常 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 记录最大线程数 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) //失败回退从wokers移除w线程数减1尝试结束线程池 addWorkerFailed(w); } return workerStarted; }
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ //正在运行woker线程 final Thread thread; /** Initial task to run. Possibly null. */ //传入的任务 Runnable firstTask; /** Per-thread task counter */ //完成的任务数监控用 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { //禁止线程中断 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }
再来看runworker方法
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts 把state从‐1改为0意思是可以允许中断 boolean completedAbruptly = true; try { //task不为空或者阻塞队列中拿到了任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //如果当前线程池状态等于stop就中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { //这设置为空等下次循环就会从队列里面获取 task = null; //完成任务数+1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
获取任务的方法
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c);//获取线程池运行状态 // Check if queue empty only if necessary. //shutdown或者为空那就工作线程‐1同时返回为null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //重新获取工作线程数 int wc = workerCountOf(c); // Are workers subject to culling? // timed是标志超时销毁 核心线程池也是可以销毁的 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
runWorker中的processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
ThreadPoolExecutor内部有实现4个拒绝策略:(1)、
- CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;
- AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;
- DiscardPolicy,直接抛弃任务,不做任何处理;
- DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交
ScheduledThreadPoolExecutor
- schedule:延迟多长时间之后只执行一次;
- scheduledAtFixedRate固定:延迟指定时间后执行一次,之后按照固定的时长周期执行;
- scheduledWithFixedDelay非固定:延迟指定时间后执行一次,之后按照:上一次任务执行时长+周期的时长的时间去周期执行;
private void delayedExecute(RunnableScheduledFuture<?> task) { //如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉 if (isShutdown()) reject(task); else { //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列 super.getQueue().add(task); //如果当前状态无法执行任务,则取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker; //增加Worker,确保提交的任务能够被执行 ensurePrestart(); } }
add方法里其实是调用了offer方法
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) //容量扩增50% grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { //插入堆尾 siftUp(i, e); } if (queue[0] == e) { //如果新加入的元素成为了堆顶,则原先的leader就无效了 leader = null; //由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务 available.signal(); } } finally { lock.unlock(); } return true; }
siftup方法:主要是对队列进行排序
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { //获取父节点 int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; //如果key节点的执行时间大于父节点的执行时间,不需要再排序了 if (key.compareTo(e) >= 0) break; //如果key.compareTo(e)<0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面 queue[k] = e; setIndex(e, k); //设置索引为k k = parent; } //key设置为排序后的位置中 queue[k] = key; setIndex(key, k); }
run方法:
public void run() { //是否周期性,就是判断period是否为0 boolean periodic = isPeriodic(); //检查任务是否可以被执行 if (!canRunInCurrentRunState(periodic)) cancel(false); //如果非周期性任务直接调用run运行即可 else if (!periodic) ScheduledFutureTask.super.run(); //如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); //需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及 reExecutePeriodic(outerTask); } }
private void setNextRunTime() { long p = period; //fixed‐rate模式,时间设置为上一次时间+p,这里的时间只是可以被执行的最小时间,不代表到点就要执行 if (p > 0) time += p; else //fixed‐delay模式,计算下一次任务可以被执行的时间, 差不多就是当前时间+delay值 time = triggerTime(-p); } long triggerTime(long delay) { //如果delay<Long.Max_VALUE/2,则下次执行时间为当前时间+delay,否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * 主要就是有这么一种情况: * 工作队列中维护任务顺序是基于compareTo的,在compareTo中比较两个任务的顺序会用time相减,负数则说明优先级高,那么就有可能出现一个delay为正数,减去另一个为负数的delay,结果上溢为负数,则会导致compareTo产生错误的结果. * 为了特殊处理这种情况,首先判断一下队首的delay是不是负数,如果是正数不用管了,怎么减都不会溢出。 * 否则可以拿当前delay减去队首的delay来比较看,如果不出现上溢,则整个队列都ok,排序不会乱。 * 不然就把当前delay值给调整为Long.MAX_VALUE+队首delay / private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
到此这篇关于Java并发线程池实例分析讲解的文章就介绍到这了,更多相关Java并发线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!