java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java8 CompletableFuture异步编程

Java8 CompletableFuture异步编程解读

作者:骑个小蜗牛

Java8引入的CompletableFuture是Java异步编程的重要进展,提供了基于未来结果的异步编程模型,它适用于异步计算、多个并行任务组合、异步回调、超时控制、错误处理和多任务组合与合成等场景

CompletableFuturede介绍

Java 8 引入了 CompletableFuture 类,这是 Java 异步编程的一个重要进展。

CompletableFuture 提供了一种基于未来结果的异步编程模型,允许你以更加直观和易于理解的方式编写非阻塞代码。

CompletableFuturede使用场景

CompletableFuture 主要用于:

常用异步编程实现方案

- Thread

特点:

使用示例:

	public static void main(String[] args) {
        Thread thread = new Thread(() -> {
        	 System.out.println(Thread.currentThread().getName() + " is running...");
        });
        thread.start();
    }

- ExecutorService

特点:

使用示例:

有返回值:

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);  // 创建线程池
        
        Callable<Integer> task = () -> {
            Thread.sleep(1000);
            return 42;
        };
        
        Future<Integer> result = executor.submit(task);  // 提交任务并获得 Future 对象
        System.out.println("Task result: " + result.get());  // 获取结果
        
        executor.shutdown();  // 关闭线程池
    }

无返回值:

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);  // 创建线程池
        
        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() + " is running...");
        };
        
        executor.execute(task);  // 提交任务
        
        executor.shutdown();  // 关闭线程池
    }

- CountDownLatch

特点:

使用示例:

    public static void main(String[] args) throws InterruptedException {
        int totalThreads = 3;
        CountDownLatch latch = new CountDownLatch(totalThreads);  // 初始化计数器为3
        
        Runnable task = () -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " finished.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown();  // 每个线程完成后减少计数器
            }
        };
        
        // 启动多个线程
        for (int i = 0; i < totalThreads; i++) {
            new Thread(task).start();
        }
        
        latch.await();  // 等待计数器归零
        System.out.println("All tasks are finished.");
    }

- CyclicBarrier

特点:

使用示例:

    public static void main(String[] args) throws InterruptedException {
        int totalThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(totalThreads, () -> {
            System.out.println("All threads reached the barrier point, proceeding...");
        });
        
        Runnable task = () -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " reached the barrier.");
                barrier.await();  // 等待其他线程到达屏障点
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        
        // 启动多个线程
        for (int i = 0; i < totalThreads; i++) {
            new Thread(task).start();
        }
    }

- ForkJoinPool

特点:

使用示例:

import java.util.concurrent.*;

public class ForkJoinPoolExample {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();  // 创建 ForkJoinPool
        int[] array = {1, 2, 3, 4, 5, 6, 7, 8};
        RecursiveTask<Integer> task = new SumTask(array, 0, array.length);
        int result = pool.invoke(task);  // 执行任务并获取结果
        System.out.println("Sum is: " + result);
    }
}

class SumTask extends RecursiveTask<Integer> {
    private int[] array;
    private int start, end;
    
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        if (end - start <= 2) {  // 基础情况
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            SumTask task1 = new SumTask(array, start, mid);
            SumTask task2 = new SumTask(array, mid, end);
            task1.fork();  // 异步执行
            task2.fork();
            return task1.join() + task2.join();  // 合并结果
        }
    }
}

- CompletableFuture

特点:

使用示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 42;
        });
        
        // 链式调用,处理结果
        CompletableFuture<Integer> result = future.thenApplyAsync(value -> value * 2);
        
        System.out.println("Result: " + result.get());  // 输出结果
    }

各种实现方案总结

并发方式特点优点缺点
Thread- 最基本的线程创建方式- 通过继承Thread 或实现Runnable 接口创建任务- 简单直观- 需要手动管理线程,容易资源浪费或死锁- 无法直接返回任务结果- 对复杂任务协调不便
ExecutorService- 通过线程池管理线程- 提供任务的调度、执行、生命周期管理- 提供线程池避免手动创建和销毁线程,减少资源浪费- 支持任务的结果返回- 任务间依赖和组合较复杂-get() 方法阻塞线程,难以实现非阻塞
CountDownLatch- 用于等待多个任务完成后执行后续操作- 使用计数器控制任务执行- 可以控制任务同步,确保多个任务完成后继续执行- 只适用于等待任务完成,无法处理任务的依赖关系- 只能使用一次
CyclicBarrier- 用于多个线程在某一点上等待- 可重复使用,适合同步多任务- 可重复使用,适合多次任务同步- 不如CompletableFuture 灵活- 仅适合特定的同步场景
ForkJoinPool- 专为递归分治任务设计的线程池- 支持任务拆分和合并- 高效利用多核处理器,适合分治算法- 支持任务拆分和合并- 对于非递归任务不适合- 异常处理不如CompletableFuture 灵活
CompletableFuture- 基于Future 设计的异步编程API- 支持非阻塞的任务组合和回调处理- 支持链式调用,异步任务组合,避免阻塞- 可以处理异常,支持并行处理和同步等待- 支持thenApply、thenAccept 等多种处理方式,简化代码- 复杂任务时调试困难- 异常处理仍较为复杂- 比ExecutorService 稍显复杂

