java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java异步编程接口和类

Java开发异步编程中常用的接口和类示例详解

作者:羡寒.

在现代Java中异步编程通常通过不同的机制和接口来实现,这篇文章主要介绍了Java开发异步编程中常用的接口和类的相关资料,文中通过代码介绍的非常详细,+需要的朋友可以参考下

Callable 与 Runnable接口

  1. Callable接口类似于Runnable接口,都是用于实现一个线程任务。
  2. Callable实现的线程可以返回执行结果,但是Runnable不可以返回执行结果。
  3. Callable的run方法会抛出编译时异常,而Runnable没有抛出异常。
  4. 可以使用工厂类Executors把Runnable包装成Callable。

Runnable使用方式一:使用Thread运行

public class ThreadCreationDemo {

    // 第一种方式需要一个实现了Runnable接口的类
    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            // 打印当前执行此代码的线程的名称
            System.out.println("方法一 (MyRunnable): 线程 " + Thread.currentThread().getName() + " 正在运行。");
        }
    }

    public static void main(String[] args) {


        // 第一种方式: 使用一个独立的类去实现Runnable接口
        // 这种方法结构清晰,适用于逻辑比较复杂的场景。
        Thread t1 = new Thread(new MyRunnable());
        t1.start();

        // 第二种方式: 使用Lambda表达式实现一个Runnable
        // 这是Java 8+中最推荐的简洁写法。
        Thread t2 = new Thread(() -> {
            System.out.println("方法二 (Lambda):   线程 " + Thread.currentThread().getName() + " 正在运行。");
        });
        t2.start();

        // 第三种方式: 使用匿名内部类实现一个Runnable
        // 这是在Java 8出现之前常用的方法。
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("方法三 (匿名内部类): 线程 " + Thread.currentThread().getName() + " 正在运行。");
            }
        });
        t3.start();
    }
}

Runnable使用方式二:使用线程池执行

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceDemo {

    // 定义一个实现了Runnable接口的类,用于作为要执行的任务
    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("任务由线程: " + Thread.currentThread().getName() + " 执行。");
            try {
                // 模拟任务执行耗时
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {

        // 第二种方式: 使用“线程池”去运行Runnable
        // 使用Executors工厂类创建一个单线程的线程池
        // 这个线程池保证任务是按提交顺序依次执行的
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        System.out.println("使用 executorService.execute() 提交任务。");
        // execute(Runnable): 提交一个任务,没有返回值。
        executorService.execute(new MyRunnable());

        System.out.println("使用 executorService.submit() 提交任务。");
        // submit(Runnable): 同样提交一个任务,但会返回一个Future对象。
        // 这个Future对象可以用来判断任务是否执行完毕.
        Future<?> future = executorService.submit(new MyRunnable());

        // --- 把Runnable通过工具类转成Callable ---
        // Callable与Runnable类似,但它可以有返回值并能抛出异常。
        // Executors.callable()方法可以将一个Runnable转换为Callable,
        // 其执行结果为null。
        Callable<Object> callable = Executors.callable(new MyRunnable());
        System.out.println("提交转换后的Callable任务。");
        Future<Object> callableFuture = executorService.submit(callable);


        // --- 关闭线程池 ---
        executorService.shutdown();

    }
}

Callable使用方式一:包装到FutureTask里面,然后通过Thread运行

因为FutureTask实现了Runnable接口,所以可以作为参数传入Thread中

一般写法

代码示例:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        // 1. 创建一个有返回值的任务 (Callable)
        Callable<String> myTask = () -> {
            System.out.println("子线程: 正在执行任务...");
            Thread.sleep(2000); // 模拟耗时2秒
            return "任务执行完毕,这是结果";
        };

        // 2. 将任务包装成 FutureTask (如图所示)
        FutureTask<String> futureTask = new FutureTask<>(myTask);

        // 3. 把 FutureTask 交给一个新线程去执行 (如图所示)
        // 因为 FutureTask 实现了 Runnable 接口,所以 Thread 可以直接运行它
        Thread thread = new Thread(futureTask);
        thread.start();

        System.out.println("主线程: 已启动子线程,我先忙别的...");

        // 4. 在主线程中获取任务结果
        // get() 方法会阻塞当前线程,直到子线程的任务完成
        System.out.println("主线程: 现在需要结果了,开始等待...");
        String result = futureTask.get(); // 等待并获取结果

        System.out.println("主线程: 终于拿到了结果 - \"" + result + "\"");
    }
}

