java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java ThreadPoolExecut

一文弄懂Java中ThreadPoolExecutor

作者:赶路人儿

ThreadPoolExecutor是Java中的一个线程池实现,它可以管理和控制多个 Worker Threads,本文就详细的介绍一下Java中ThreadPoolExecutor,具有一定的参考价值,感兴趣的可以了解一下

一、ThreadPoolExecutor类讲解

1、线程池状态:

五种状态:

线程池 的状态

说明

RUNNING

允许提交并处理任务

SHUTDOWN

不允许提交新的任务,但是会处理完已提交的任务

STOP

不允许提交新的任务,也不会处理阻塞队列中未执行的任务,

并设置正在执行的线程的中断标志位

TIDYING

所有任务执行完毕,池中工作的线程数为0,等待执行terminated()勾子方法

TERMINATED

terminated()勾子方法执行完毕

注:SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED

2、ThreadPoolExecutor构造函数:

ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。

接下来我们分别讲解这些参数的含义。

2.1)线程池工作原理:

当调用线程池execute() 方法添加一个任务时,线程池会做如下判断:

2.2)KeepAliveTime:

当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

注:如果线程池设置了allowCoreThreadTimeout参数为true(默认false),那么当空闲线程超过keepaliveTime后直接停掉。(不会判断线程数是否大于corePoolSize)即:最终线程数会变为0。

2.3)workQueue 任务队列:

ThreadPoolExecutor线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue 和 ArrayBlockingQueue。

1)有界队列:

2)无界队列:

注意:keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。

2.4)threadFactory:

如果不指定线程工厂时,ThreadPoolExecutor 会使用ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 “pool-XXX-thread-” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。

2.5)handler 拒绝策略:

策略

BB

ThreadPoolExecutor.AbortPolicy()

抛出RejectedExecutionException异常。默认策略

ThreadPoolExecutor.CallerRunsPolicy()

由向线程池提交任务的线程来执行该任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃最旧的任务(最先提交而没有得到执行的任务)

最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。

3、常用方法:

除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下方法进行设置。

此外,还有一些方法:

4、Executors类:

Executors类的底层实现便是ThreadPoolExecutor! Executors 工厂方法有:

它们均为大多数使用场景预定义了设置。不过在阿里java文档中说明,尽量不要用该类创建线程池。

二、线程池相关接口介绍:

1、ExecutorService接口:

该接口是真正的线程池接口。上面的ThreadPoolExecutor以及下面的ScheduledThreadPoolExecutor都是该接口的实现类。改接口常用方法:

1.1)submit方法示例:

我们知道,线程池接口中有以下三个主要方法,接下来我们看一下具体示例:

1)Callable:

public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS, 
			new ArrayBlockingQueue<Runnable>(50),  
			new ThreadFactory(){ public Thread newThread(Runnable r) {
                return new Thread(r, "schema_task_pool_" + r.hashCode());
            }}, new ThreadPoolExecutor.DiscardOldestPolicy());
public static void callableTest() {
	int a = 1;
	//callable
	Future<Boolean> future = threadPool.submit(new Callable<Boolean>(){
		@Override
		public Boolean call() throws Exception {
			int b = a + 100;
			System.out.println(b);
			return true;
		}
	});
	try {
		System.out.println("feature.get");
		Boolean boolean1 = future.get();
		System.out.println(boolean1);
	} catch (InterruptedException e) {
		System.out.println("InterruptedException...");
		e.printStackTrace();
	} catch (ExecutionException e) {
		System.out.println("execute exception...");
		e.printStackTrace();
	} 
}

2)Runnable:

public static void runnableTest() {
	int a = 1;
	//runnable
	Future<?> future1 = threadPool.submit(new Runnable(){
		@Override
		public void run() {
			int b = a + 100;
			System.out.println(b);
		}
	});
	try {
		System.out.println("feature.get");
		Object x = future1.get(900,TimeUnit.MILLISECONDS);
		System.out.println(x);//null
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		System.out.println("execute exception...");
		e.printStackTrace();
	} catch (TimeoutException e) {
		e.printStackTrace();
	}
}

3)Runnable+result:

class RunnableTask implements Runnable {
	Person p;
	RunnableTask(Person p) {
		this.p = p;
	}
	@Override
	public void run() {
		p.setId(1);
		p.setName("Runnable Task...");
	}
}
class Person {
	private Integer id;
	private String name;
	public Person(Integer id, String name) {
		super();
		this.id = id;
		this.name = name;
	}
	public Integer getId() {
		return id;
	}
	public void setId(Integer id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	@Override
	public String toString() {
		return "Person [id=" + id + ", name=" + name + "]";
	}
}
public static void runnableTest2() {
	//runnable + result
	Person p = new Person(0,"person");
	Future<Person> future2 = threadPool.submit(new RunnableTask(p),p);
	try {
		System.out.println("feature.get");
		Person person = future2.get();
		System.out.println(person);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	}
}

1.2)线程池执行时,Callable的call方法(Runnable的run方法)抛出异常后,会出现什么?

在上面的例子中我们可以看到,线程池无论是执行Callable还是Runnable,调用返回的Future对象get()方法时需要处理两种异常(如果是调用get(timeout)方法,需要处理三种异常),如下:

//在线程池上运行
Future<Object> future = threadPool.submit(callable);
try {
	System.out.println("feature.get");
	Object x = future.get(900,TimeUnit.MILLISECONDS);
	System.out.println(x);
} catch (InterruptedException e) {
	e.printStackTrace();
} catch (ExecutionException e) {
	System.out.println("execute exception...");
	e.printStackTrace();
} catch (TimeoutException e) {
	e.printStackTrace();
}

