java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java CompletableFuture 异步编程

Java CompletableFuture 异步编程最佳实践指南

作者:鹅城剑仙

这段文章详细介绍了CompletableFuture在Java并发编程中的核心应用,包括非阻塞异步任务的创建与执行、链式调用、多任务编排及异常处理等,它强调展示了CompletableFuture如何解决Future类的痛点,提升异步编程效率,并提供了实战案例与最佳实践

一、为什么需要 CompletableFuture?

在 Java 并发编程的发展历程中,异步任务的处理方式经历了多次演进:

Thread / Runnable(JDK 1.0)
    ↓ 痛点:无返回值、难以管理
Future + Callable(JDK 5)
    ↓ 痛点:get() 阻塞、无法链式组合
CompletableFuture(JDK 8)✅
    ↓ 优势:非阻塞、链式调用、函数式编排

💡 CompletableFuture 是 JDK 8 引入的终极异步编程工具,它解决了传统 Future 的所有痛点:

二、核心 API 一览图

                    ┌─────────────────────────────┐
                    │     创建 CompletableFuture    │
                    └──────────┬──────────────────┘
                               │
              ┌────────────────┼────────────────┐
              ▼                ▼                ▼
     supplyAsync(U)      runAsync()       completedFuture(U)
     (有返回值)        (无返回值)       (立即完成)
                               │
          ┌────────────────────┼────────────────────┐
          ▼                    ▼                     ▼
   ┌─────────────┐    ┌──────────────┐    ┌────────────────┐
   │ 转换类方法   │    │ 消费类方法    │    │  组合类方法     │
   ├─────────────┤    ├──────────────┤    ├────────────────┤
   │thenApply    │    │thenAccept    │    │thenCombine     │
   │thenCompose  │    │thenRun       │    │thenAcceptBoth  │
   │applyToEither│    │acceptEither  │    │runAfterBoth    │
   │handle       │    │whenComplete  │    │runAfterEither  │
   └─────────────┘    └──────────────┘    └────────────────┘
          │                                      │
          ▼                                      ▼
   ┌─────────────┐                    ┌────────────────┐
   │ 异常处理     │    ┌──────────────►│ 多任务组合      │
   ├─────────────┤    │               ├────────────────┤
   │exceptionally│    │               │allOf(全部完成)  │
   │handle       │    │               │anyOf(任一完成)  │
   │whenComplete │    │               └────────────────┘
   └─────────────┘    │
                      ▼
               ┌─────────────┐
               │ 获取结果     │
               ├─────────────┤
               │get()        │
               │get(timeout) │
               │join()       │
               │orTimeout()  │
               │completeOnTimeout()
               └─────────────┘

三、创建异步任务

3.1 supplyAsync — 有返回值的异步任务

// 基础用法:使用默认 ForkJoinPool
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作(如远程API调用)
    try { Thread.sleep(1000); } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return "Hello, CompletableFuture!";
});
// 非阻塞获取结果
String result = future.join();  // join() 不抛检查异常
System.out.println(result);     // Hello, CompletableFuture!

3.2 runAsync — 无返回值的异步任务

// 执行一个没有返回值的异步操作
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("执行异步任务: " + Thread.currentThread().getName());
    // 发送邮件、写日志等不需要返回值的操作
});
future.join();  // 等待任务完成

3.3 指定自定义线程池(⚠️ 生产环境必须)

// ⚠️ 默认使用 ForkJoinPool.commonPool(),生产环境建议指定线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程: " + Thread.currentThread().getName());
    return "使用自定义线程池";
}, executor);
// 记得关闭线程池
executor.shutdown();

⚠️ 重要提醒:默认的 ForkJoinPool.commonPool() 是全局共享的,如果任务阻塞或耗时过长会影响其他组件。生产环境务必传入自定义 Executor

四、链式回调(核心用法)

4.1 thenApply — 转换结果(相当于 map)

CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> 10)
    .thenApply(x -> x * 2)           // 20:将结果乘以2
    .thenApply(x -> x + 5);          // 25:再加5
System.out.println(result.join());  // 25

4.2 thenAccept — 消费结果(无返回值)

CompletableFuture.supplyAsync(() -> "World")
    .thenAccept(s -> System.out.println("Hello, " + s));
    // 输出: Hello, World

4.3 thenRun — 完成后执行操作(不关心结果)

CompletableFuture.supplyAsync(() -> {
    // 复杂计算...
    return "done";
}).thenRun(() -> {
    System.out.println("任务已完成!清理资源...");
});

4.4 thenCompose — 扁平映射(相当于 flatMap)

// 场景:第一个异步任务的结果是第二个异步任务的输入
CompletableFuture<String> userId = CompletableFuture.supplyAsync(() -> "user_123");
CompletableFuture<String> userInfo = userId.thenCompose(id ->
    CompletableFuture.supplyAsync(() -> {
        // 根据id查询用户详情(模拟RPC调用)
        return "用户" + id + ": 张三, age=28";
    })
);
System.out.println(userInfo.join()); // 用户user_123: 张三, age=28

