关于Future机制原理及解析
作者:小小少年_
future机制是
在通过线程去执行某个任务的时候,如果比较耗时,我们可以通过futureTask机制,异步返回,继续去执行其他的任务,在需要获取执行结果的时候,通过future.get()方法去获取,如果任务没有执行完毕,会通过lockSupport.park()方法去阻塞主线程,直到run()方法执行完毕之后,会通过lockSupport.unpark()方法去唤醒线程
应用
public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { MyThread myThread = new MyThread(); FutureTask<String> futureTask = new FutureTask<>(myThread); /** * 同一个futureTask对象,通过多个线程进行多次调用,但是只会执行一次 * 如果是计算的操作,需要进行多次计算操作,需要声明不同的futureTask对象 */ new Thread(futureTask, "A").start(); new Thread(futureTask, "B").start(); System.out.println(Thread.currentThread().getName() + "测试,在调用future.get()方法之前,可以执行其他逻辑 "); System.out.println(futureTask.get()); System.out.println("测试futureTask的get方法阻塞"); } } class MyThread implements Callable<String> { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " 测试callable"); TimeUnit.SECONDS.sleep(4); return "success"; } }
这里就是模拟thread比较耗时,此时就可以通过futureTask异步返回之后,在需要使用的时候,调用其get方法去获取执行结果,如果call()方法还没有执行完,那futureTask.get()方法会一直阻塞,直到线程执行完毕,获取到执行结果
原理
我们先说future.get()是如何阻塞的,也就是说在线程对应的方法还未执行完时,主线程如何去阻塞?
在源码中,有一个重要的属性
private volatile int state; /** * 在构造函数中,设置为new */ private static final int NEW = 0; /** * 线程正常执行完毕,通过CAS将state修改为completing */ private static final int COMPLETING = 1; /** * */ private static final int NORMAL = 2; /** * 执行线程的时候,如果抛出异常,通过cas修改为exceptional */ private static final int EXCEPTIONAL = 3; /** * 如果调用了cancel(boolean mayInterruptIfRunning)方法 * 入参的mayInterruptIfRunning为true,就通过cas将state设置为INTERRUPTING * 如果为false,就通过cas将state修改为cancelled */ private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; /** * 在调用cancel()方法,入参为true的情况下,如果中断成功,通过cas将state从 * INTERRUPTING修改为INTERRUPTED */ private static final int INTERRUPTED = 6;
接着来说get()方法
get()
可以看到,在get()方法中,会先判断当前state是否小于等于 COMPLETING,如果是,就去阻塞
/** * @throws CancellationException {@inheritDoc} * 在调用future.get()方法的时候,如果线程的run()没有执行完毕,主线程会等待,直到run()方法正确的执行完毕 * 在内部,是通过lockSupport.park()方法去阻塞线程的 * 在该机制中,涉及到一个state状态标识 */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
对于get方法来说,最为核心的逻辑是在awaitDone()方法中,在该方法中,会调用lockSupport.park(this),去阻塞当前线程;
所以,我们可以知道,如果主线程在调用future.get()方法的时候,假如此时run()方法还没有执行完毕,会阻塞当前线程,那阻塞之后,总要有一个地方去唤起,在run()方法正常执行完毕之后,会唤醒当前阻塞的线程,继续执行业务逻辑
run()
public void run() { /** * 1.校验当前state是否是new状态且通过cas设置当前线程成功 */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; /** * 2.校验当前线程中的callable是否有效且state为new */ if (c != null && state == NEW) { V result; boolean ran; try { /** * 2.1 执行call()方法,如果正常执行完毕,设置ran为true */ result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; /** * 2.2 假如执行报错,被捕获到,在setException方法中,也会去唤醒阻塞的线程 */ setException(ex); } /** * 3.如果ran为true,就调用set方法,在set方法中,有一步跟重要的操作,就是通过lockSupport.unpark()唤醒线程 */ if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
在这里的run()方法中,setException和set(result)方法中,都会调用一个方法:finishCompletion()
set()
/* * 在set方法中,会通过cas更新当前的state状态 * 然后在调用finishCompletion的时候,会唤醒阻塞的线程 * @param v the value */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
在线程正常执行完之后,会通过cas将state的状态从new修改为completing,然后再修改为normal
finishCompletion()
/** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. * 这个方法大致的意思是:从等待队列中获取到当前在等待的线程信息,然后通过cas将q设置为null * 最后会通过lockSupport.unPark()方法唤醒线程 */ private void finishCompletion() { // assert state > COMPLETING; /** * 1.从waiters中获取到等待的节点 */ for (WaitNode q; (q = waiters) != null;) { /** * 2.通过cas将q设置为null */ if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; /** * 3.唤醒线程 */ if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
这个方法也不需要做过多解释了,就是从waitNode中,获取到当前等待的线程,然后唤醒
总结
所以:对于future.get()方法,如果run()方法没有执行完成,该方法会阻塞线程,并且在方法正常执行完毕之后,唤醒阻塞的线程,继续去执行对应的业务代码
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。