运行结果:

主线程: 已启动子线程,我先忙别的...
子线程: 正在执行任务...
主线程: 现在需要结果了,开始等待...
(程序在这里会暂停2秒)
主线程: 终于拿到了结果 - "任务执行完毕,这是结果"

也可以用匿名内部类和Lambda表达式来定义Callable任务

匿名内部类写法

匿名内部类是在Java 8之前的标准写法,语法上会显得比较冗长。

// 使用匿名内部类来定义 Callable 任务
FutureTask<String> futureTask_anonymous = new FutureTask<>(new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println("通过匿名内部类执行...");
        Thread.sleep(1000);
        return "匿名内部类的结果";
    }
});

// 然后同样交给 Thread 或线程池执行
// Thread thread = new Thread(futureTask_anonymous);
// thread.start();

Lambda表达式写法

Lambda表达式是Java 8及以后推荐的函数式编程写法,非常简洁。

由于Callable接口是一个函数式接口(只有一个抽象方法call()),所以可以被Lambda表达式替代。

// 使用Lambda表达式来定义 Callable 任务
FutureTask<String> futureTask_lambda = new FutureTask<>(() -> {
    System.out.println("通过Lambda表达式执行...");
    Thread.sleep(1000);
    return "Lambda的结果";
});

// 然后同样交给 Thread 或线程池执行
// Thread thread = new Thread(futureTask_lambda);
// thread.start();

Callable使用方式二:使用线程池执行

execute方法只能接收Runnable对象,所以Callable对象不能使用线程池的execute方法执行,只能使用submit方法执行。

执行会返回Future类的对象,对这个对象调用get()方法即可以拿到返回值。

示例代码:

import java.util.concurrent.*;

public class ExecutorServiceDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建一个线程池 (图中是单线程的,更常用的是固定大小的线程池)
        ExecutorService executorService = Executors.newFixedThreadPool(2); // 例如,一个有两个线程的池

        System.out.println("主线程: 准备将任务提交给线程池。");

        // 2. 将 Callable 任务提交给线程池 (直接使用Lambda表达式定义任务)
        // submit() 方法会立即返回一个 Future 对象
        Future<String> future = executorService.submit(() -> {
            System.out.println("子线程 (" + Thread.currentThread().getName() + "): 开始执行耗时任务...");
            Thread.sleep(2000); // 模拟耗时2秒
            return "任务完成,这是来自线程池的结果";
        });

        System.out.println("主线程: 任务已提交,我可以继续做其他事情...");
        // 在这里,主线程可以执行其他不受阻塞的代码

        // 3. 在需要结果时,调用 future.get() 阻塞等待
        System.out.println("主线程: 现在需要结果了,开始等待...");
        String result = future.get(); // 阻塞,直到子线程任务完成并返回结果

        System.out.println("主线程: 成功获取到结果 -> \"" + result + "\"");

        // 4. 关闭线程池 (这是必须的步骤,否则JVM不会退出)
        executorService.shutdown();
    }
}

Future接口

  1. 一个Future表示一个异步任务的结果;
  2. 它提供了一些方法用来检查异步是不是已经完成,没有完成就等待,待其完成后取回异步执行的结果;
  3. 异步执行的结果,只能通过get()方法获取,get()方法是阻塞的,如果异步执行没有执行结束 ,则阻塞直至拿到结果;
  4. 它提供了一个cancel()方法用于取消异步执行,执行完毕后不可取消;
  5. 如果不想要返回的结果,也可以把底层的任务声明为返回null。