CompletableFuturede结构

CompletableFuture实现了Future接口和CompletionStage接口。

结构梳理

相关接口描述
Future是一个表示异步计算结果的接口。它提供了方法来检查异步计算是否完成、获取计算的结果以及取消计算。
CompletionStage是一个表示异步计算结果的接口,提供了处理计算结果的非阻塞操作。与 Future 不同,CompletionStage 采用链式调用,可以更灵活地组合多个异步操作。

- Future接口

Future接口是JDK 5引入的,该接口属于java.util.concurrent包。

Future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。

主要方法:

方法描述
get()阻塞当前线程,直到异步计算完成,并返回计算结果
get(long timeout, TimeUnit unit)阻塞当前线程,直到异步计算完成或超时,并返回计算结果
isDone()检查异步计算是否完成
cancel(boolean mayInterruptIfRunning)尝试取消异步计算
isCancelled()检查异步计算是否被取消。

- CompletionStage接口

CompletionStage 接口是 Java 8 引入的一个重要接口,用于描述异步计算的生命周期和结果。

CompletionStage 提供了一套方法,用于处理异步计算的结果、组合多个计算、处理异常等。

主要方法:

方法描述
thenApply在当前阶段完成后,应用给定的 Function,并返回一个新的 CompletionStage。
thenAcceptAsync异步地执行指定的 Consumer,并返回一个新的 CompletionStage,该阶段没有结果。
thenComposeAsync异步地将当前阶段的结果应用于一个返回 CompletionStage 的函数,并返回一个新的 CompletionStage。
thenCombine在两个 CompletionStage 都完成后,使用给定的 BiFunction 合并它们的结果,并返回一个新的 CompletionStage。
runAfterEitherAsync在任意一个给定的两个 CompletionStage 完成后,异步地执行指定的 Runnable。
thenAccept在当前阶段完成后,执行指定的 Consumer,并返回一个新的 CompletionStage,该阶段没有结果。
runAfterEither在任意一个给定的两个 CompletionStage 完成后,执行指定的 Runnable。
thenCombineAsync在两个 CompletionStage 都完成后,异步地使用给定的 BiFunction 合并它们的结果,并返回一个新的 CompletionStage。
thenAcceptBothAsync在两个 CompletionStage 都完成后,异步地执行指定的 BiConsumer,并返回一个新的 CompletionStage。
applyToEither在两个 CompletionStage 中任意一个完成后,应用给定的 Function,并返回一个新的 CompletionStage。
applyToEitherAsync在两个 CompletionStage 中任意一个完成后,异步地应用给定的 Function,并返回一个新的 CompletionStage。
runAfterBothAsync在两个 CompletionStage 都完成后,异步地执行指定的 Runnable,并返回一个新的 CompletionStage。
thenAcceptBothAsync在两个 CompletionStage 都完成后,异步地执行指定的 BiConsumer。
acceptEitherAsync在两个 CompletionStage 中任意一个完成后,异步地执行指定的 Consumer,并返回一个新的 CompletionStage。
handleAsync异步地处理当前阶段的结果或异常,应用给定的 BiFunction,并返回一个新的 CompletionStage。
thenComposeAsync同 thenCompose,但异步地应用给定的函数,并返回一个新的 CompletionStage。
thenCombineAsync同 thenCombine,但异步地使用给定的 BiFunction 合并两个 CompletionStage 的结果。
exceptionally如果当前阶段以异常完成,则应用指定的 Function 处理该异常,并返回一个新的 CompletionStage。
acceptEither在两个 CompletionStage 中任意一个完成后,执行指定的 Consumer。
thenCompose将当前阶段的结果应用于一个返回 CompletionStage 的函数,并返回一个新的 CompletionStage。
handle处理当前阶段的结果或异常,应用给定的 BiFunction,并返回一个新的 CompletionStage。
thenAcceptBoth在两个 CompletionStage 都完成后,执行指定的 BiConsumer。
thenApplyAsync异步地应用给定的 Function,并返回一个新的 CompletionStage。
whenCompleteAsync异步地执行指定的 BiConsumer,无论结果如何,并返回一个新的 CompletionStage。
applyToEitherAsync同 applyToEither,但异步地应用给定的 Function,并返回一个新的 CompletionStage。
acceptEitherAsync同 acceptEither,但异步地执行指定的 Consumer,并返回一个新的 CompletionStage。
runAfterEitherAsync同 runAfterEither,但异步地执行指定的 Runnable,并返回一个新的 CompletionStage。
thenRunAsync异步地执行指定的 Runnable,并返回一个新的 CompletionStage,该阶段没有结果。
runAfterBoth在两个 CompletionStage 都完成后,执行指定的 Runnable。
whenComplete在当前阶段完成后,无论结果如何,执行指定的 BiConsumer,并返回一个新的 CompletionStage。
thenRunAsync异步地执行指定的 Runnable,并返回一个新的 CompletionStage,该阶段没有结果。

