java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > 线程池ThreadPoolExecutor应用

线程池ThreadPoolExecutor应用过程

作者:hi,你礼貌吗

这篇文章主要介绍了如何使用ThreadPoolExecutor创建线程池,包括其构造方法、常用方法、参数校验以及如何选择合适的拒绝策略,文章还讨论了为什么应该强制要求使用ThreadPoolExecutor创建线程池,并提供了在项目和Spring中创建线程池的示例

一个老生常谈的话题:线程的创建及销毁是非常消耗时间及资源的,所以线程应该交由线程池去执行。

ThreadPoolExecutor构造说明及常用方法

ThreadPoolExecutor提供了很多构造方法,这里主要说以下这个,其他构造方法都是基于此方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

下面是构造ThreadPoolExecutor的参数校验:

其他常用方法:

参数举例说明:

1、当创建一个任务并交给线程池执行执行后,会从消耗一个核心线程资源,当该任务执行完毕之后,会变成一个空闲线程,另外,如果核心线程数足以支撑任务运行,即使有核心空闲线程,依然会创建一个新的核心线程资源;

2、当加入线程池的任务超过核心线程数,会进入workQueue中等待执行;

3、当加入线程池的任务超过核心线程数,溢出的任务数如果超过等待队列长度,会直接创建大于corePoolSize且小于maximumPoolSize的线程资源用于执行任务,超过核心线程数的部分称为临时线程;

4、keepAliveTime:当线程池被扩大到corePoolSize与maximumPoolSize之间之后,当执行完所有任务后,不再有新的任务进来,超出corePoolSize的部分会作为空闲线程,会在keepAliveTime指定时间后,线程池大小缩小为corePoolSize,如果设置allowCoreThreadTimeOut(true),则空闲线程一直不会销毁;

5、BlockingQueue<Runnable> workQueue:用于保存等待执行的任务的阻塞队列接口,java提供了以下实现类:

线程池中,BlockingQueue是典型的"生产者消费者"模型:生产者调用插入元素的方法(add/offer/put等方法),消费者调用移除元素的方法(remove/poll/take等方法),其中,生产者插入和消费者移除都有阻塞和非阻塞的方法,offer/poll是非阻塞方法,put/take是阻塞方法(队列后面单独再写);

ThreadPoolExecutor提交任务采用的是不阻塞的offer方法,从队列中获取任务采用的是阻塞的take方法,正是因为ThreadPoolExecutor使用了不阻塞的offer方法,所以当队列容量已满,线程池会去创建新的临时线程,去处理队列中溢出的任务。

综上,需要根据每个队列的特点选择适合自己场景的队列。

6、RejectedExecutionHandler handler:当队列和线程池都满了(从上面可以看出,一个线程池可以容纳的的最大任务数是maximumPoolSize+队列长度,因此使用无界队列会造成队列永不会满的情况,一旦控制不好就容易出现OOM),说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。

1、AbortPolicy:直接抛出异常;

2、CallerRunsPolicy:只用调用者所在线程来运行任务;

3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务;

4、DiscardPolicy:不处理,丢弃掉;

5、也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务;

注:AbortPolicy/CallerRunsPolicy/DiscardOldestPolicy/DiscardPolicy都是ThreadPoolExecutor的内部类,如下:

所以,实例化需要用内部类的方式,如下:

AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();

自定义RejectedExecutionHandler拒绝策略示例:

public class RejectedExecutionHandlerTest implements RejectedExecutionHandler {

	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		// 啥逻辑都没有,表示忽略,比如DiscardPolicy
		try {
			executor.getQueue().put(r);
		} catch (InterruptedException e) {
			// 	TODO 记录错误日志等
			e.printStackTrace();
		}
		
	}
}

ThreadPoolExecutor的特点总结起来就是以下4点

1、当有任务提交的时候,会创建核心线程去执行任务(即使有核心线程空闲);

2、当核心线程数达到corePoolSize时,后续提交的都会进BlockingQueue中排队;

