Java 中多线程异步处理最佳实践
作者:沃心
第一部分: CompletableFuture 级联调用
以下代码展示了 supplyAsync(异步执行有返回值任务)和 thenApplyAsync(异步处理前序任务结果)的级联调用,包含完整的异常处理、资源管理和 Java 17+ 语法特性。
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 异步任务级联调用示例服务
* 使用 Java 17+ 特性 + Spring 注解 + 完整异常处理
*/
@Slf4j
@Service
public class AsyncCascadeService {
// 自定义线程池(避免使用默认 ForkJoinPool,提升可控性)
private static final ExecutorService CUSTOM_EXECUTOR = Executors.newFixedThreadPool(5);
/**
* 级联异步任务示例:
* 1. supplyAsync:异步执行任务1(生成订单ID)
* 2. thenApplyAsync:异步处理任务1结果(根据订单ID查询订单详情)
* 3. thenApplyAsync:异步处理任务2结果(计算订单金额)
*/
public CompletableFuture<OrderAmountResult> processOrderAsync(Long userId) {
// 第一个异步任务:生成订单ID(supplyAsync 执行有返回值的异步任务)
CompletableFuture<String> generateOrderIdFuture = CompletableFuture.supplyAsync(() -> {
log.info("异步任务1:生成订单ID,线程:{}", Thread.currentThread().getName());
// 模拟业务耗时
simulateDelay(100);
return "ORDER_" + userId + "_" + System.currentTimeMillis();
}, CUSTOM_EXECUTOR) // 指定自定义线程池(推荐,避免默认池耗尽)
.exceptionally(ex -> {
log.error("任务1执行失败:", ex);
throw new RuntimeException("生成订单ID失败", ex);
});
// 第二个异步任务:处理订单ID,查询订单详情(thenApplyAsync 异步处理前序结果)
CompletableFuture<OrderDetail> queryOrderDetailFuture = generateOrderIdFuture
.thenApplyAsync(orderId -> {
log.info("异步任务2:查询订单详情,订单ID:{},线程:{}", orderId, Thread.currentThread().getName());
simulateDelay(150);
// 模拟查询结果
return new OrderDetail(orderId, userId, "商品A", 2, 99.9);
}, CUSTOM_EXECUTOR)
.exceptionally(ex -> {
log.error("任务2执行失败:", ex);
throw new RuntimeException("查询订单详情失败", ex);
});
// 第三个异步任务:计算订单总金额(级联处理前序结果)
return queryOrderDetailFuture
.thenApplyAsync(orderDetail -> {
log.info("异步任务3:计算订单金额,订单详情:{},线程:{}", orderDetail, Thread.currentThread().getName());
simulateDelay(100);
double totalAmount = orderDetail.price() * orderDetail.quantity();
// 使用 Java 17 record 简化数据返回
return new OrderAmountResult(orderDetail.orderId(), totalAmount, totalAmount * 0.08);
}, CUSTOM_EXECUTOR)
.exceptionally(ex -> {
log.error("任务3执行失败:", ex);
throw new RuntimeException("计算订单金额失败", ex);
});
}
// 模拟业务耗时
private void simulateDelay(long millis) {
try {
TimeUnit.MILLISECONDS.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断", e);
}
}
// Java 17 Record:订单详情
public record OrderDetail(String orderId, Long userId, String productName, int quantity, double price) {}
// Java 17 Record:订单金额计算结果
public record OrderAmountResult(String orderId, double totalAmount, double tax) {}
// 资源关闭:Spring 销毁时关闭线程池(try-with-resources 思想)
@Override
protected void finalize() throws Throwable {
try (ExecutorService executor = CUSTOM_EXECUTOR) {
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
log.info("异步线程池已关闭");
} finally {
super.finalize();
}
}
// 测试入口
public static void main(String[] args) {
AsyncCascadeService service = new AsyncCascadeService();
// 执行级联异步任务
CompletableFuture<OrderAmountResult> resultFuture = service.processOrderAsync(1001L);
// 阻塞获取结果(实际业务中建议使用 thenAccept/thenRun 非阻塞处理)
resultFuture.whenComplete((result, ex) -> {
if (ex != null) {
log.error("级联任务执行失败", ex);
} else {
log.info("级联任务执行完成:{}", result);
}
// 关闭线程池
CUSTOM_EXECUTOR.shutdown();
}).join();
}
}核心知识点解释
- supplyAsync 作用
- 异步执行有返回值的任务,参数是
Supplier<T>(无入参、有返回值) - 第二个参数指定自定义线程池(推荐,避免使用 JVM 默认的
ForkJoinPool.commonPool(),防止核心线程被耗尽)
- 异步执行有返回值的任务,参数是
- thenApplyAsync 作用
- 异步处理前序
CompletableFuture的结果,参数是Function<T, R>(接收前序结果、返回新结果) - 与
thenApply的区别:thenApply使用前序任务的线程执行,thenApplyAsync提交到线程池异步执行,真正实现“级联异步”
- 异步处理前序
- Java 17+ 特性使用
record:简化数据载体(OrderDetail/OrderAmountResult),自动生成构造器、getter、equals/hashCode 等- 异常处理:
exceptionally捕获前序任务异常并兜底,whenComplete统一处理最终结果/异常
- 资源管理
- 线程池使用
try-with-resources思想关闭(finalize 方法中通过 try-with-resources 语法确保线程池关闭) - 中断处理:捕获
InterruptedException后恢复线程中断状态,避免线程状态丢失
- 线程池使用
- 级联调用逻辑
生成订单ID(supplyAsync)→ 查询订单详情(thenApplyAsync)→ 计算金额(thenApplyAsync)
- 每个步骤都异步执行,且后序任务依赖前序任务的结果,实现“流水线式”异步处理。
最佳实践
- 避免在
thenApplyAsync中执行耗时过长的任务,建议拆分多个小任务 - 优先使用自定义线程池,通过
@Configuration配置线程池参数(核心线程数、最大线程数、队列等) - 异常处理:每个异步步骤都建议添加
exceptionally或handle,避免异常穿透导致整个链路失败 - 非阻塞处理结果:使用
thenAccept(消费结果)/thenRun(无结果处理)替代join()阻塞调用
第二部分: 非阻塞处理结果
以下示例移除阻塞的 join(),完全基于 thenAccept(消费结果)、thenRun(无结果收尾)实现非阻塞处理,并补充 whenComplete 统一异常处理,同时优化 Spring 规范的资源关闭(替换不推荐的 finalize() 为 @PreDestroy)。
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.PreDestroy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 非阻塞处理 CompletableFuture 结果示例
* 核心:thenAccept(消费结果)/thenRun(收尾动作)替代 join() 阻塞调用
*/
@Slf4j
@Service
public class NonBlockingAsyncService {
// 自定义线程池(Spring 中建议通过 @Bean 配置,此处简化)
private final ExecutorService customExecutor = Executors.newFixedThreadPool(5);
// 订单状态枚举(Java 17+ 增强枚举,可结合 switch 表达式)
public enum OrderStatus { SUCCESS, FAILURE }
/**
* 级联异步任务(非阻塞核心逻辑)
*/
public void processOrderNonBlocking(Long userId) {
// 步骤1:异步生成订单ID(supplyAsync)
CompletableFuture<String> orderIdFuture = CompletableFuture.supplyAsync(() -> {
log.info("【异步任务1】生成订单ID | 线程:{}", Thread.currentThread().getName());
simulateDelay(100);
return "ORDER_" + userId + "_" + System.currentTimeMillis();
}, customExecutor)
.exceptionally(ex -> {
log.error("【任务1失败】生成订单ID异常", ex);
throw new RuntimeException("生成订单ID失败", ex);
});
// 步骤2:异步查询订单详情(thenApplyAsync)
CompletableFuture<OrderDetail> orderDetailFuture = orderIdFuture
.thenApplyAsync(orderId -> {
log.info("【异步任务2】查询订单详情 | 订单ID:{} | 线程:{}", orderId, Thread.currentThread().getName());
simulateDelay(150);
return new OrderDetail(orderId, userId, "Java编程实战", 2, 89.9);
}, customExecutor)
.exceptionally(ex -> {
log.error("【任务2失败】查询订单详情异常", ex);
throw new RuntimeException("查询订单详情失败", ex);
});
// 步骤3:非阻塞处理最终结果(核心:thenAccept + thenRun)
orderDetailFuture
// 1. whenComplete:统一处理结果/异常(非阻塞)
.whenComplete((detail, ex) -> {
if (ex != null) {
log.error("【级联任务失败】订单处理异常", ex);
// 非阻塞处理失败逻辑(如通知、落库)
handleOrderFailure(userId, ex.getMessage());
} else {
log.info("【级联任务成功】订单详情:{}", detail);
}
})
// 2. thenAccept:消费结果(有返回值时用,非阻塞)
.thenAccept(detail -> {
log.info("【非阻塞消费】计算订单金额 | 订单ID:{}", detail.orderId());
double total = detail.quantity() * detail.price();
// 模拟:非阻塞更新订单金额到数据库
updateOrderAmount(detail.orderId(), total);
})
// 3. thenRun:无结果收尾动作(非阻塞)
.thenRun(() -> {
log.info("【非阻塞收尾】订单处理流程结束 | 清理临时资源/发送通知");
// 模拟:非阻塞发送处理完成通知
sendOrderNotify(userId, OrderStatus.SUCCESS);
});
// 关键:当前线程(如主线程/Controller线程)不会阻塞,直接继续执行后续逻辑
log.info("【主线程不阻塞】订单处理请求已提交,当前线程继续执行其他任务...");
}
// ------------------- 模拟业务方法 -------------------
private void simulateDelay(long millis) {
try {
TimeUnit.MILLISECONDS.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断", e);
}
}
private void handleOrderFailure(Long userId, String errorMsg) {
log.warn("【失败处理】用户{}订单处理失败:{}", userId, errorMsg);
// 实际场景:非阻塞发送失败通知、记录异常日志等
}
private void updateOrderAmount(String orderId, double total) {
log.info("【更新金额】订单{}总金额:{}元", orderId, total);
// 实际场景:非阻塞调用DAO更新数据库
}
private void sendOrderNotify(Long userId, OrderStatus status) {
// Java 17+ switch表达式(简化分支逻辑)
String msg = switch (status) {
case SUCCESS -> "用户" + userId + "订单处理完成";
case FAILURE -> "用户" + userId + "订单处理失败";
};
log.info("【发送通知】{}", msg);
}
// ------------------- Java 17+ 特性 -------------------
// Record:简化数据载体(自动生成构造器、getter、equals等)
public record OrderDetail(String orderId, Long userId, String productName, int quantity, double price) {}
// ------------------- 资源管理(Spring规范) -------------------
// Spring 销毁Bean时关闭线程池(try-with-resources 确保资源释放)
@PreDestroy
public void destroy() {
log.info("【关闭线程池】开始释放异步线程池资源...");
// try-with-resources 自动关闭 ExecutorService(Java 9+ 支持)
try (ExecutorService executor = customExecutor) {
executor.shutdown();
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
log.warn("【强制关闭】线程池未正常终止,强制关闭");
}
} catch (InterruptedException e) {
customExecutor.shutdownNow();
Thread.currentThread().interrupt();
log.error("【关闭异常】线程池关闭被中断", e);
}
log.info("【关闭完成】异步线程池已释放");
}
// ------------------- 测试入口(非阻塞效果演示) -------------------
public static void main(String[] args) {
NonBlockingAsyncService service = new NonBlockingAsyncService();
// 提交异步任务(非阻塞)
service.processOrderNonBlocking(1001L);
// 主线程继续执行(不会等待异步任务完成)
log.info("【主线程】已提交订单处理任务,开始执行其他业务逻辑...");
simulateMainThreadWork();
// 等待异步任务完成(仅测试用,实际业务无需此步骤)
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void simulateMainThreadWork() {
log.info("【主线程】执行其他任务:检查库存、验证用户权限...");
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}核心知识点:非阻塞处理的关键区别
| 方法 | 作用 | 是否阻塞 | 适用场景 |
|---|---|---|---|
join() | 阻塞当前线程,等待异步任务返回结果 | 是 | 测试/必须同步获取结果的场景 |
thenAccept() | 异步消费结果(入参为结果,无返回值) | 否 | 有结果需要处理(如更新/通知) |
thenRun() | 异步执行收尾动作(无入参、无返回值) | 否 | 结果处理完成后的清理/日志 |
whenComplete | 异步处理结果+异常(入参为结果+异常) | 否 | 统一捕获异常+分支处理 |
非阻塞处理的核心优势
- 提升系统吞吐量:提交异步任务的线程(如 Controller 线程)无需等待,可立即处理下一个请求,避免线程阻塞导致的资源浪费;
- 避免线程死锁/饥饿:阻塞式
join()可能导致线程池耗尽,非阻塞回调更符合异步编程的设计初衷; - 逻辑解耦:结果消费、异常处理、收尾动作拆分到不同回调中,代码更清晰(符合单一职责)。
关键注意事项
- 异常处理:非阻塞场景下,必须通过
whenComplete/exceptionally捕获异常,否则异常会被“吞掉”(无感知失败); - 线程池选择:
thenAccept/thenRun默认使用前序任务的线程池,建议统一指定自定义线程池(避免默认池耗尽); - 资源生命周期:Spring 中通过
@PreDestroy关闭线程池(替代finalize(),finalize()已被标记为过时),结合try-with-resources确保资源释放; - Java 17+ 增强:示例中使用
switch表达式(替代传统switch)、record(简化数据类),符合最新语法规范。
输出效果(非阻塞特征)
运行代码后,控制台会先打印主线程不阻塞的日志,再异步打印各任务的日志,体现“提交任务后主线程立即继续执行”的非阻塞特性:
【主线程不阻塞】订单处理请求已提交,当前线程继续执行其他任务... 【主线程】执行其他任务:检查库存、验证用户权限... 【异步任务1】生成订单ID | 线程:pool-1-thread-1 【异步任务2】查询订单详情 | 订单ID:ORDER_1001_17338xxxxxx | 线程:pool-1-thread-2 【级联任务成功】订单详情:OrderDetail[orderId=ORDER_1001_17338xxxxxx, userId=1001, productName=Java编程实战, quantity=2, price=89.9] 【非阻塞消费】计算订单金额 | 订单ID:ORDER_1001_17338xxxxxx 【更新金额】订单ORDER_1001_17338xxxxxx总金额:179.8元 【非阻塞收尾】订单处理流程结束 | 清理临时资源/发送通知 【发送通知】用户1001订单处理完成
第三部分 其它异步编程方式全解析
Java 异步编程体系从基础线程模型到现代响应式编程、虚拟线程逐步演进,以下是主流方式的代码示例+特性解析,均基于 Java 17+ 语法(部分如虚拟线程需 Java 21+ 正式支持,17+ 可通过预览特性启用),并包含异常处理、资源管理规范。
一、基础核心:线程池 + Future(Java 5+)
ExecutorService 结合 Future 是 CompletableFuture 之前的核心异步方式,通过提交 Callable/Runnable 实现异步任务,缺点是结果获取阻塞、无链式调用,但胜在简单可控。
代码示例(Java 17+)
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 线程池 + Future 异步示例
* 核心:提交Callable获取Future,通过get()阻塞获取结果(可设置超时)
*/
public class FutureAsyncDemo {
// 自定义线程池(Java 17+ 推荐使用Executors静态工厂+try-with-resources关闭)
private static final ExecutorService EXECUTOR = Executors.newVirtualThreadPerTaskExecutor(); // Java 21+ 虚拟线程池(17+需预览)
// Java 17 Record:异步任务结果
public record TaskResult(Long taskId, String content, long costTimeMs) {}
// 异步任务:生成业务数据
private static Callable<TaskResult> generateDataTask(Long taskId) {
return () -> {
long start = System.currentTimeMillis();
log("任务{}开始执行,线程:{}", taskId, Thread.currentThread().getName());
// 模拟耗时
TimeUnit.MILLISECONDS.sleep(200);
long cost = System.currentTimeMillis() - start;
return new TaskResult(taskId, "异步生成数据_" + taskId, cost);
};
}
public static void main(String[] args) {
// 提交异步任务
Future<TaskResult> future1 = EXECUTOR.submit(generateDataTask(1L));
Future<TaskResult> future2 = EXECUTOR.submit(generateDataTask(2L));
// 非阻塞提交后,主线程可执行其他逻辑
log("主线程执行其他任务...");
// 阻塞获取结果(可设置超时,避免永久阻塞)
try {
TaskResult result1 = future1.get(1, TimeUnit.SECONDS); // 超时时间1秒
TaskResult result2 = future2.get(1, TimeUnit.SECONDS);
log("任务1结果:{}", result1);
log("任务2结果:{}", result2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log("任务被中断:{}", e.getMessage());
} catch (ExecutionException e) {
log("任务执行失败:{}", e.getCause().getMessage());
} catch (TimeoutException e) {
log("任务超时:{}", e.getMessage());
future1.cancel(true); // 超时取消任务
future2.cancel(true);
} finally {
// try-with-resources 关闭线程池(Java 9+ 支持ExecutorService自动关闭)
try (ExecutorService executor = EXECUTOR) {
executor.shutdown();
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
log("线程池强制关闭");
}
} catch (InterruptedException e) {
EXECUTOR.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
private static void log(String msg, Object... args) {
System.out.printf("[%s] %s%n", Thread.currentThread().getName(), String.format(msg, args));
}
}核心特性
- 优点:基础通用,兼容所有Java版本,线程池可控;
- 缺点:
Future.get()阻塞线程,无链式调用、无异步异常处理,无法组合多个任务; - 适用场景:简单异步任务,无需结果联动的场景。
二、现代轻量:虚拟线程(VirtualThread,Java 21+ 正式/17+ 预览)
Project Loom 推出的虚拟线程(用户态线程)是 Java 异步编程的重大升级,相比传统平台线程,虚拟线程轻量(百万级创建)、无上下文切换开销,天然适配异步场景。
代码示例(Java 21+)
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 虚拟线程(VirtualThread)异步示例
* Java 21+ 正式支持,17+ 需添加 --enable-preview 启动参数
*/
public class VirtualThreadAsyncDemo {
// Java 17 Record:用户数据
public record UserData(Long userId, String username, String email) {}
// 异步任务:查询用户信息
private static UserData queryUser(Long userId) {
try {
// 模拟IO阻塞(虚拟线程阻塞时会释放载体线程,无性能损耗)
TimeUnit.MILLISECONDS.sleep(300);
log("查询用户{}完成,线程:{}", userId, Thread.currentThread().getName());
return new UserData(userId, "user_" + userId, userId + "@example.com");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("查询用户中断", e);
}
}
public static void main(String[] args) throws InterruptedException {
// 虚拟线程池(Java 21+ 推荐)
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 提交1000个虚拟线程任务(轻量,无性能压力)
for (long i = 1; i <= 1000; i++) {
long userId = i;
executor.submit(() -> queryUser(userId));
}
// 主线程等待任务完成(仅测试用)
executor.awaitTermination(Duration.ofSeconds(5));
log("所有虚拟线程任务执行完成");
} // try-with-resources 自动关闭线程池
}
private static void log(String msg, Object... args) {
System.out.printf("[%s] %s%n", Thread.currentThread().getName(), String.format(msg, args));
}
}核心特性
- 优点:
- 轻量:单个JVM可创建百万级虚拟线程,远超平台线程(千级);
- 无阻塞损耗:虚拟线程遇到IO阻塞时,会释放底层载体线程(OS线程),避免资源浪费;
- 编程模型简单:无需回调地狱,同步写法实现异步效果;
- 缺点:Java 21+ 正式支持,低版本需升级;
- 适用场景:IO密集型异步任务(如数据库、网络调用),替代传统线程池+回调。
三、异步IO:NIO.2 异步通道(Java 7+)
Java NIO.2 提供 AsynchronousFileChannel/AsynchronousSocketChannel 等异步IO通道,基于事件驱动实现文件/网络IO的异步操作,无需线程阻塞等待IO完成。
代码示例(异步文件读写,Java 17+)
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.charset.StandardCharsets;
/**
* NIO.2 异步文件IO示例
* 核心:CompletionHandler 回调处理异步结果,无线程阻塞
*/
public class AsyncFileIODemo {
private static final Path FILE_PATH = Path.of("async_demo.txt");
public static void main(String[] args) throws InterruptedException {
// try-with-resources 管理异步文件通道(自动关闭资源)
try (AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
FILE_PATH,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE
)) {
// 步骤1:异步写入文件
String content = "Java 17 异步IO示例:" + System.currentTimeMillis();
ByteBuffer writeBuffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
fileChannel.write(writeBuffer, 0, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesWritten, Void attachment) {
log("异步写入完成,写入字节数:{}", bytesWritten);
// 写入完成后异步读取文件
readFileAsync(fileChannel);
}
@Override
public void failed(Throwable exc, Void attachment) {
log("异步写入失败:{}", exc.getMessage());
}
});
// 主线程等待异步操作完成(仅测试用)
Thread.sleep(2000);
} catch (Exception e) {
log("文件通道操作异常:{}", e.getMessage());
}
}
// 异步读取文件
private static void readFileAsync(AsynchronousFileChannel fileChannel) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
fileChannel.read(readBuffer, 0, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer bytesRead, Void attachment) {
if (bytesRead == -1) {
log("文件读取完成(无数据)");
return;
}
readBuffer.flip();
String content = StandardCharsets.UTF_8.decode(readBuffer).toString();
log("异步读取结果:{}", content);
}
@Override
public void failed(Throwable exc, Void attachment) {
log("异步读取失败:{}", exc.getMessage());
}
});
}
private static void log(String msg, Object... args) {
System.out.printf("[%s] %s%n", Thread.currentThread().getName(), String.format(msg, args));
}
}核心特性
- 优点:底层基于操作系统异步IO(AIO),无用户态线程阻塞,IO效率极高;
- 缺点:编程模型繁琐(依赖
CompletionHandler回调),仅适用于IO场景; - 适用场景:高并发文件/网络IO(如文件服务器、网络通信)。
四、响应式编程:Reactor(Spring WebFlux,Java 8+)
Reactor 是 Java 响应式编程的主流框架(实现 Reactive Streams 规范),基于 Mono(单值异步)/Flux(多值异步)实现非阻塞、背压可控的异步编程,是 Spring WebFlux 的核心。
代码示例(Spring Boot 3.x + Java 17+)
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
/**
* Reactor 响应式异步示例(Spring Boot 3.x + Java 17+)
*/
@Slf4j
@Service
public class ReactorAsyncDemo {
// Java 17 Record:商品数据
public record Product(Long id, String name, double price) {}
// 异步查询商品(Mono 表示单值异步结果)
public Mono<Product> queryProductAsync(Long productId) {
return Mono.fromSupplier(() -> {
log.info("查询商品{},线程:{}", productId, Thread.currentThread().getName());
// 模拟IO耗时
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("查询中断", e);
}
return new Product(productId, "商品_" + productId, 99.9 + productId);
})
.subscribeOn(Schedulers.boundedElastic()) // 指定异步线程池
.timeout(Duration.ofSeconds(1)) // 超时控制
.onErrorResume(ex -> { // 异常兜底
log.error("查询商品失败:{}", ex.getMessage());
return Mono.just(new Product(productId, "默认商品", 0.0));
});
}
// 测试入口(Spring Boot 环境)
public static void main(String[] args) {
ReactorAsyncDemo service = new ReactorAsyncDemo();
// 非阻塞消费结果
service.queryProductAsync(1001L)
.doOnNext(product -> log.info("查询结果:{}", product)) // 消费结果
.doOnTerminate(() -> log.info("异步任务结束")) // 收尾动作
.subscribe(); // 订阅(触发异步执行)
// 主线程等待(仅测试用)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}核心特性
- 优点:
- 非阻塞+背压:可控制数据生产速率,避免消费者过载;
- 丰富的操作符:
map/flatMap/filter等实现复杂异步逻辑; - 与 Spring 生态深度集成:Spring WebFlux 支持全响应式Web开发;
- 缺点:学习曲线陡峭,调试难度高;
- 适用场景:高并发微服务、流式数据处理、实时响应系统。
五、补充:SwingWorker(GUI 异步场景)
SwingWorker 是 Java 专门为 Swing GUI 设计的异步工具,用于在后台线程执行耗时任务,避免阻塞UI线程(EDT),属于小众但专用的异步方式。
核心特性
- 优点:专为GUI异步设计,内置进度回调、结果发布;
- 缺点:仅适用于Swing/AWT场景,通用性差;
- 适用场景:桌面GUI应用的后台任务(如数据加载、文件导出)。
六、各异步方式对比与选型建议
| 方式 | 核心优势 | 核心劣势 | 适用场景 |
|---|---|---|---|
| ExecutorService+Future | 基础通用,兼容性好 | 阻塞获取结果,无链式调用 | 简单异步任务,无结果联动 |
| VirtualThread | 轻量高效,同步写法异步 | Java 21+ 依赖 | IO密集型异步任务(数据库/网络) |
| NIO.2 异步通道 | 底层AIO,IO效率极高 | 编程繁琐,仅适用于IO | 高并发文件/网络IO |
| Reactor(WebFlux) | 非阻塞+背压,生态完善 | 学习成本高,调试难 | 高并发微服务、流式数据处理 |
| SwingWorker | 专为GUI设计,适配EDT | 通用性差 | Swing/AWT桌面应用 |
| CompletableFuture | 链式调用,任务组合 | 线程池需手动管理 | 通用异步任务,结果联动/组合 |
选型核心原则
- IO密集型任务:优先选 虚拟线程(Java 21+),次选 CompletableFuture;
- 高并发流式处理:选 Reactor(WebFlux);
- 文件/网络IO:选 NIO.2 异步通道;
- 简单异步无联动:选 ExecutorService+Future;
- GUI应用:选 SwingWorker。
所有方式均需遵循:
- 资源管理:使用
try-with-resources关闭线程池/IO通道; - 异常处理:捕获中断异常并恢复线程中断状态,避免状态丢失;
- 线程池规范:自定义线程池,避免使用默认池(如 ForkJoinPool.commonPool())。
到此这篇关于Java 中多线程异步处理的文章就介绍到这了,更多相关java多线程异步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