常用方法

方法描述
supplyAsync()异步地运行一个带返回值的任务。
runAsync()异步地运行一个无返回值的任务。
thenApply()当 CompletableFuture 任务完成时执行某个操作,并返回新的结果。
thenAccept()当任务完成时执行某个操作,但不返回结果。
thenRun()当任务完成时执行某个操作,无需返回结果。
exceptionally()用于处理任务执行中发生的异常。
handle()处理任务执行中的正常结果或异常结果。
allOf()等待多个 CompletableFuture 全部完成,返回一个新的 CompletableFuture。
anyOf()等待多个 CompletableFuture 中的任意一个完成。

CompletableFuture使用示例

1. 基本异步操作

CompletableFuture.supplyAsync() 和 CompletableFuture.runAsync() 是最常用的启动异步任务的方法。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 带返回值的异步任务
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);  // 模拟耗时任务
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 42;  // 返回结果
        });

        // 获取异步任务的结果
        Integer result = future.get();  // 阻塞,直到任务完成
        System.out.println("Result: " + result);
    }

2. 任务链式调用

通过 thenApply(), thenAccept(), thenRun() 等方法,可以将多个异步任务串联在一起。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 42;  // 返回结果
        });

        // 链式调用,先处理结果,再转换
        CompletableFuture<Integer> resultFuture = future
            .thenApply(value -> value * 2)  // 将值乘以2
            .thenApply(value -> value + 10);  // 再加10

        Integer result = resultFuture.get();  // 获取最终结果
        System.out.println("Final Result: " + result);  // 输出 94
    }

3. 多个异步任务组合

使用 thenCombine()、thenCompose()、allOf() 和 anyOf() 等方法可以组合多个异步任务,执行复杂的操作。

示例1:组合两个异步任务

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            return 10;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            return 20;
        });

        // 合并两个任务的结果
        CompletableFuture<Integer> combinedFuture = future1
            .thenCombine(future2, (result1, result2) -> result1 + result2);  // 将两个结果相加

        Integer result = combinedFuture.get();  // 获取最终结果
        System.out.println("Combined Result: " + result);  // 输出 30
    }

示例2:使用 allOf() 等待多个任务完成

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("Task 1 completed");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1500);
                System.out.println("Task 2 completed");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 等待多个任务全部完成
        CompletableFuture.allOf(future1, future2).join();

        System.out.println("All tasks are completed.");
    }

4. 异常处理

在异步任务中,异常可能会发生。CompletableFuture 提供了 exceptionally() 和 handle() 方法来处理异常。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Something went wrong!");
            }
            return 42;
        });

        // 使用 exceptionally 处理异常并提供默认值
        CompletableFuture<Integer> resultFuture = future.exceptionally(ex -> {
            System.out.println("Exception occurred: " + ex.getMessage());
            return -1;  // 返回默认值
        });

        Integer result = resultFuture.get();  // 获取结果
        System.out.println("Result: " + result);  // 输出 -1
    }

5. 并行执行多个任务

使用 CompletableFuture.supplyAsync() 或 runAsync() 来并行执行多个任务,然后使用 allOf() 或 anyOf() 等方法等待这些任务的完成。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                return 1;
            } catch (InterruptedException e) {
                return 0;
            }
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
                return 2;
            } catch (InterruptedException e) {
                return 0;
            }
        });

        // 等待所有任务完成并合并结果
        CompletableFuture<Integer> result = future1
            .thenCombine(future2, (res1, res2) -> res1 + res2);  // 将两个结果相加

        System.out.println("Combined result: " + result.get());  // 输出 3
    }

6. 处理返回值的转换

通过 thenApply() 等方法可以对异步任务的结果进行转换处理。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 10);

        // 转换结果:将值乘以2
        CompletableFuture<Integer> transformedFuture = future.thenApply(value -> value * 2);

        System.out.println("Transformed Result: " + transformedFuture.get());  // 输出 20
    }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文