Future接口有5个方法:

import java.util.concurrent.*;

public class FutureMethodsDemo {

    public static void main(String[] args) {

        // 创建一个固定大小为2的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);

        System.out.println("--- 演示 get(), isDone() ---");
        // 提交一个 Callable 任务,该任务会模拟2秒的计算,然后返回一个字符串结果
        Future<String> future1 = executor.submit(() -> {
            System.out.println("任务1: 开始执行,预计耗时2秒...");
            Thread.sleep(2000);
            return "任务1执行完毕!";
        });

        // 1. isDone()
        // 在任务刚提交时,检查它是否完成
        System.out.println("提交任务1后,立即检查 isDone(): " + future1.isDone());

        try {
            // 2. get()
            // 这是一个阻塞方法,主线程会在这里等待,直到任务1执行完成并返回结果
            System.out.println("调用 future1.get(),主线程等待中...");
            String result1 = future1.get();
            System.out.println("future1.get() 获得结果: \"" + result1 + "\"");

            // 任务完成后,再次检查isDone()
            System.out.println("任务1完成后,再次检查 isDone(): " + future1.isDone());

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("\n--- 演示 get(timeout, unit) ---");
        // 提交一个需要3秒才能完成的任务
        Future<String> future2 = executor.submit(() -> {
            System.out.println("任务2: 开始执行,预计耗时3秒...");
            Thread.sleep(3000);
            return "任务2执行完毕!";
        });

        try {
            // 3. get(long timeout, TimeUnit unit)
            // 我们只等待1秒,由于任务需要3秒,所以这必定会超时
            System.out.println("调用 future2.get(1, TimeUnit.SECONDS),主线程最多等待1秒...");
            String result2 = future2.get(1, TimeUnit.SECONDS);
            System.out.println("在1秒内获取到结果: " + result2);
        } catch (TimeoutException e) {
            // 这里会捕获到超时异常
            System.out.println("等待超时! " + e.getClass().getName());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }


        System.out.println("\n--- 演示 cancel(), isCancelled() ---");
        // 提交一个长时间运行的任务
        Future<String> future3 = executor.submit(() -> {
            System.out.println("任务3: 开始执行,这是一个可能被中断的长任务...");
            try {
                Thread.sleep(5000); // 模拟一个非常耗时的操作
            } catch (InterruptedException e) {
                // 如果任务被中断,会进入这里
                System.out.println("任务3: 执行被中断!");
                return "已被中断";
            }
            return "任务3正常完成";
        });
        
        // 为了确保 cancel() 在任务开始后执行,我们稍微等待一下
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 4. cancel(boolean mayInterruptIfRunning)
        // 尝试取消任务的执行。参数true表示如果任务已经在运行,就中断执行它的线程。
        System.out.println("尝试取消任务3...");
        boolean cancelResult = future3.cancel(true);
        System.out.println("cancel(true) 方法返回: " + cancelResult);

        // 5. isCancelled()
        // 检查任务是否已经被成功取消
        System.out.println("调用cancel后,检查 isCancelled(): " + future3.isCancelled());

        // 再次检查isDone()。一个被取消的任务也被认为是“完成”的。
        System.out.println("调用cancel后,检查 isDone(): " + future3.isDone());
        
        try {
            // 试图获取一个已取消任务的结果,会抛出 CancellationException
            System.out.println("尝试 get() 已取消的任务3...");
            String result3 = future3.get();
            System.out.println("获取到已取消任务的结果: " + result3);
        } catch (CancellationException e) {
            System.out.println("获取结果失败,因为任务已被取消! " + e.getClass().getName());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        // 关闭线程池,这是一个好习惯
        executor.shutdownNow();
    }
}

FutureTask类

基本特点

  1. FutureTask类实现了Runnable接口、Future接口(RunnableFuture接口)。
  2. FutureTask类除了实现接口的5+1个方法,还有两个构造方法。
  3. FutureTask代表了一个可以取消的异步执行。
  4. FutureTask可以用来包装Runnable或者Callable。
  5. FutureTask可以提交到Executor中去执行。
import java.util.concurrent.*;

public class FutureTaskFeaturesDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // =================================================================
        // 特点 5: FutureTask 可以用来包装 Runnable 或者 Callable
        // =================================================================
        // 创建一个 Callable 任务,它会模拟计算并返回一个结果
        Callable<String> callableTask = () -> {
            System.out.println("子线程 (来自Callable): 正在进行复杂的计算...");
            Thread.sleep(2000); // 模拟耗时操作
            return "计算完成,结果是ABC";
        };

        // 创建一个 Runnable 任务,它没有返回值
        Runnable runnableTask = () -> {
            System.out.println("子线程 (来自Runnable): 正在执行一个任务...");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // 使用 Callable 构造 FutureTask
        FutureTask<String> futureTaskFromCallable = new FutureTask<>(callableTask);

        // 使用 Runnable 构造 FutureTask,需要提供一个默认的返回结果
        String defaultResult = "Runnable执行完毕";
        FutureTask<String> futureTaskFromRunnable = new FutureTask<>(runnableTask, defaultResult);


        // =================================================================
        // 特点 2 & 6: FutureTask 是一个实现了 Runnable 和 Future 的类,可以提交到 Executor 执行
        // =================================================================
        // 因为 FutureTask 实现了 Runnable,所以它可以被线程池执行
        ExecutorService executor = Executors.newFixedThreadPool(2);

        System.out.println("主线程: 将 futureTaskFromCallable 提交到线程池执行。");
        executor.submit(futureTaskFromCallable); // 特点6的演示

        // =================================================================
        // 特点 1 & 3: 代表可取消的异步计算,结果只能在执行完成后才能获取 (get() 会阻塞)
        // =================================================================
        System.out.println("主线程: 尝试获取 futureTaskFromCallable 的结果...");

        // 在任务完成前,isDone() 返回 false (特点2的演示)
        System.out.println("主线程: 任务完成了吗? " + futureTaskFromCallable.isDone());

        // 调用 get() 方法,主线程会在这里阻塞,直到子线程中的任务执行完毕 (特点3的演示)
        String result = futureTaskFromCallable.get(); // 阻塞点

        System.out.println("主线程: 终于等到了结果 - " + result); // 特点1的演示

        // 任务完成后,isDone() 返回 true
        System.out.println("主线程: 任务现在完成了吗? " + futureTaskFromCallable.isDone());
        System.out.println("------------------------------------------");

        // =================================================================
        // 特点 4: 一旦执行完成,则不能重新开始执行,也不能取消
        // =================================================================
        System.out.println("主线程: 演示任务完成后的状态。");

        // 尝试取消一个已经完成的任务
        boolean cancelResult = futureTaskFromCallable.cancel(true);
        System.out.println("主线程: 尝试取消已完成的任务,结果: " + cancelResult);
        System.out.println("主线程: 已完成的任务被取消了吗? " + futureTaskFromCallable.isCancelled());

        // 再次提交已经完成的任务到线程池
        // 注意:这不会让任务的 run() 方法被再次执行。
        // FutureTask 内部有状态控制,一旦任务完成(正常、异常或取消),它的状态就不会再改变。
        System.out.println("主线程: 再次提交同一个已完成的FutureTask...");
        executor.submit(futureTaskFromCallable); // 提交是有效的,但任务的run方法不会再执行
        // 你可以再次 get(), 它会立刻返回之前已经计算好的结果,而不会重新计算。
        System.out.println("主线程: 再次get(),立即返回结果: " + futureTaskFromCallable.get());
        System.out.println("------------------------------------------");


        // 演示取消一个尚未完成的任务 (再次体现特点1)
        FutureTask<String> longRunningTask = new FutureTask<>(() -> {
            System.out.println("子线程 (长任务): 我要睡10秒,除非被中断...");
            Thread.sleep(10000);
            return "我睡醒了";
        });

        executor.submit(longRunningTask);
        Thread.sleep(100); // 确保任务已经开始运行

        System.out.println("主线程: 任务太慢了,决定取消它。");
        longRunningTask.cancel(true); // 尝试取消

        System.out.println("主线程: 长任务被取消了吗? " + longRunningTask.isCancelled());
        System.out.println("主线程: 长任务算'完成'了吗? " + longRunningTask.isDone());
        
        try {
            longRunningTask.get(); // 对已取消的任务调用get会抛出CancellationException
        } catch (CancellationException e) {
            System.out.println("主线程: 果然,获取结果时抛出了 " + e.getClass().getSimpleName());
        }

        // 关闭线程池
        executor.shutdown();
    }
}

“不可重复执行”特性(含源码)

还有一点需要注意的就是,一旦执行完成,就不能重新开始执行,也不能取消。

FutureTask之所以再次执行没有效果,是因为它内部设计了一个“一次性”的状态机(State Machine)。一旦任务进入“完成”状态(无论是正常结束、异常终止还是被取消),它的状态就无法再回到“未开始”状态。

示例如下:

错误的做法:

Callable<String> taskLogic = () -> "Hello";
FutureTask<String> myTask = new FutureTask<>(taskLogic);

executor.submit(myTask); // 第一次执行,有效
executor.submit(myTask); // 第二次提交同一个对象,无效

正确的做法:

Callable<String> taskLogic = () -> "Hello"; // 业务逻辑可以复用

// 第一次执行
FutureTask<String> task1 = new FutureTask<>(taskLogic);
executor.submit(task1);
System.out.println(task1.get());

// 第二次执行,必须创建新对象
FutureTask<String> task2 = new FutureTask<>(taskLogic);
executor.submit(task2);
System.out.println(task2.get());

为什么会有这种现象?下面对原理进行分析:

FutureTask被设计的首要目的不仅仅是“去执行一个任务”,更是“持有(或代表)一个异步计算的结果”。

可以将其想象成一张一次性的彩票

FutureTask也是如此。它的run()方法被设计为最多只执行一次。一旦执行完毕,它就从一个“任务执行器”转变为一个“结果容器”。任何后续对get()的调用都会立刻返回已经缓存的结果,而任何再次执行它的尝试都会被直接忽略。

FutureTask内部通过一个state字段来管理其生命周期。这个状态流转是单向的,不可逆转。

其主要状态有:

关键点:一旦状态从 NEW 变为任何其他状态(如NORMAL, EXCEPTIONAL, CANCELLED),就再也无法回到 NEW 状态。

FutureTaskrun()方法完美地体现了这种状态检查机制(以下为简化后的逻辑):

public void run() {
    // 1. 关键检查:如果当前状态不是 NEW,或者设置执行线程失败(说明其他线程抢先执行了),
    //    则直接返回,不执行任何操作。
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;

    try {
        Callable<V> c = callable; // 获取包装的任务
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 2. 真正执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 3. 如果有异常,设置异常结果,并将状态变为 EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                // 4. 如果正常完成,设置正常结果,并将状态变为 NORMAL
                set(result);
        }
    } finally {
        // 清理工作
        runner = null;
        // ...
    }
}

