java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java异步CompletableFuture

Java中的异步操作CompletableFuture示例详解

作者:晔子yy

CompletableFuture是Java8引入的异步编程工具,支持非阻塞的链式调用和组合操作,提供了强大的异步任务编排能力,本文通过实例代码讲解Java中的异步操作CompletableFuture,感兴趣的朋友跟随小编一起看看吧

一、CompletableFuture概述

CompletableFuture​ 是 Java 8 引入的一个功能强大的异步编程工具,它实现了FutureCompletionStage接口,提供了更灵活、更强大的异步任务编排能力。相比传统的Future,它支持非阻塞的链式调用和组合操作。

二、CompletableFuture的核心特性

异步任务的创建与执行

异步任务是CompletableFuture的基础单元。关键在于区分任务的发起结果的获取是两个独立的时机,这正是异步的本质。

//最简单的异步调用,默认使用 ForkJoinPool.commonPool()
CompletableFuture<Integer> computeFuture = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000);  // 模拟耗时计算
    return 42;
});

CompletableFuture底层是使用线程池帮我们去完成异步的一个调用,所以,我们也可以自定义线程池来实现异步操作。

//自定义线程池
ExecutorService bizExecutor = Executors.newFixedThreadPool(5, r -> {
    Thread t = new Thread(r);
    t.setName("BizExecutor-" + t.getId());
    return t;
});

实际开发中,强烈建议始终指定自定义线程池,避免业务代码占用公共线程池影响系统稳定性。

任务结果的链式处理

这是CompletableFuture最优雅的特性,它将回调地狱转换为流畅的管道。其中有三个重要的方法

// 电商订单处理流水线示例
CompletableFuture<Order> orderFuture = CompletableFuture
    // 阶段1:创建订单(有返回值)
    .supplyAsync(() -> orderService.createOrder(request), orderExecutor)
    // 阶段2:验证库存(转换结果)
    .thenApplyAsync(order -> {
        inventoryService.checkStock(order.getItems());
        return order.markAsValidated();
    }, inventoryExecutor)
    // 阶段3:扣减库存(消费结果,无返回)
    .thenAcceptAsync(order -> {
        inventoryService.deductStock(order.getItems());
        logger.info("库存扣减完成,订单号:{}", order.getId());
    }, inventoryExecutor)
    // 阶段4:发送通知(纯副作用操作)
    .thenRunAsync(() -> {
        notificationService.sendOrderCreated();
    }, notificationExecutor);

带有Async后缀的方法(如thenApplyAsync)会在新线程中执行,而不带后缀的会在前一个任务完成的线程中执行。在IO密集型场景使用Async版本可提高并发度。

多个任务组合

现实业务中很少有单一异步任务,更多是多个任务的协同。CompletableFuture提供了丰富的组合模式。

// 场景:用户详情页需要聚合多个服务的数据
public CompletableFuture<UserProfile> getUserProfile(String userId) {
    // 并行调用三个独立服务
    CompletableFuture<UserInfo> userInfoFuture = 
        userService.getUserInfoAsync(userId);
    CompletableFuture<List<Order>> ordersFuture = 
        orderService.getUserOrdersAsync(userId);
    CompletableFuture<List<Address>> addressesFuture = 
        addressService.getUserAddressesAsync(userId);
    // thenCombine等待两个都完成,然后合并
    CompletableFuture<UserWithOrders> userWithOrders = userInfoFuture
        .thenCombine(ordersFuture, (user, orders) -> {
            return new UserWithOrders(user, orders);
        });
    // thenCompose前一个完成后再开始下一个
    CompletableFuture<String> personalizedGreeting = userInfoFuture
        .thenCompose(user -> 
            greetingService.getGreetingAsync(user.getLanguage(), user.getName())
        );
    // allOf 等待所有(3+个)任务完成
    return CompletableFuture.allOf(
            userWithOrders, 
            addressesFuture, 
            personalizedGreeting
        )
        .thenApply(v -> {
            // 所有都完成后的聚合
            return new UserProfile(
                userWithOrders.join().getUser(),
                userWithOrders.join().getOrders(),
                addressesFuture.join(),
                personalizedGreeting.join()
            );
        });
}

