java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > executors线程池创建的线程不释放问题

如何解决executors线程池创建的线程不释放的问题

作者:TimiWang

这篇文章主要介绍了如何解决executors线程池创建的线程不释放的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

executors线程池创建的线程不释放问题

我们通过executors.newFixThreadPool 创建指定大小的线程池,在所有线程都结束后,线程池并未释放线程。

我们之后再新建的线程池的线程将一直累加。

解决这个问题

只需要设置如下:

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
             executor.setKeepAliveTime(10, TimeUnit.SECONDS);
             executor.allowCoreThreadTimeOut(true);

这样设置后,队列的任务全部结束后将,释放所有的线程。

线程池中的线程为什么不会释放而是循环等待任务呢

线程池

之前一直有这个疑问:我们平时使用线程都是各种new Thread(),然后直接在run()方法里面执行我们要做的各种操作,使用完后需要做什么管理吗?线程池为什么能维持住核心线程不释放,一直接收任务进行处理呢?

线程

线程无他,主要有两个方法,我们先看看start()方法介绍:

    /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * <p>
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * <p>
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * @exception  IllegalThreadStateException  if the thread was already
     *               started.
     * @see        #run()
     * @see        #stop()
     */
    public synchronized void start() {
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);
        started = false;
        try {
            nativeCreate(this, stackSize, daemon);
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

带着各种疑问,我们去看看线程池自己是怎么实现的。

线程池

线程池常用的创建方法有那么几种:

这4个方法创建的线程池实例具体就不一一介绍,无非是创建线程的多少,以及回收等问题,因为其实这4个方法最后都会调用统一的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

具体来说只是这几个值的不同决定了4个线程池的作用:

1. corePoolSize 代表核心线程池的个数,当线程池当前的个数大于核心线程池的时候,线程池会回收多出来的线程

2. maximumPoolSize 代表最大的线程池个数,当线程池需要执行的任务大于核心线程池的时候,会创建更多的线程,但是最大不能超过这个数

3. keepAliveTime 代表空余的线程存活的时间,当多余的线程完成任务的时候,需要多长时间进行回收,时间单位是unit 去控制

4. workQueue 非常重要,这个工作队列会存放所有待执行的Runnable对象

@param workQueue the queue to use for holding tasks before they areexecuted.  This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.

我们平时在使用线程池的时候,都是直接 实例.execute(Runnable),一起跟进去,看看这个方法具体做了什么

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        //结合上文的注释,我们得知,第一次,先判断当前的核心线程数,
        //如果小于初始化的值,马上创建;然后第二个if,将这个任务插入到工作线程,双重判断任务,
        //假定如果前面不能直接加入到线程池Worker集合里,则加入到workQueue队列等待执行。
        //里面的if else判断语句则是检查当前线程池的状态。如果线程池本身的状态是要关闭并清理了,
        //我们则不能提交线程进去了。这里我们就要reject他们。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

所以其实主要起作用的还是addWorker()方法,我们继续跟踪进去:

private boolean addWorker(Runnable firstTask, boolean core) {
        ···多余代码
        try {
            w = new Worker(firstTask);  1.重点
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();   2. 重点
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我们看重点部分,其实最重要的是firstTask这个Runnable,我们一直跟踪这个对象就可以了,这个对象会new Worker(),那么这个wroker()就是一个包装类,里面带着我们实际需要执行的任务,后面进行一系列的判断就会执行t.start(); 这个t 就是包装类worker类里面的Thread,所以整个逻辑又转化进去Worker内部。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }
        ...省略代码
    }

这个Worker包装类,重要的属性两个,thread 就是刚才上面那个方法执行的start()对象,这个thread又是把这个worker对象本身作为一个Runnable对象构建出来的,那么当我们调用thread.start()方法时候,实际调用的就是Worker类的run()方法。现在又要追踪进去,看这个runWorker(this),做的是什么鬼东西

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这个方法还是比较好懂的:

1. 一个大循环,判断条件是task != null || (task = getTask()) != null,task自然就是我们要执行的任务了,当task空而且getTask()取不到任务的时候,这个while()就会结束,循环体里面进行的就是task.run();

2.这里我们其实可以打个心眼,那基本八九不离十了,肯定是这个循环一直没有退出,所以才能维持着这一个线程不断运行,当有外部任务进来的时候,循环体就能getTask()并且执行。

3.下面最后放getTask()里面的代码,验证猜想

   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

真相大白了,里面进行的也是一个死循环,主要看 Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

工作队列workQueue会一直去拿任务,属于核心线程的会一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心线程会 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收

注意:

一句话可以概述了,线程池就是用一堆包装住Thread的Wroker类的集合,在里面有条件的进行着死循环,从而可以不断接受任务来进行。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文