1.3)submit()和execute()方法区别:

ExecutorService、ScheduledExecutorService接口的submit()和execute()方法都是把任务提交到线程池中,但二者的区别是

1)submit方法内部实现:

其实submit方法也没有什么神秘的,就是将我们的任务封装成了RunnableFuture接口(继承了Runnable、Future接口),再调用execute方法,我们看源码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);  //转成 RunnableFuture,传的result是null
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

2)newTaskFor方法内部实现:

newTaskFor方法是new了一个FutureTask返回,所以三个方法其实都是把task转成FutureTask,如果task是Callable,就直接赋值,如果是Runnable 就转为Callable再赋值。

当submit参数是Callable 时:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
    }

当submit参数是Runnable时:

   // 按顺序看,层层调用
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);  //转 runnable 为 callable 
        this.state = NEW; 
    }
   // 以下为Executors中的方法
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {  //适配器
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {   
            task.run();
            return result;
        }
    }

看了源码就揭开了神秘面纱了,就是因为Future需要返回结果,所以内部task必须是Callable,如果task是Runnable 就偷天换日,在Runnable 外面包个Callable马甲,返回的结果在构造时就写好。

参考:搞懂Runnable、Callable、Future、FutureTask 及应用_赶路人儿的博客-CSDN博客

1.4)ScheduledExecutorService接口:

继承ExecutorService,并且提供了按时间安排执行任务的功能,它提供的方法主要有:

注:该接口的实现类是ScheduledThreadPoolExecutor。

2、Callable接口:

jdk1.5以后创建线程可以通过一下方式:

1)Callable和Runnale接口区别:

2)执行Callable的线程的方法可以通过以下两种方式:

注:Callable无法直接使用Thread来执行;

我们都知道,Callable带有返回值的,如果我们不需要返回值,却又想用Callable该如何做?

jdk中有个Void类型(大写V),但必须也要return null。

threadpool.submit(new Callable<Void>() {
    @Override
    public Void call() {
        //...
        return null;
    }
});

3)通过Executors工具类可以把Runnable接口转换成Callable接口:

Executors中的callable方法可以将Runnable转成Callable,如下:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
}

RunnableAdapter类在上面已经看过源码,原理就是将返回值result作为成员变量,通过参数传递进去,进而实现了Runnable可以返回值。

示例:

public static void test5() {
    	Person p = new Person(0,"person");
    	RunnableTask runnableTask = new RunnableTask(p);//创建runnable
    	Callable<Person> callable = Executors.callable(runnableTask,p);//转换
    	Future<Person> future1 = threadPool.submit(callable);//在线程池上执行Callable
    	try {
			Person person = future1.get();
			System.out.println(person);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
    	Runnable runnable = new Runnable() {//创建Runnable
			@Override
			public void run() {
			}
    	};
    	Callable<Object> callable2 = Executors.callable(runnable);//转换
    	Future<Object> future2 = threadPool.submit(callable2);//在线程池上执行Callable
    	try {
    		Object o = future2.get();
			System.out.println(o);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
    }

3、Future接口:

3.1)Future是用来获取异步计算结果的接口,常用方法:

通过方法分析我们也知道实际上Future提供了3种功能:

但是Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。

3.2)FutureTask类:

1)FutureTask类的实现:

public class FutureTask<V> implements RunnableFuture<V> {
//...
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask实现了Runnable、Future两个接口。由于FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行。并且还可以直接通过get()函数获取执行结果,该函数会阻塞,直到结果返回。因此FutureTask既是Future、Runnable,又是包装了Callable( 如果是Runnable最终也会被转换为Callable ), 它是这两者的合体。

2)FutureTask的构造函数:

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

3.3)示例:(FutureTask两种构造函数、以及在Thread和线程池上运行)

1)FutureTask包装过的Callable在Thread、线程池上执行:

public static void test3() {
		int a = 1,b = 2;
		Callable<Integer> callable = new Callable<Integer>() {
			@Override
			public Integer call() throws Exception {
				return a + b;
			}
		};
		//通过futureTask来执行Callable
		FutureTask<Integer> futureTask = new FutureTask<>(callable);
		//1.使用Thread执行线程
		new Thread(futureTask).start();
		try {
			Integer integer = futureTask.get();
			System.out.println(integer);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		//2.使用线程池执行线程
		Executors.newFixedThreadPool(1).submit(futureTask);
		threadPool.shutdown();
		try {
			Integer integer = futureTask.get();
			System.out.println(integer);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		} 
	}

2)FutureTask包装过的Runnable在Thread、线程池上执行:

public static void test4() {
		Person p = new Person(0,"person");
		RunnableTask runnableTask = new RunnableTask(p);
		//创建futureTask来执行Runnable
		FutureTask<Person> futureTask = new FutureTask<>(runnableTask,p);
		//1.使用Thread执行线程
		new Thread(futureTask).start();
		try {
			Person x = futureTask.get();
			System.out.println(x);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		} 
		//2.使用线程池执行线程
		threadPool.submit(futureTask);
		threadPool.shutdown();
		try {
			Person y = futureTask.get();
			System.out.println(y);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
	}

Person、RunnableTask类同上面的示例中。

到此这篇关于一文弄懂Java中ThreadPoolExecutor的文章就介绍到这了,更多相关Java ThreadPoolExecut内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文