从源码可以看出,run()方法的第一行就是一个防御性检查。如果你拿着一个已经执行过(state不再是NEW)的FutureTask实例去提交给线程池,线程池调用它的run()方法时,这个检查会直接失败,方法立即return,任务的逻辑自然就不会被再次执行了。

set()方法是状态变更的开始。

// FutureTask.java

protected void set(V v) {
    // 使用 CAS(Compare-And-Swap)原子地将状态从 NEW 变为 COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // outcome 字段用于存储最终结果
        this.outcome = v;
        // 将最终状态设置为 NORMAL (正常完成)
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        
        // *** 关键调用!进入收尾阶段 ***
        finishCompletion();
    }
}

finishCompletion() —— 唤醒所有等待者,这个方法负责唤醒所有因调用get()而被阻塞的线程。

// FutureTask.java

private void finishCompletion() {
    // 'waiters' 是一个单向链表,存储了所有正在等待结果的线程 (WaitNode)。
    for (WaitNode q; (q = waiters) != null;) {
        // 使用 CAS 将 'waiters' 链表头置为 null,确保这个唤醒过程只被执行一次。
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 遍历整个等待者链表
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    // *** 唤醒! ***
                    // 使用 LockSupport.unpark() 唤醒那个正在等待的线程。
                    // 那个线程会从 get() -> awaitDone() 的阻塞中醒来。
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // help GC
                q = next;
            }
            break;
        }
    }

    // 做一些完成后的清理工作
    done();
    callable = null; // a micro-optimization
}