thenApply vs thenCompose 的区别:

方法返回值适用场景
thenApplyCompletableFuture<U>同步转换,类似 Stream.map
thenComposeCompletableFuture<CompletableFuture<U>> → 扁平化异步嵌套,类似 Stream.flatMap

五、多任务组合

5.1 thenCombine — 两个任务都完成后合并结果

CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> 99.9);
CompletableFuture<Integer> quantityFuture = CompletableFuture.supplyAsync(() -> 3);
// 两个任务都完成后,计算总价
CompletableFuture<Double> totalFuture = priceFuture.thenCombine(
    quantityFuture,
    (price, qty) -> price * qty
);
System.out.printf("总价: %.2f 元%n", totalFuture.join()); // 总价: 299.70 元

5.2 allOf — 所有任务全部完成

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
    sleep(500); return "任务1完成";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
    sleep(300); return "任务2完成";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
    sleep(700); return "任务3完成";
});
// 所有任务完成后触发
CompletableFuture<Void> allDone = CompletableFuture.allOf(task1, task2, task3);
allDone.thenRun(() -> {
    System.out.println(task1.join());  // 任务1完成
    System.out.println(task2.join());  // 任务2完成
    System.out.println(task3.join());  // 任务3完成
    System.out.println("✅ 全部任务执行完毕!");
}).join();

5.3 anyOf — 任一任务完成即返回

CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
    sleep(200); return "快速任务";
});
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
    sleep(2000); return "慢速任务"
});
// 返回最先完成的任务结果
Object firstResult = CompletableFuture.anyOf(fastTask, slowTask).join();
System.out.println("最快完成的: " + firstResult); // 快速任务

六、异常处理

6.1 exceptionally — 恢复异常(类似 catch)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("模拟异常!");
    return "正常结果";
}).exceptionally(ex -> {
    System.err.println("捕获到异常: " + ex.getMessage());
    return "降级默认值";  // 返回兜底数据
});
System.out.println(future.join()); // 降级默认值

6.2 handle — 统一处理正常和异常(类似 finally)

CompletableFuture.supplyAsync(() -> {
    // 可能抛出异常的操作
    int result = 10 / 0;
    return "成功: " + result;
}).handle((result, ex) -> {
    if (ex != null) {
        System.err.println("发生错误: " + ex.getMessage());
        return "错误处理后的默认响应";
    }
    return "处理成功: " + result;
}).thenAccept(System.out::println);

6.3 whenComplete — 类似 finally(不改变结果)

CompletableFuture.supplyAsync(() -> "重要数据")
    .whenComplete((result, ex) -> {
        // 无论成功失败都会执行(用于日志记录、指标上报)
        if (ex != null) {
            System.err.println("任务失败,记录日志");
        } else {
            System.out.println("任务成功,耗时统计...");
        }
        // 注意:不返回值,不会改变原始结果
    });

三种异常处理方式对比

方法触发时机能否恢复能否改变结果
exceptionally仅异常时✅ 返回替代值
handle正常+异常时✅ 返回新值
whenComplete正常+异常时❌ 仅副作用

七、超时控制

// JDK 9+ 的 orTimeout
CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {
    sleep(5000);  // 模拟耗时5秒的任务
    return "结果";
}).orTimeout(2, TimeUnit.SECONDS)  // 最多等待2秒
  .exceptionally(ex -> {
      if (ex instanceof TimeoutException) {
          return "超时降级结果";
      }
      return "其他异常";
  });
// JDK 9+ 的 completeOnTimeout(不抛异常,直接返回默认值)
CompletableFuture<String> safeFuture = CompletableFuture.supplyAsync(() -> {
    sleep(5000);
    return "真实结果";
}).completeOnTimeout("默认超时值", 2, TimeUnit.SECONDS);

八、实战案例:电商商品详情页聚合

以下是一个真实的业务场景:从多个微服务并行获取数据并组装响应。

