Java CompletableFuture 多任务编排方法实践指南
作者:ljh_learn_from_base
本文详细介绍了CompletableFuture的核心方法,按功能分类整理,包括创建方法、完成处理方法、链式转换方法、组合方法、多Future聚合方法、状态检查方法,并结合实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧
CompletableFuture核心方法的系统对比,按功能分类整理
一、创建方法(静态工厂)
| 方法 | 返回值 | 执行线程 | 特点 | 示例 |
|---|---|---|---|---|
| runAsync(Runnable) | CompletableFuture<Void> | ForkJoinPool.commonPool() | 无返回值 | runAsync(() -> log.info("ok")) |
| runAsync(Runnable, Executor) | CompletableFuture<Void> | 指定线程池 | 无返回值,自定义线程池 | runAsync(task, executor) |
| supplyAsync(Supplier<T>) | CompletableFuture<T> | ForkJoinPool.commonPool() | 有返回值 | supplyAsync(() -> fetchData()) |
| supplyAsync(Supplier<T>, Executor) | CompletableFuture<T> | 指定线程池 | 有返回值,自定义线程池 | supplyAsync(() -> fetch(), executor) |
| completedFuture(U value) | CompletableFuture<U> | 立即完成 | 包装已存在的值 | completedFuture(cachedValue) |
| failedFuture(Throwable) | CompletableFuture<T> | 立即完成 | 包装异常(Java 9+) | failedFuture(new TimeoutException()) |
二、完成处理方法(终结操作)
| 方法 | 输入 | 输出 | 异常处理 | 使用场景 |
|---|---|---|---|---|
| whenComplete(BiConsumer<T,Throwable>) | 结果+异常 | 原类型T | 异常继续传播 | 资源清理、日志记录 |
| whenCompleteAsync(...) | 同上 | 原类型T | 异常继续传播 | 异步执行清理 |
| handle(BiFunction<T,Throwable,U>) | 结果+异常 | 新类型U | 异常被转换 | 容错恢复、默认值 |
| handleAsync(...) | 同上 | 新类型U | 异常被转换 | 异步容错处理 |
| exceptionally(Function<Throwable,T>) | 异常 | 原类型T | 仅异常时触发 | 异常降级处理 |
| exceptionallyAsync(...)(Java 12+) | 异常 | 原类型T | 仅异常时触发 | 异步异常处理 |
关键区别:
// whenComplete:异常继续抛,只能观察
future.whenComplete((v, e) -> log.info("done")).join(); // 可能抛异常
// handle:异常被"吃掉",返回新值
future.handle((v, e) -> e != null ? "default" : v).join(); // 一定成功
// exceptionally:仅处理异常情况
future.exceptionally(e -> "error: " + e.getMessage()).join();三、链式转换方法(中间操作)
| 方法 | 前置结果 | 转换函数 | 返回值 | 异常时是否跳过 |
|---|---|---|---|---|
| thenApply(Function<T,U>) | T | T → U | CompletableFuture<U> | ✅ 跳过,传异常 |
| thenApplyAsync(...) | T | T → U | CompletableFuture<U> | ✅ 跳过,传异常 |
| thenAccept(Consumer<T>) | T | T → void | CompletableFuture<Void> | ✅ 跳过,传异常 |
| thenAcceptAsync(...) | T | T → void | CompletableFuture<Void> | ✅ 跳过,传异常 |
| thenRun(Runnable) | 忽略 | () → void | CompletableFuture<Void> | ✅ 跳过,传异常 |
| thenRunAsync(...) | 忽略 | () → void | CompletableFuture<Void> | ✅ 跳过,传异常 |
对比示例:
supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // Hello World
.thenAccept(System.out::println) // 打印,返回Void
.thenRun(() -> log.info("Done")); // 不依赖前置结果四、组合方法(合并多个Future
| 方法 | 组合方式 | 前置完成 | 函数签名 | 输出类型 |
|---|---|---|---|---|
| thenCompose(Function<T,CF<U>>) | 扁平化链式 | 等前一个 | T → CF<U> | CF<U> |
| thenComposeAsync(...) | 同上 | 同上 | 同上 | CF<U> |
| thenCombine(CF<U>, BiFunction<T,U,V>) | 并行合并 | 等两个 | (T,U) → V | CF<V> |
| thenCombineAsync(...) | 同上 | 同上 | 同上 | CF<V> |
| thenAcceptBoth(CF<U>, BiConsumer<T,U>) | 并行消费 | 等两个 | (T,U) → void | CF<Void> |
| runAfterBoth(CF<?>, Runnable) | 并行运行 | 等两个 | () → void | CF<Void> |
| applyToEither(CF<T>, Function<T,U>) | 竞速取快 | 任一个 | T → U | CF<U> |
| acceptEither(CF<T>, Consumer<T>) | 竞速消费 | 任一个 | T → void | CF<Void> |
| runAfterEither(CF<?>, Runnable) | 竞速运行 | 任一个 | () → void | CF<Void> |
场景对比:
// thenCompose:串行依赖(扁平化避免嵌套)
fetchUserId().thenCompose(id -> fetchUserDetail(id)); // CF<User>
// thenCombine:并行聚合
CompletableFuture<String> f1 = fetchPrice();
CompletableFuture<String> f2 = fetchStock();
f1.thenCombine(f2, (price, stock) -> "Price:" + price + ", Stock:" + stock);
// applyToEither:超时降级
fetchFromCache()
.applyToEither(fetchFromDB(), result -> result); // 哪个快用哪个五、多Future聚合方法(静态方法)
| 方法 | 输入 | 触发条件 | 输出 | 特点 |
|---|---|---|---|---|
| allOf(CF<?>... cfs) | 多个CF | 全部完成 | CF<Void> | 等待全部,无返回值 |
| anyOf(CF<?>... cfs) | 多个CF | 任一个完成 | CF<Object> | 竞速,返回最快的结果 |
| join() | - | - | T | 阻塞获取结果,抛非检查异常 |
| get() | - | - | T | 阻塞获取,抛检查异常 |
| get(long, TimeUnit) | - | - | T | 带超时的阻塞获取 |
allOf vs anyOf:
CompletableFuture<String> f1 = asyncTask1();
CompletableFuture<String> f2 = asyncTask2();
CompletableFuture<String> f3 = asyncTask3();
// allOf:等全部完成,再收集结果
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.thenRun(() -> {
String r1 = f1.join(); // 此时都已完成,直接取
String r2 = f2.join();
String r3 = f3.join();
});
// anyOf:取最快完成的
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
any.thenAccept(result -> {
// result是最快完成的那个(类型是Object,需强转)
});六、状态检查方法
| 方法 | 返回值 | 说明 | 注意 |
|---|---|---|---|
| isDone() | boolean | 是否已完成(正常/异常/取消) | 不区分成功失败 |
| isCancelled() | boolean | 是否被取消 | - |
| isCompletedExceptionally() | boolean | 是否异常完成 | Java 8+ |
| getNow(T valueIfAbsent) | T | 立即获取,未完成返回默认值 | 非阻塞 |
| complete(T value) | boolean | 强制完成(如果未完成) | 只能调用一次 |
| completeExceptionally(Throwable) | boolean | 强制异常完成 | 只能调用一次 |
| cancel(boolean) | boolean | 取消任务 | 可能不中断执行线程 |
七、方法命名规律总结
| 关键词 | 含义 | 示例 |
|---|---|---|
| Async | 异步执行(新线程/线程池) | thenApplyAsync |
| Apply | 有返回值,转换类型 | thenApply(T→U) |
| Accept | 无返回值,消费结果 | thenAccept(T→void) |
| Run | 无输入无输出,纯副作用 | thenRun(→void) |
| Compose | 扁平化链式(避免CF<CF<T>>) | thenCompose(T→CF<U>) |
| Combine | 合并两个并行结果 | thenCombine(CF<U>, (T,U)→V) |
| Either | 两个中任一个完成即触发 | applyToEither |
| Both | 两个都完成才触发 | runAfterBoth |
| Handle | 处理结果或异常,可转换 | handle((T,Throwable)→U) |
| Exceptionally | 仅处理异常情况 | exceptionally(Throwable→T) |
八、快速选择指南
需要创建异步任务?
├── 无返回值 → runAsync
└── 有返回值 → supplyAsync
需要在完成后做操作?
├── 要转换结果 → thenApply
├── 只消费结果 → thenAccept
└── 不依赖结果 → thenRun
需要处理异常?
├── 只清理资源,异常继续抛 → whenComplete
├── 异常转默认值 → handle
└── 异常降级 → exceptionally
需要组合多个Future?
├── 串行依赖(A→B)→ thenCompose
├── 并行聚合(A+B→C)→ thenCombine
├── 多个等全部 → allOf
└── 多个取最快 → anyOf掌握这个,可以覆盖 90% 的 CompletableFuture 使用场景。
九:实战代码
import java.util.concurrent.*;
/**
* CompletableFuture 全方法指南示例
* 涵盖:创建、转换、组合、异常处理、聚合等所有核心操作
*/
public class CompletableFutureGuide {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
// ==================== 一、创建方法 ====================
/**
* 指南:无返回值异步任务,不使用默认线程池
* 【除了使用自定义线程池外,还可以使用CompletableFuture的join和get方法】
* CompletableFuture,没有指定自定义线程池的时候【可以把executor去掉试一下】,创建的居然是守护线程。
* 当只有守护线程和主线程【非守护线程】时,所有非守护程结束,守护线程也结束,jvm直接退出结束,等不了2秒。【子线程执行完毕】这句话永远不会执行
*/
public void demoRunAsync() {
System.out.println("=== runAsync ===");
CompletableFuture.runAsync(() -> {
// System.out.println("当前线程是否是守护线程:"+Thread.currentThread().isDaemon());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("demoRunAsync执行日志记录,线程: " + Thread.currentThread().getName());
}, this.executor).whenComplete((result, exception) -> {
//子线程执行完毕
System.out.println("demoRunAsync子线程执行完毕");
});
//future.join(); // 等待子线程完成
System.out.println("=== demoRunAsync主线程结束,等待子线程执行结束===");
}
/**
* 指南:无返回值异步任务,使用自定义线程池
*/
public void demoRunAsyncWithExecutor() {
System.out.println("=== runAsync + Executor ===");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("自定义线程池执行,线程: " + Thread.currentThread().getName());
}, executor);
future.join();
}
/**
* 指南:有返回值异步任务,使用默认线程池
*/
public void demoSupplyAsync() {
System.out.println("=== supplyAsync ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "从数据库获取的数据";
});
System.out.println("结果: " + future.join());
}
/**
* 指南:有返回值异步任务,使用自定义线程池
*/
public void demoSupplyAsyncWithExecutor() {
System.out.println("=== supplyAsync + Executor ===");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("计算中... 线程: " + Thread.currentThread().getName());
return 42;
}, executor);
System.out.println("计算结果: " + future.join());
}
/**
* 指南:包装已存在的值(立即完成)
* <pre>
* // 使用场景:从缓存获取结果(如果缓存命中)
* public CompletableFuture<String> getFromCache(String key) {
* String cached = cache.get(key);
* if (cached != null) {
* return CompletableFuture.completedFuture(cached); // 缓存命中,直接返回已完成的结果
* }
* // 缓存未命中,需要异步加载
* return CompletableFuture.supplyAsync(() -> loadFromDb(key));
* }
* </pre>
*/
public void demoCompletedFuture() {
System.out.println("=== completedFuture ===");
String cachedValue = "缓存的用户信息";
CompletableFuture<String> future = CompletableFuture.completedFuture(cachedValue);
// 立即完成,不会异步执行
System.out.println("是否完成: " + future.isDone());
System.out.println("值: " + future.join());
}
// ==================== 二、完成处理方法 ====================
/**
* 指南:whenComplete - 只观察不干预,异常继续传播
* 场景:资源清理、日志记录
*/
public void demoWhenComplete() {
System.out.println("=== whenComplete ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("随机失败");
return "成功";
});
CompletableFuture<String> result = future.whenComplete((value, exception) -> {
// whenComplete只能观察,不能改变结果,需要改变结果的,要使用handle方法
System.out.println("whenComplete 观察到: " +
(exception != null ? "异常" : "值=" + value));
});
try {
System.out.println("最终结果: " + result.join());
} catch (Exception e) {
System.out.println("join() 抛出异常: " + e.getCause().getMessage());
}
}
/**
* 指南:whenCompleteAsync - 异步执行清理
*/
public void demoWhenCompleteAsync() {
System.out.println("=== whenCompleteAsync ===");
CompletableFuture.supplyAsync(() -> "任务完成")
.whenCompleteAsync((v, e) -> {
sleep(200);
System.out.println("异步清理,线程: " + Thread.currentThread().getName());
}, executor)
.join();
System.out.println("=== whenCompleteAsync 结束===");
}
/**
* 指南:handle - 转换结果,异常可被修复
* 注意与whenComplete方法的区别:whenComplete不能改变返回值
* 场景:容错恢复、提供默认值
*/
public void demoHandle() {
System.out.println("=== handle ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("服务不可用");
});
CompletableFuture<String> result = future.handle((value, exception) -> {
// 可以返回新值,改变结果
if (exception != null) {
System.out.println("handle 修复异常,返回默认值");
return "默认数据(降级)";
}
return value + "(加工后)";
});
// 不会抛异常,因为已被handle处理
System.out.println("最终结果: " + result.join());
}
/**
* 指南:handleAsync - 异步容错处理
*/
public void demoHandleAsync() {
System.out.println("=== handleAsync ===");
// CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture.completedFuture(100)
.handleAsync((v, e) -> {
System.out.println("异步处理结果,线程: " + Thread.currentThread().getName());
return v != null ? v * 2 : 0;
}, executor)
.thenAccept(System.out::println)
.join();
}
/**
* 指南:exceptionally - 仅处理异常情况
* 场景:异常降级处理
*/
public void demoExceptionally() {
System.out.println("=== exceptionally ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("数据库连接失败");
});
CompletableFuture<String> result = future.exceptionally(ex -> {
System.out.println("exceptionally 捕获: " + ex.getMessage());
return "从缓存读取的备份数据";
});
System.out.println("结果: " + result.join());
}
// ==================== 三、链式转换方法 ====================
/**
* 指南:thenApply - 转换结果类型 T → U
*/
public void demoThenApply() {
System.out.println("=== thenApply ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "123")
.thenApply(Integer::parseInt) // String → Integer
.thenApply(n -> n * 2) // Integer → Integer
.thenApply(String::valueOf); // Integer → String
System.out.println("转换结果: " + future.join());
}
/**
* 指南:thenApplyAsync - 异步转换
*/
public void demoThenApplyAsync() {
System.out.println("=== thenApplyAsync ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> {
System.out.println("异步转换,线程: " + Thread.currentThread().getName());
return s.toUpperCase();
}, executor);
System.out.println(future.join());
}
/**
* 指南:thenAccept - 消费结果,无返回值
*/
public void demoThenAccept() {
System.out.println("=== thenAccept ===");
CompletableFuture.supplyAsync(() -> "订单创建成功")
.thenAccept(msg -> System.out.println("发送通知: " + msg))
.join(); // 返回Void
}
/**
* 指南:thenAcceptAsync - 异步消费
*/
public void demoThenAcceptAsync() {
System.out.println("=== thenAcceptAsync ===");
CompletableFuture.supplyAsync(() -> "用户登录")
.thenAcceptAsync(event -> {
System.out.println("异步记录日志,线程: " + Thread.currentThread().getName());
}, executor)
.join();
}
/**
* 指南:thenRun - 不依赖前置结果,纯副作用
*/
public void demoThenRun() {
System.out.println("=== thenRun ===");
CompletableFuture.supplyAsync(() -> "步骤1完成")
.thenRun(() -> System.out.println("执行步骤2(不依赖步骤1结果)"))
.thenRun(() -> System.out.println("执行步骤3"))
.join();
}
/**
* 指南:thenRunAsync - 异步执行副作用
*/
public void demoThenRunAsync() {
System.out.println("=== thenRunAsync ===");
CompletableFuture.supplyAsync(() -> "任务")
.thenRunAsync(() -> {
System.out.println("异步清理资源,线程: " + Thread.currentThread().getName());
}, executor)
.join();
}
// ==================== 四、组合方法 ====================
/**
* 指南:thenCompose - 扁平化链式,避免 CF<CF<T>>
* 场景:串行依赖,如先取ID再取详情
*/
public void demoThenCompose() {
System.out.println("=== thenCompose ===");
CompletableFuture<String> future = fetchUserId()
.thenCompose(this::fetchUserDetail); // 返回 CF<User> 被扁平化为 CF<User>
System.out.println("用户信息: " + future.join());
}
private CompletableFuture<String> fetchUserId() {
return CompletableFuture.supplyAsync(() -> "user_123");
}
private CompletableFuture<String> fetchUserDetail(String userId) {
return CompletableFuture.supplyAsync(() -> "详情 of " + userId);
}
/**
* 指南:thenComposeAsync - 异步扁平化
*/
public void demoThenComposeAsync() {
System.out.println("=== thenComposeAsync ===");
fetchUserId().thenComposeAsync(id -> {
System.out.println("异步获取详情,线程: " + Thread.currentThread().getName());
return fetchUserDetail(id);
}, executor).join();
}
/**
* 指南:thenCombine - 并行合并两个结果 (T, U) → V
* 场景:同时查询价格和库存,合并展示
*/
public void demoThenCombine() {
System.out.println("=== thenCombine ===");
CompletableFuture<String> priceFuture = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "¥199";
});
CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> {
sleep(100);
return 50;
});
CompletableFuture<String> result = priceFuture.thenCombine(stockFuture,
(price, stock) -> "价格:" + price + ", 库存:" + stock + "件"
);
System.out.println(result.join());
}
/**
* 指南:thenCombineAsync - 异步合并
*/
public void demoThenCombineAsync() {
System.out.println("=== thenCombineAsync ===");
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");
f1.thenCombineAsync(f2, (a, b) -> {
System.out.println("异步合并,线程: " + Thread.currentThread().getName());
return a + "-" + b;
}, executor).thenAccept(System.out::println).join();
}
/**
* 指南:thenAcceptBoth - 并行消费两个结果
*/
public void demoThenAcceptBoth() {
System.out.println("=== thenAcceptBoth ===");
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> "订单123");
CompletableFuture<String> paymentFuture = CompletableFuture.supplyAsync(() -> "支付成功");
orderFuture.thenAcceptBoth(paymentFuture, (order, payment) -> {
System.out.println("订单: " + order);
System.out.println("支付: " + payment);
System.out.println("可以发货了!");
}).join();
}
/**
* 指南:runAfterBoth - 两个都完成后执行Runnable
*/
public void demoRunAfterBoth() {
System.out.println("=== runAfterBoth ===");
CompletableFuture<String> cacheUpdate = CompletableFuture.supplyAsync(() -> {
System.out.println("更新缓存");
return "done";
});
CompletableFuture<String> dbUpdate = CompletableFuture.supplyAsync(() -> {
System.out.println("更新数据库");
return "done";
});
cacheUpdate.runAfterBoth(dbUpdate, () -> {
System.out.println("缓存和数据库都更新完毕,发送通知");
}).join();
}
/**
* 指南:applyToEither - 竞速,取最快结果 T → U
* 场景:超时降级,多个服务商竞速
*/
public void demoApplyToEither() {
System.out.println("=== applyToEither ===");
CompletableFuture<String> localCache = CompletableFuture.supplyAsync(() -> {
sleep(50); // 快
return "缓存结果";
});
CompletableFuture<String> remoteService = CompletableFuture.supplyAsync(() -> {
sleep(200); // 慢
return "远程结果";
});
CompletableFuture<String> result = localCache.applyToEither(remoteService,
winner -> "最快返回的是: " + winner
);
System.out.println(result.join());
}
/**
* 指南:acceptEither - 竞速消费
*/
public void demoAcceptEither() {
System.out.println("=== acceptEither ===");
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "服务A";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(50);
return "服务B";
});
f1.acceptEither(f2, winner -> {
System.out.println("使用 " + winner + " 的结果");
}).join();
}
/**
* 指南:runAfterEither - 任一个完成就执行
*/
public void demoRunAfterEither() {
System.out.println("=== runAfterEither ===");
CompletableFuture<String> backup = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "备份完成";
});
CompletableFuture<String> primary = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "主任务完成";
});
backup.runAfterEither(primary, () -> {
System.out.println("至少有一个完成了,继续下一步");
}).join();
}
// ==================== 五、多Future聚合方法 ====================
/**
* 指南:allOf - 等待全部完成
* 场景:批量操作后统一处理
*/
public void demoAllOf() {
System.out.println("=== allOf ===");
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "任务1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(200);
return "任务2";
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
sleep(150);
return "任务3";
});
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.thenRun(() -> {
// 全部完成后收集结果
String r1 = f1.join();
String r2 = f2.join();
String r3 = f3.join();
System.out.println("全部完成: " + r1 + ", " + r2 + ", " + r3);
}).join();
}
/**
* 指南:anyOf - 取最快完成
* 场景:多源查询,谁快用谁
*/
public void demoAnyOf() {
System.out.println("=== anyOf ===");
CompletableFuture<Object> any = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
sleep(300);
return "慢服务";
}),
CompletableFuture.supplyAsync(() -> {
sleep(50);
return "快服务";
}),
CompletableFuture.supplyAsync(() -> {
sleep(200);
return "中服务";
})
);
System.out.println("最快的是: " + any.join());
}
/**
* 指南:join vs get
*/
public void demoJoinVsGet() throws Exception {
System.out.println("=== join vs get ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "ok");
// join(): 抛出非检查异常,可直接使用
String result1 = future.join();
// get(): 抛出检查异常,需要try-catch或throws
String result2 = future.get();
// get(timeout): 带超时
String result3 = future.get(1, TimeUnit.SECONDS);
System.out.println("join: " + result1 + ", get: " + result2);
}
// ==================== 六、状态检查方法 ====================
/**
* 指南:状态检查方法
*/
public void demoStatusCheck() {
System.out.println("=== 状态检查 ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(100);
return "done";
});
System.out.println("创建后立即检查:");
System.out.println(" isDone: " + future.isDone());
System.out.println(" isCancelled: " + future.isCancelled());
System.out.println(" isCompletedExceptionally: " + future.isCompletedExceptionally());
future.join(); // 等待完成
System.out.println("完成后检查:");
System.out.println(" isDone: " + future.isDone());
}
/**
* 指南:getNow - 立即获取,未完成返回默认值
*/
public void demoGetNow() {
System.out.println("=== getNow ===");
CompletableFuture<String> incomplete = new CompletableFuture<>();
// 未完成,返回默认值
String value = incomplete.getNow("默认值");
System.out.println("未完成时: " + value);
// 完成后
incomplete.complete("实际值");
value = incomplete.getNow("默认值");
System.out.println("完成后: " + value);
}
/**
* 指南:complete - 强制完成
*/
public void demoComplete() {
System.out.println("=== complete ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(1000); // 长时间任务
return "原始结果";
});
// 强制提前完成
boolean success = future.complete("强制结果");
System.out.println("强制完成是否成功: " + success);
System.out.println("结果: " + future.join());
// 再次complete无效
boolean failed = future.complete("新结果");
System.out.println("再次complete: " + failed);
}
/**
* 指南:completeExceptionally - 强制异常完成
*/
public void demoCompleteExceptionally() {
System.out.println("=== completeExceptionally ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "不会返回";
});
// 强制标记为异常
future.completeExceptionally(new CancellationException("用户取消"));
try {
future.join();
} catch (CompletionException e) {
System.out.println("异常: " + e.getCause().getClass().getSimpleName());
}
}
/**
* 指南:cancel - 取消任务
*/
public void demoCancel() {
System.out.println("=== cancel ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
while (!Thread.currentThread().isInterrupted()) {
// 模拟长时间运行
if (Math.random() > 0.99) break;
}
return "完成";
});
sleep(10); // 让任务启动
boolean cancelled = future.cancel(true); // true=尝试中断线程
System.out.println("取消结果: " + cancelled);
System.out.println("isCancelled: " + future.isCancelled());
System.out.println("isDone: " + future.isDone());
}
// ==================== 综合实战示例 ====================
/**
* 综合场景:订单处理流程
* 展示多方法组合使用
*/
public void demoOrderProcess() {
System.out.println("=== 综合实战:订单处理 ===");
// 1. 创建订单(supplyAsync)
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("1. 创建订单");
return "ORDER_123";
});
// 2. 同时执行:扣库存 + 扣款(thenCombine)
CompletableFuture<String> inventoryFuture = orderFuture.thenApply(order -> {
System.out.println("2a. 扣减库存: " + order);
return "库存OK";
});
CompletableFuture<String> paymentFuture = orderFuture.thenCompose(order ->
CompletableFuture.supplyAsync(() -> {
System.out.println("2b. 处理支付: " + order);
return "支付OK";
})
);
// 3. 两者都完成才发货(thenAcceptBoth)
CompletableFuture<Void> shippingFuture = inventoryFuture.thenAcceptBoth(paymentFuture,
(inv, pay) -> System.out.println("3. 发货!(" + inv + ", " + pay + ")")
);
// 4. 异常处理:如果失败发送通知(exceptionally)
CompletableFuture<String> result = shippingFuture
.thenApply(v -> "订单完成")
.exceptionally(ex -> {
System.out.println("错误: " + ex.getMessage());
return "订单失败";
});
// 5. 最终清理(whenComplete)
result.whenComplete((res, ex) -> {
System.out.println("4. 最终状态: " + res);
System.out.println("5. 清理资源");
}).join();
}
// ==================== 辅助方法 ====================
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void shutdown() {
executor.shutdown();
}
// ==================== Main ====================
public static void main(String[] args) throws Exception {
CompletableFutureGuide guide = new CompletableFutureGuide();
// 一、创建方法
guide.demoRunAsync();
guide.demoRunAsyncWithExecutor();
guide.demoSupplyAsync();
guide.demoSupplyAsyncWithExecutor();
guide.demoCompletedFuture();
// 二、完成处理
guide.demoWhenComplete();
guide.demoWhenCompleteAsync();
guide.demoHandle();
guide.demoHandleAsync();
guide.demoExceptionally();
// 三、链式转换
guide.demoThenApply();
guide.demoThenApplyAsync();
guide.demoThenAccept();
guide.demoThenAcceptAsync();
guide.demoThenRun();
guide.demoThenRunAsync();
// 四、组合方法【组合其它CompletableFuture】
guide.demoThenCompose();
guide.demoThenComposeAsync();
guide.demoThenCombine();
guide.demoThenCombineAsync();
guide.demoThenAcceptBoth();
guide.demoRunAfterBoth();
guide.demoApplyToEither();
guide.demoAcceptEither();
guide.demoRunAfterEither();
// 五、聚合方法
guide.demoAllOf();
guide.demoAnyOf();
guide.demoJoinVsGet();
// 六、状态检查
guide.demoStatusCheck();
guide.demoGetNow();
guide.demoComplete();
guide.demoCompleteExceptionally();
guide.demoCancel();
// 综合实战
guide.demoOrderProcess();
guide.shutdown();
System.out.println("\n所有演示完成!");
}
}到此这篇关于Java CompletableFuture 多任务编排方法实践指南的文章就介绍到这了,更多相关Java CompletableFuture 多任务编排内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