整个流程串起来就是:

  1. A线程调用futureTask.get()get()发现任务未完成(stateNEW),于是将A线程包装成一个WaitNode节点,加入到waiters等待链表中,然后LockSupport.park()让自身(A线程)挂起等待。
  2. B线程(线程池中的工作线程)开始执行futureTask.run()
  3. run()方法检查状态为NEW,通过检查,开始执行业务代码。
  4. 业务代码执行完毕,run()调用set(result)
  5. set(result)stateNEW最终改为NORMAL,把结果存入outcome然后调用finishCompletion()
  6. finishCompletion()遍历waiters链表,找到代表A线程的节点,并调用LockSupport.unpark(A线程)
  7. A线程被唤醒,从get()方法中返回,并读取outcome字段中已经准备好的结果。

因为state的状态被永久地改变为了NORMAL,并且finishCompletion保证了所有等待者都被唤醒,这个FutureTask的使命就此终结。任何后续对run()的调用都会在第一步的if (state != NEW)检查中失败,从而实现了“一次性”的语义。

从原始FutureTask对象中取值(含源码)

将一个已经创建好的FutureTask对象提交给ExecutorService时,submit方法会返回一个新的、包装了你原始任务的Future对象。而计算的真正结果,仍然存储在原始的那个FutureTask对象里。调用这个新的包装对象的get()方法,往往得到的是null,而不是想要的结果。

