Java中的FutureTask源码解析
作者:正经人z.
一、简介
1、FutureTask是一个可取消的异步计算。这个类是Future的实现类,有开始和取消一个计算的方法,如果一个计算已经完成可以查看结果。如果在计算没有完成的情况下调用get获取计算结果会阻塞。且一旦任务完成后,计算不能重新开始或被取消,除非计算被runAndReset调用执行。
2、FutureTask被用来去封装一个Callable或者Runnable,一个FutureTask能够被submit作为一个Executor
3、FutureTask 的线程安全由CAS来保证。
二、源码分析
1、成员属性
public class FutureTask<V> implements RunnableFuture<V> { //state表示的任务的状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; //任务 private Callable<V> callable; //存储任务完成以后的结果 private Object outcome; //执行当前任务的线程 private volatile Thread runner; //执行当前任务被阻塞的线程 private volatile WaitNode waiters; }
可能有的状态转换:
NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
注意:state用volatile修饰的,如果在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,保证了state字段的可见性。
2、构造函数
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
很好的诠释了FutureTask封装了Runnable或Callable,构造完成后将任务的状态变为NEW。同时注意,封装Runnable时用的Executors的静态方法callable
顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码如下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
所以,FutureTask封装Runnable使用了适配器模式的设计模式
3、核心方法
//运行任务的方法 public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //得到当前任务 if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //当前任务调用call方法,执行,同时,执行完后将结果返回 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //表示任务执行成功 set(result); //CAS改变任务的状态从NEW->COMPLETING->NORMAL,同时将任务返回的结果保存到outcome属性中,再移除并唤醒所有等待线程 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //将任务成功执行完后返回的结果保存到outcome中 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终的状态,表示任务结束 finishCompletion(); //移除并唤醒所有等待线程 } } //该方法用于移除并唤醒所有等待线程 private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //唤醒 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; } public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); //打断 } finally { // 设置成为最终态INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); //移除并唤醒所有等待线程 } return true; } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); //如果任务没有完成或者其他的问题,将阻塞;创建一个新节点存入阻塞栈中 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
三、示例
常用使用方式:
- 第一种方式: Future + ExecutorService
- 第二种方式: FutureTask + ExecutorService
- 第三种方式: FutureTask + Thread
第一种方式:Future + ExecutorService
public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Future future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { Long start = System.currentTimeMillis(); while (true) { Long current = System.currentTimeMillis(); if ((current - start) > 1000) { return 1; } } } }); try { Integer result = (Integer)future.get(); System.out.println(result); }catch (Exception e){ e.printStackTrace(); } } }
第二种方式:FutureTask + ExecutorService
ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask);
第三种方式:FutureTask + Thread
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task()); Thread thread = new Thread(futureTask); thread.setName("Task thread"); thread.start();
四、总结
1、FutureTask用来封装Runnable或者Callable接口,可以当成一个任务。
2、在Java并发程序中FutureTask表示一个可以取消的异步运算。它有启动和取消运算、查询运算是否完成和取回运算结果等方法。只有当运算完成的时候结果才能取回,如果运算尚未完成get方法将会阻塞。一个FutureTask对象可以对调用了Callable和Runnable的对象进行包装,由于FutureTask也是调用了Runnable接口所以它可以提交给Executor来执行。
3、FutureTask可用于异步获取执行结果或取消执行任务的场景,通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
4、FutureTask间接继承了Runnable和Callable
5、FutureTask的线程安全由CAS操作来保证
6、FutureTask结果返回机制 :只有任务成功执行完成后,通过get方法能够得到任务返回的结果,其他情况都会导致阻塞。
到此这篇关于Java中的FutureTask源码解析的文章就介绍到这了,更多相关FutureTask源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!