java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java的Future实现

Java线程池中的Future实现详解

作者:buzhbuzh

这篇文章主要介绍了Java线程池中的Future实现详解, FutureTask是一个任务,FutureTask继承了Runnable、Callable, 通过FutureTask可以获取到任务执行的状态,任务执行完成完成后,将结构通过Future接口返回,调用者可以调用Future#get()方法获取到数据,需要的朋友可以参考下

一 线程池的提交方式

在向线程池提交线程的时候,有两个方法, 一个是executor(Runnable runnable)方法, 另一个是submit(Runnable runnable)方法,

  Future<Integer> result = executor.submit(task); //有返回值
  executor.executor(task)  //没有返回值

二 Runnable、Callable、FutureTask的使用

(1) Runnable是一个任务的概念,这个任务没有返回值

(2) Callable也是一个任务,这个任务有返回值, 不能单独使用,必须配合FutureTask一起使用

(3) FutureTask是一个任务,FutureTask继承了Runnable、Callable, 通过FutureTask可以获取到任务执行的状态

(4) FutureTask任务执行完成完成后,将结构通过Future接口返回,调用者可以调用Future#get()方法获取到数据

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } 
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

三 FutureTask的实现

3.1  字段属性

(1) 任务的状态:

在这里插入图片描述

(2) 具体执行的任务和返回值、执行的线程、等待结果的线程队列

在这里插入图片描述

3.2  构造方法

构造方法的主要作用是将Runnable、Callable转换为FutureTask

在这里插入图片描述

3.3 任务的执行

任务执行

在这里插入图片描述

FutureTask#run() 方法

public void run() {
   // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回
   // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
       // 获取构造函数传入的 Callable 值
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
               // 正常调用 Callable 的 call 方法就可以获取到返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
               // 保存 call 方法抛出的异常
                setException(ex);
            }
            if (ran)
               // 保存 call 方法的执行结果
                set(result);
        }
    } finally {        
        runner = null;       
        int s = state;
       // 如果任务被中断,则执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

FutureTask中保存了Callable,在run()方法中调用Callable.call()方法就可以获取到返回值,然后通过 set(result) 保存正常程序运行结果,或通过 setException(ex) 保存程序异常信息

FutureTask#设置返回值方法

** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes

// 保存异常结果
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

// 保存正常结果
protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}

setException() 和 set ()方法非常相似,都是将异常或者结果保存在 Object 类型的 outcome 变量中,outcome 是成员变量,就要考虑线程安全,所以他们要通过 CAS方式设置 outcome 变量的值,既然是在 CAS 成功后 更改 outcome 的值,这也就是 outcome 没有被 volatile 修饰的原因所在。

FutureTask#finishCompletion() 方法

这个方法是唤醒关注执行结果的线程,通知他们去获取任务的执行结果

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                   // 唤醒等待队列中的线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}  

3.4 任务的获取

get()方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
   // 如果 state 还没到 set outcome 结果的时候,则调用 awaitDone() 方法阻塞自己
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
   // 返回结果
    return report(s);
}

awaitDone(boolean timed, long nanos)阻塞线程并且加入阻塞队列中

// get 方法支持超时限制,如果没有传入超时时间,则接受的参数是 false 和 0L
// 有等待就会有队列排队或者可响应中断,从方法签名上看有 InterruptedException,说明该方法这是可以被中断的
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
   // 计算等待截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
       // 如果当前线程被中断,如果是,则在等待对立中删除该节点,并抛出 InterruptedException
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
       // 状态大于 COMPLETING 说明已经达到某个最终状态(正常结束/异常结束/取消)
       // 把 thread 只为空,并返回结果
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
       // 如果是COMPLETING 状态(中间状态),表示任务已结束,但 outcome 赋值还没结束,这时主动让出执行权,让其他线程优先执行(只是发出这个信号,至于是否别的线程执行一定会执行可是不一定的)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
       // 等待节点为空
        else if (q == null)
           // 将当前线程构造节点
            q = new WaitNode();
       // 如果还没有入队列,则把当前节点加入waiters首节点并替换原来waiters
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
       // 如果设置超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
           // 时间到,则不再等待结果
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
           // 阻塞等待特定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
           // 挂起当前线程,知道被其他线程唤醒
            LockSupport.park(this);
    }
}

进入这个方法会经历三个循环:

3.5 任务的取消

将一个任务修改为终态的只有三种方法: set()方法 setException()方法 cancel 方法

查看 Future cancel(),该方法注释上明确说明三种 cancel 操作一定失败的情形

其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回 true 并不代表任务真的就是被取消, 这取决于发动cancel状态时,任务所处的状态

如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断

如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完

public boolean cancel(boolean mayInterruptIfRunning) {
  
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
       // 需要中断任务执行线程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
               // 中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
               // 修改为最终状态 INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
       // 唤醒等待中的线程
        finishCompletion();
    }
    return true;
}

到此这篇关于Java线程池中的Future实现详解的文章就介绍到这了,更多相关Java的Future实现内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文