代码示例:

import java.util.concurrent.*;

public class FutureTaskGetDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建一个 Callable,这是我们真正的业务逻辑
        Callable<String> myCallable = () -> {
            System.out.println("子线程: 正在执行核心业务逻辑...");
            Thread.sleep(1000);
            return "这是真正的计算结果";
        };

        // 2. 创建我们自己的、原始的 FutureTask 对象
        // 我们需要用这个对象来获取最终结果
        FutureTask<String> originalFutureTask = new FutureTask<>(myCallable);

        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 3. 将我们原始的 FutureTask 提交给线程池
        // submit(Runnable) 会返回一个新的 Future 对象,我们称之为 wrapperFuture
        Future<?> wrapperFuture = executor.submit(originalFutureTask);

        // 4. 分别从两个 Future 对象中获取结果

        // 错误的做法:尝试从 submit 方法返回的 Future 中获取结果
        // 这个 get() 等待的是 wrapperFuture 的完成,其结果是 null
        Object resultFromWrapper = wrapperFuture.get();
        System.out.println("从 submit() 返回的 wrapperFuture.get() 拿到的结果是: " + resultFromWrapper);
        System.out.println("wrapperFuture is done: " + wrapperFuture.isDone());

        System.out.println("-------------------------------------------------");

        // 正确的做法:从我们自己创建的原始 FutureTask 对象中获取结果
        // 这个 get() 获取的是存储在 originalFutureTask 内部的真实结果
        String resultFromOriginal = originalFutureTask.get();
        System.out.println("从原始的 originalFutureTask.get() 拿到的结果是: " + resultFromOriginal);
        System.out.println("originalFutureTask is done: " + originalFutureTask.isDone());

        executor.shutdown();
    }
}