3、当BlockingQueue满了(offer失败),就会创建临时线程(临时线程空闲超过一定时间后,会被销毁),其中临时线程最大数量=maximumPoolSize - corePoolSize,空闲最大时间由keepAliveTime控制,如果设置allowCoreThreadTimeOut(true),临时线程永不销毁;

4、当线程总数达到maximumPoolSize 时,后续提交的任务都会被RejectedExecutionHandler拒绝。

为什么强制要求使用ThreadPoolExecutor创建线程池

在Executor类中,提供了以下场景的创建线程池的方法:

1、newCachedThreadPool,特点:必要时创建新线程,空闲线程会保留60s;

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

根据上面的ThreadPoolExecutor,允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM,还无法指定拒绝策略。

2、newFixedThreadPool,特点:包含固定的线程数,空闲线程会被一直保留;

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }


public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM,还无法指定拒绝策略。

3、newSingleThreadExecutor,特点:只有一个线程的池,顺序执行每一个提交的任务;

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }



public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM,还无法指定拒绝策略。

4、ScheduledThreadPoolExecutor,特点:用于构建具有延时队列的线程池,空闲线程一直被保留;

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

............省略Executors其他创建线程的方法。

从上面可以看到,使用Executors创建的线程池,虽然都是ThreadPoolExecutor,但是灵活度都不高,且创建出来的是具有一定特色的线程池,使用ThreadPoolExecutor类本身去创建线程池,除了可以更加明确线程池的运行规则,还能更多的规避资源耗尽的风险,毕竟我命由我不由天。

当然,ThreadPoolExecutor提供了相关参数的set方法,如果一定要用Executors去创建,那么记得调整一下相关参数,避免大量任务堆积,产生OOM。

在项目中创建线程池

如果需要在项目中创建线程池,那么应该将它设置成单例模式。

示例,用枚举的方式创建单例:

// 线程工厂
public class UserThreadFactory implements ThreadFactory {

	// 线程组命名标识
	private final String namePrefix;
	// 线程编号
	private final AtomicInteger nextId = new AtomicInteger(1);

	// 定义线程组名称,在 jstack 问题排查时,非常有帮助
	UserThreadFactory(String whatFeaturOfGroup) {
		namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
	}

	@Override
	public Thread newThread(Runnable task) {
		String name = namePrefix + nextId.getAndIncrement();
		Thread thread = new Thread(task, name);
		return thread;
	}

}


// 单例线程池
public enum ThreadPoolEnum {

	INSTANCE;
	
	private ThreadPoolExecutor threadPoolExecutor;

	// 枚举的特性,在JVM中只会被实例化一次
	private ThreadPoolEnum() {
		threadPoolExecutor = new ThreadPoolExecutor(2, 
				4, 10, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(), 
				new UserThreadFactory("first-threadPool"),
				new ThreadPoolExecutor.DiscardPolicy());
	}
	
	public ThreadPoolExecutor getInstance() {
        return threadPoolExecutor;
    }
	
	public static void main(String[] args) throws InterruptedException {
		ThreadPoolExecutor executor1 = ThreadPoolEnum.INSTANCE.getInstance();
		ThreadPoolExecutor executor2 = ThreadPoolEnum.INSTANCE.getInstance();
		System.out.println(executor1 == executor2);
		Runnable r = () -> {
			System.out.println("新创建的线程任务,交由线程池执行");
			System.out.println(Thread.currentThread().getName());
		};
		executor1.execute(r);
		// 销毁线程池
		executor1.shutdown();
	}
	
}

执行结果:

在Spring中创建线程池

@Configuration
public class ThreadPoolConfig {

	// 线程池的参数配置可由外部配置引入
	@Bean(destroyMethod = "shutdown")
	public ThreadPoolExecutor initThreadPoolExecutor() {
		return new ThreadPoolExecutor(2, 
				4, 10, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(), 
				new ThreadPoolExecutor.DiscardPolicy());
	}
	
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文