java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java的Executor框架

Java多线程中的Executor框架解析

作者:得过且过的勇者y

这篇文章主要介绍了Java多线程中的Executor框架解析,Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好,需要的朋友可以参考下

前言

Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

Executor框架的组成:

一、Executor接口

线程池简化了线程的管理工作, 并且JUC提供了一种灵活的线程池实现来作为Executor框架的一部分。

在Java类库中,任务执行的主要抽象不是Thread而是Executor,Executor只定义了execute一个方法,是最顶层的接口。

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);

execute方法接受一个Runnable参数,这个方法定义为在未来的某个时间执行传入的方法,方法的运行可以在一个新的线程,在线程池或者在调用的线程中,该方法无法接收到线程的执行结果(当然Runnable接口本身也没有返回值)。

虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。

Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

**Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。**如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式就是使用Executor。

二、ExecutorService接口

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它的run方法执行任务后不能返回一个值或者抛出一个受检查的异常。许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象,它认为主入口点(call)将返回一个值,并可能抛出一个异常。

因此引入了ExecutorService,它继承自Executor,并且在其基础上增加了很多功能,可以生成用于跟踪一个或多个异步任务进度的Future的方法,以解决Executor的局限性。

Future表示一个任务的生命周期,并且提供了响应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。

在Future规范中包含的隐含意义是任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成状态。

get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已完成那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了一场,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。

定义了以下方法:

三、ThreadPoolExecutor类

ThreadPoolExecutor实现了ExecutorService(实际上是继承了AbstractExecutorService),为了在广泛的上下文中发挥作用,该类提供了许多可调参数和可扩展性挂钩:

1、状态

  1. RUNNING(-1<<29):接受新任务并且处理排队任务
  2. SHUTDOWN(0<<29):不接受新任务但是处理排队任务
  3. STOP(1<<29):不接受新任务也不处理排队任务,同时中断处理中的任务
  4. TYDING(2<<29):所有任务被终止,工作线程数为0之后进入该状态,并且会执行terminated()钩子函数
  5. TERMINATED(3<<29):terminated()钩子函数执行完毕后

状态单调地随时间增加,但不需要达到每个状态。

ThreadPoolExecutor中对于状态的记录保存在一个AtomicInteger类型的变量中,其中高三位就是用于记录线程池的状态,而低的29位用于记录线程数量。

2、Worker

ThreadPoolExecutor中定义了一个私有静态类Worker,其继承自AbstractQueuedSynchronizer类,并实现了Runnable接口。其中维护了线程实例(Thread)、任务实例(Runnable)、线程任务计数器(long)变量。

这个类适当地扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。这可以防止中断,这些中断旨在唤醒等待任务的工作线程,而不是中断正在运行的任务。我们实现了一个简单的不可重入互斥锁,而不是使用ReentrantLock,因为我们不希望工作任务在调用setCorePoolSize等池控制方法时能够重新获得锁。此外,为了在线程实际开始运行任务之前抑制中断,我们将锁状态初始化为负值,并在启动时(在runWorker中)清除它。

3、扩展

该类还定义了三个protected类型的钩子函数:

这几个方法在ThreadPoolExecutor中为空实现。

四、ForkJoinPool类

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架

Fork就是把一个大任务切分为若干个子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。

1、工作窃取算法

ForkJoinPool是运行ForkJoinTasks的ExecutorService。**ForkJoinPool与其他类型的ExecutorService的区别主要在于使用了工作窃取。工作窃取算法是指某个线程从其他队列里窃取任务来执行。**那么为什么要使用工作窃取算法呢?**假如我们需要做一个比较大的任务,可以把这个任务分割为若干个互不干扰的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。**比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而这时它们会访问同一个队列,所以为了减少窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争

工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

2、Fork/Join的设计

想要设计一个Fork/Join框架,需要完成两个步骤:

Fork/Join使用两个类来完成以上两件事情:

任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个线程。

public class CountTask extends RecursiveTask<Integer> {
	private int start;
	private int end;
	public CountTask(int start, int end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Integer compute() {
		int sum = 0;
		// 如果任务足够小就计算任务
		boolean canCompute = (end - start) <= THRESHOLD;
		if (canCompute) {
			for (int i = start; i <= end; i++) {
				sum += i;
			}
		} else {
			// 如果任务大于阈值,就分裂成两个子任务计算
			int middle = (start + end) / 2;
			CountTask leftTask = new CountTask(start, middle);
			CountTask rightTask = new CountTask(middle + 1, end);
			// 执行子任务
			leftTask.fork();
			rightTask.fork();
			// 等待子任务执行完,并得到其结果
			int leftResult = leftTask.join();
			int rightResult = rightTask.join();
			// 合并子任务
			sum = leftResult + rightResult;
		}
		return sum;
	}
	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(1, 4);
		// 执行一个任务
		Future<Integer> result = forkJoinPool.submit(task);
		try {
			System.out.println(result.get());
		} catch (InterruptedException e) {
		} catch (ExecutionException e) {
		}
	}
}

RecursiveTask需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小就必须分割成两个子任务,每个子任务在调用fork方法时又会进入compute方法。最后使用join方法等待子任务执行完成并得到其结果。

ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法来获取异常。

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException,如果任务没有完成或者没有抛出异常则返回null。

3、执行原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。

pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后再调用ForkJoinPool的signalWork反复噶唤醒或创建一个工作线程来执行任务。

五、ScheduledThreadPool类

ScheduledThreadPoolExecutor主要用来在给定的延迟后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor使用任务队列DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask的time变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask的squenceNumber变量小的先执行)。

1、ScheduledExecutorService

ScheduledThreadPool类继承自ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。

ScheduledExecutorService接口定义了几个方法:

  1. ScheduleFuture<?> schedule(Runnable command, long delay, TimeUnit unit):提交在给定延迟后启用的一次性任务
  2. ScheduleFuture schedule(Callable command, long delay, TimeUnit unit):提交在给定延迟之后启用的带返回值的一次性任务
  3. ScheduleFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):提交一个周期性任务,任务开始时进行计时(如果任务执行时间过长甚至超过period时间,会导致任务连续执行)
  4. ScheduleFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):提交一个周期性任务,任务执行完成之后才进行计时

ScheduledFuture继承自Delayed接口和Future接口,自己本身没有定义新的方法。

Delayed接口是一个混合风格的接口,用于标记应该在给定延迟后执行的对象。这个接口定义了一个getDelay方法,用于返回剩余的延迟时间。此外这个接口继承了Comparable接口,意味着此接口的实现必须定义一个compareTo方法,该方法提供与其getDelay方法一致的排序

2、比较Timer

Timer对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是

Timer只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程

在TimerTask中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机,即计划任务将不再运行。ScheduledThreadExecutor不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行

六、Executors类

Executors是Java中用于创建线程池的工厂类,它提供了一系列的静态工厂方法,用于创建不同类型的线程池。这些工厂方法隐藏了线程池的复杂性,使得线程池的创建变得非常简单。Executors工厂类提供的线程池有以下几种类型:

除此之外还提供了创建ThreadFactory实例,将Runnable实例转换为Callable实例等方法。

到此这篇关于Java多线程中的Executor框架解析的文章就介绍到这了,更多相关Java的Executor框架内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文