运行结果:

子线程: 正在执行核心业务逻辑...
从 submit() 返回的 wrapperFuture.get() 拿到的结果是: null
wrapperFuture is done: true
-------------------------------------------------
从原始的 originalFutureTask.get() 拿到的结果是: 这是真正的计算结果
originalFutureTask is done: true

从源码的角度看这种原因:

第1步: 进入ExecutorService.submit()

FutureTask本身实现了Runnable接口,所以这里实际调用的是submit(Runnable task)。我们看AbstractExecutorService中的实现:

// AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 关键!它调用了 newTaskFor 来创建一个新的 RunnableFuture 对象。
    RunnableFuture<Void> ftask = newTaskFor(task, null); 
    // 然后执行这个新创建的任务
    execute(ftask);
    // 最后返回这个新创建的任务
    return ftask;
}

初始的FutureTask被作为对象传入,newTaskFor(originalFutureTask, null)被调用了。这个方法默认会返回一个新的FutureTask实例

所以,代码等价于:RunnableFuture<Void> ftask = new FutureTask<Void>(originalFutureTask, null);

现在你就有了两个FutureTask

  1. originalFutureTask: 你自己创建的,它包装了myCallable,期望的结果是String类型。
  2. wrapperFuture (即ftask): submit方法内部创建的,它包装了originalFutureTask,期望的结果是null

第2步: 任务执行

线程池实际执行的是外层的wrapperFuture

  1. wrapperFuture.run()被调用。
  2. wrapperFutureCallable是什么?是originalFutureTask自己(因为它实现了Runnable)。所以,wrapperFuturerun方法内部会调用originalFutureTask.run()
  3. originalFutureTask.run()被调用。它会执行最初的myCallable,计算出结果“真正的结果”。
  4. originalFutureTask执行成功后,会调用set("真正的结果"),将结果保存在自己的outcome字段中。

第3步:finishCompletion()和关键的null操作

originalFutureTask在调用set()并最终进入finishCompletion()方法后,会执行一项重要的清理工作。

// FutureTask.java

private void finishCompletion() {
    // ... (唤醒等待者线程的代码) ...

    done(); // 这是一个空方法,留给子类扩展

    // *** 最关键的一行 ***
    // 为了帮助垃圾回收,将内部的 callable 引用置为 null。
    // 因为任务已经执行完了,理论上不再需要它了。
    callable = null;
}

originalFutureTask完成它的使命后,它扔掉了对myCallable的引用。它的outcome字段已经安全地保存了结果“真正的结果”。

第4步: 结果返回

结论:

新返回Future对象的作用

wrapperFuture的核心价值在于提供统一的控制和状态管理接口,而不是为了传递结果。它是ExecutorService框架为了保持API一致性和健壮性而设计的关键一环。

任务取消wrapperFuture最重要的用途之一。当你把任务提交给ExecutorService后,你手中唯一能直接操作的句柄就是submit方法返回的wrapperFuture。你需要通过它来尝试取消任务的执行。

当你在wrapperFuture上调用cancel()时,它会将这个取消请求传播给内部包装的originalFutureTask

FutureTask.cancel()方法会检查任务状态,如果任务还没开始跑,就将其状态设置为CANCELLED。如果任务正在跑,它会根据你传入的mayInterruptIfRunning参数来决定是否要中断执行该任务的线程。

import java.util.concurrent.*;

public class WrapperFutureCancelDemo {