这里有个要注意的点:allOf本身返回的是 CompletableFuture<Void>,不包含各子任务的结果。需要像上面示例中通过 join()或 get()来获取,但注意这不会导致死锁,因为此时所有子任务已确定完成。

异常处理

异步中的异常处理比同步更复杂,因为异常和业务代码在时间、空间上都是解耦的。CompletableFuture 的异常处理是功能型、非侵入式的。

// 健壮的数据处理管道
CompletableFuture<Report> reportFuture = CompletableFuture
    .supplyAsync(() -> dataFetcher.fetchData(), dataExecutor)
    // exceptionally异常时提供降级值
    .exceptionally(ex -> {
        log.warn("数据获取失败,使用缓存", ex);
        return cacheService.getCachedData();
    })
    // 对数据做处理,同时捕获处理中的异常
    .thenApplyAsync(data -> {
        try {
            return dataProcessor.process(data);
        } catch (ProcessingException e) {
            throw new CompletionException("处理失败", e);
        }
    }, processExecutor)
    // handle统一处理成功和失败
    .handle((processedData, ex) -> {
        if (ex != null) {
            // 失败路径
            metrics.recordFailure(ex);
            return Report.errorReport("系统繁忙,请稍后重试");
        } else {
            // 成功路径
            return Report.successReport(processedData);
        }
    })
    // whenComplete无论成功失败都执行(类似finally)
    .whenComplete((report, ex) -> {
        // 资源清理、日志记录等
        cleanResources();
        log.info("报告生成完成,状态:{}", 
            ex == null ? "成功" : "失败");
    });

exceptionally和handle 的区别

建议在异步链的末尾使用 handle或 whenComplete做统一的最终处理,而不是在每个阶段都捕获异常。

三、性能优化与注意事项

避免阻塞主线程

在Web请求线程中调用get()会阻塞整个请求

错误写法:

public Result handleRequest() {
    CompletableFuture<Data> future = fetchDataAsync();
    return future.get();  // 阻塞,浪费容器线程
}

接下来展示正确写法:

//完全异步
public CompletableFuture<Result> handleRequestAsync() {
    return fetchDataAsync()
        .thenApply(data -> convertToResult(data))
        .exceptionally(ex -> Result.error("处理失败"));
}

合理使用线程池资源

线程池隔离原则:不同业务、不同重要级别的任务使用不同的线程池,避免相互影响。

根据任务类型的不同配置不同的线程池参数

public class ThreadPoolConfig {
    // CPU密集型:核心数+1
    @Bean("cpuIntensivePool")
    public ExecutorService cpuIntensivePool() {
        int coreSize = Runtime.getRuntime().availableProcessors() + 1;
        return new ThreadPoolExecutor(
            coreSize, coreSize * 2, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new NamedThreadFactory("cpu-pool")
        );
    }
    // IO密集型:可设置较大线程数
    @Bean("ioIntensivePool")  
    public ExecutorService ioIntensivePool() {
        return new ThreadPoolExecutor(
            20, 100, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),  // 无界队列,快速响应
            new NamedThreadFactory("io-pool")
        );
    }
    // 定时/超时任务:使用ScheduledExecutor
    @Bean("timeoutPool")
    public ScheduledExecutorService timeoutPool() {
        return Executors.newScheduledThreadPool(5, 
            new NamedThreadFactory("timeout-pool"));
    }
}

四、总结

CompletableFuture 不仅仅是API的增强,更是编程范式的转变。

核心价值:

正确使用CompletableFuture,可以构建出既高性能又易于维护的异步系统。

到此这篇关于Java中的异步操作CompletableFuture示例详解的文章就介绍到这了,更多相关java异步CompletableFuture内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文