import java.util.*;
import java.util.concurrent.*;
public class EcommerceDetailAggregator {
    // 模拟各个微服务
    private static CompletableFuture<ProductInfo> fetchProduct(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return new ProductInfo(productId, "MacBook Pro 16寸", 18999.00);
        });
    }
    private static CompletableFuture<List<Review>> fetchReviews(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(400);
            return Arrays.asList(
                new Review("用户A", "性能强劲,开发利器!", 5),
                new Review("用户B", "屏幕素质顶级", 5),
                new Review("用户C", "略重但可以接受", 4)
            );
        });
    }
    private static CompletableFuture<Inventory> fetchInventory(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(200);
            return new Inventory(productId, 128, "北京仓");
        });
    }
    private static CompletableFuture<List<Recommendation>> fetchRecommendations(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(350);
            return Arrays.asList(
                new Recommendation("Magic Mouse", 699),
                new Recommendation("USB-C Hub", 259)
            );
        });
    }
    public static void main(String[] args) {
        String productId = "P20240614";
        long startTime = System.currentTimeMillis();
        // 1. 并行发起4个异步请求
        CompletableFuture<ProductInfo> productFuture = fetchProduct(productId);
        CompletableFuture<List<Review>> reviewsFuture = fetchReviews(productId);
        CompletableFuture<Inventory> inventoryFuture = fetchInventory(productId);
        CompletableFuture<List<Recommendation>> recsFuture = fetchRecommendations(productId);
        // 2. 组装最终结果
        CompletableFuture<PageResponse> pageData = productFuture
            .thenCombine(reviewsFuture, (product, reviews) -> {
                PageResponse resp = new PageResponse();
                resp.product = product;
                resp.reviews = reviews;
                return resp;
            })
            .thenCombine(inventoryFuture, (resp, inventory) -> {
                resp.inventory = inventory;
                return resp;
            })
            .thenCombine(recsFuture, (resp, recs) -> {
                resp.recommendations = recs;
                return resp;
            });
        // 3. 设置超时 + 异常处理
        pageData = pageData.orTimeout(3, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                System.err.println("聚合超时或异常: " + ex.getMessage());
                return PageResponse.fallback();
            });
        // 4. 输出结果
        PageResponse response = pageData.join();
        long elapsed = System.currentTimeMillis() - startTime;
        System.out.println("\n========== 商品详情页响应 ==========");
        System.out.println("📦 商品: " + response.product.name);
        System.out.println("💰 价格: ¥" + response.product.price);
        System.out.println("📦 库存: " + response.inventory.stock + "件 (" + response.inventory.warehouse + ")");
        System.out.println("⭐ 评分: " + avgRating(response.reviews));
        System.out.println("🎯 推荐: " + formatRecs(response.recommendations));
        System.out.println("⏱️  总耗时: " + elapsed + "ms(串行需~1250ms)");
        System.out.println("======================================\n");
    }
    // ========== 辅助方法 ==========
    private static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    private static double avgRating(List<Review> reviews) {
        return reviews.stream().mapToInt(r -> r.rating).average().orElse(0);
    }
    private static String formatRecs(List<Recommendation> recs) {
        StringBuilder sb = new StringBuilder();
        for (Recommendation r : recs) sb.append(r.name).append("(¥").append(r.price).append(") ");
        return sb.toString().trim();
    }
    // ========== 数据模型 ==========
    static class ProductInfo { String id, name; double price;
        ProductInfo(String id, String name, double price) { this.id=id; this.name=name; this.price=price; } }
    static class Review { String user, comment; int rating;
        Review(String user, String comment, int rating) { this.user=user; this.comment=comment; this.rating=rating; } }
    static class Inventory { String productId, warehouse; int stock;
        Inventory(String pid, int stock, String wh) { this.productId=pid; this.stock=stock; this.warehouse=wh; } }
    static class Recommendation { String name; double price;
        Recommendation(String name, double price) { this.name=name; this.price=price; } }
    static class PageResponse {
        ProductInfo product; List<Review> reviews; Inventory inventory; List<Recommendation> recommendations;
        static PageResponse fallback() { PageResponse r = new PageResponse(); r.product = new ProductInfo("", "暂不可用", 0); return r; }
    }
}

预期输出:

========== 商品详情页响应 ==========
📦 商品: MacBook Pro 16寸
💰 价格: ¥18999.0
📦 库存: 128件 (北京仓)
⭐ 评分: 4.67
🎯 推荐: Magic Mouse(¥699.0) USB-C Hub(¥259.0)
⏱️  总耗时: ~420ms(串行需~1250ms)
======================================

🚀 性能提升约 3 倍! 这就是 CompletableFuture 在实际项目中的核心价值。

九、CompletableFuture vs 传统 Future 对比

特性Future(JDK 5)CompletableFuture(JDK 8)
手动完成❌ 不支持complete()
链式调用❌ 不支持thenApply/thenAccept...
多任务组合❌ 不支持allOf/anyOf/thenCombine
异常处理❌ 只能 get() 时捕获exceptionally/handle
非阻塞获取❌ get() 阻塞thenAccept 回调
超时控制get(timeout)orTimeout(JDK9+)

十、最佳实践清单

十一、总结

要点内容
核心价值非阻塞异步编程,告别 callback hell
创建方式supplyAsync(有返回值) / runAsync(无返回值)
链式调用thenApply(转换) → thenAccept(消费) → thenRun(收尾)
组合编排thenCombine(双任务) / allOf(全完成) / anyOf(任一完成)
异常处理exceptionally(恢复) / handle(统一) / whenComplete(日志)
生产要点自定义线程池 + 超时控制 + 早期异常处理

📚 延伸阅读

本文基于 JDK 8+ 编写,部分 API(如 orTimeout)需要 JDK 9+。如有疑问欢迎交流讨论!

到此这篇关于Java CompletableFuture 异步编程最佳实践指南的文章就介绍到这了,更多相关Java CompletableFuture 异步编程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文