    public static void main(String[] args) throws InterruptedException {
        FutureTask<String> originalFutureTask = new FutureTask<>(() -> {
            System.out.println("子线程: 任务开始,准备睡5秒...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // cancel(true) 会让线程在这里抛出中断异常
                System.out.println("子线程: 我被中断了,任务提前结束!");
                return "未完成的结果";
            }
            System.out.println("子线程: 任务正常完成。");
            return "已完成的结果";
        });

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // 提交后,我们只能通过 wrapperFuture 来控制这个任务
        Future<?> wrapperFuture = executor.submit(originalFutureTask);

        // 让主线程睡1秒,确保子任务已经开始运行
        Thread.sleep(1000);

        System.out.println("主线程: 任务已经跑了1秒,现在决定取消它。");
        // 调用 wrapperFuture 的 cancel(true) 来中断任务
        boolean cancelled = wrapperFuture.cancel(true); // true表示如果任务正在运行,就中断它

        System.out.println("主线程: 任务取消成功了吗? " + cancelled);
        // isCancelled() 也会被传播
        System.out.println("主线程: wrapperFuture 的状态是否是已取消? " + wrapperFuture.isCancelled());
        System.out.println("主线程: originalFutureTask 的状态是否是已取消? " + originalFutureTask.isCancelled());

        executor.shutdown();
    }
}

运行结果:

子线程: 任务开始,准备睡5秒...
主线程: 任务已经跑了1秒,现在决定取消它。
子线程: 我被中断了,任务提前结束!
主线程: 任务取消成功了吗? true
主线程: wrapperFuture 的状态是否是已取消? true
主线程: originalFutureTask 的状态是否是已取消? true

在这个例子中,完全没有用到get(),但wrapperFuture.cancel()成功地管理了任务的生命周期。

虽然wrapperFuture.get()返回的是null,但它依然是一个阻塞方法。它的作用是让当前线程等待,直到任务执行完成。这是一个非常重要的同步机制。

更重要的是,如果你的原始任务在执行时抛出了异常,调用wrapperFuture.get()重新抛出这个异常(包装在ExecutionException中)。这使得主线程能够捕获并处理后台任务的错误。

import java.util.concurrent.*;

public class WrapperFutureExceptionDemo {

    public static void main(String[] args) {
        FutureTask<String> originalFutureTask = new FutureTask<>(() -> {
            System.out.println("子线程: 任务开始,即将抛出异常!");
            throw new RuntimeException("计算出错!");
        });

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> wrapperFuture = executor.submit(originalFutureTask);

        try {
            System.out.println("主线程: 调用 wrapperFuture.get() 等待任务完成...");
            // 这里会阻塞,直到任务结束。因为任务抛了异常,get()也会抛出异常。
            wrapperFuture.get();
        } catch (InterruptedException e) {
            System.err.println("主线程: 等待时被中断了。");
        } catch (ExecutionException e) {
            System.err.println("主线程: 成功捕获到后台任务的异常!");
            System.err.println("  - 根本原因: " + e.getCause());
        }

        executor.shutdown();
    }
}

运行结果:

子线程: 任务开始,即将抛出异常!
主线程: 调用 wrapperFuture.get() 等待任务完成...
主线程: 成功捕获到后台任务的异常!
  - 根本原因: java.lang.RuntimeException: 计算出错!

即便不关心返回值,wrapperFuture.get()在错误处理和流程同步上也是不可或缺的。

ExecutorService的设计哲学是提供一套统一、可预测的接口。

无论你提交什么类型的任务,submit方法总是返回一个Future对象。这让使用者可以依赖一个统一的模型来管理所有异步任务,而不必写if-else来判断提交的是Callable还是RunnablewrapperFuture (Future<?>)正是这个统一模型中,用于代表“无返回值任务”的那个标准占位符。

总结来说,新返回的Future对象能够安全、统一地管理后台任务,即便并不关心那个任务的返回值。

总结

到此这篇关于Java开发异步编程中常用的接口和类的文章就介绍到这了,更多相关Java异步编程接口和类内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文