Java多线程工具CompletableFuture详解
作者:SolidCocoi
简介
CompletableFuture 是 java 1.8 追加的新特性,通俗的话来说,是一个函数式的,用于控制多任务同步、异步组合操作的工具,实现诸如:
- 控制若干个线程任务间是同步还是异步
- 控制若干个线程间的先后执行顺序、依赖关系
- 若干个线程任务,任意其中一个完成就执行某种逻辑
- ……
将变得十分简单。如果你对前端有一定了解,你会发现它和 Javascript 中的 Promise 是十分类似的。
使用方法
CompletableFuture 需要依仗线程池实现自身功能,这个线程池是个非必填值,如果未特殊指明,将会使用 ForkJoinPool 的实例,构造方法为 ForkJoinPool.makeCommonPool(),该线程池大小为 Runtime.getRuntime().availableProcessors() - 1 即 当前电脑 cpu 可用核心数 -1。
常见 API
方法名称 | 备注 |
complete | 标识自身已完成任务,并传入一个参数作为 CompletableFuture.get() 将获取的值;标识结束是 CAS 方式设置,只有 未结束→结束 的变化才能成功,complete 操作返回 true 时才真正影响 CompletableFuture.get() 将获取的值 |
completedFuture | 这是个静态方法,构造一个已完成的 CompletableFuture 对象,并以传入的参数作为 CompletableFuture.get() 将获取的值 |
get | 阻塞直至任务完成,并获取该任务的返回值 |
join | 阻塞直至任务完成,并获取该任务的返回值(几乎与 get 等同,但 join 不会抛出检查型异常,不强制要求你必须处理) |
cancel | 标识自身已完成,无法阻断自身任务,但会构造 CancellationException传给关联在该对象后续的 CompletableFuture,后续的 CompletableFuture 会因捕获到异常而终止任务。另:该函数入参传入的 boolean 不会产生任何作用( javadoc 里这么描述也是绝了) |
completeExceptionally | 可以理解为 cancel 的可自定义异常版本,其入参就是传递给后续 CompletableFuture 对象的异常 |
exceptionally | CompletableFuture 链路上发生异常时会触发该方法,给链路的最后一个 CompletableFuture 对象配置,即可对全链路进行异常捕获,其入参为异常处理时需要执行的 Function |
isDone | CompletableFuture 任务是否已完成 |
剩下的大多 API 是 run、accept、apply、then、either、both、async …… 的组合,本质上都是语法糖,用了原生的 @FunctionalInterface,决定传入的函数有无返回值、有无入参、在当前任务结束后开始执行、是否任意一个完成就结束、是否全部完成才结束、是否另起线程执行任务 ……
// 你的下一步业务逻辑 Runnable next = new MyRunnable(); // 因为是 run 所以无返回值, 因为是 then 所以在 completableFuture1 对应的任务结束后,执行一段任务 completableFuture1.thenRun(next); // 因为是 run 所以无返回值, 因为是 then 所以在 completableFuture1 对应的任务结束后,因为 async ,所以要另起一个线程,执行一段任务 completableFuture1.thenRunAsync(next);
在看得懂函数式编程的情况下,其他你可通过源码函数定义以此类推
如果想使用自定义的线程池执行任务,那么使用带 Executor executor 的重载函数即可,后不再重复说明,例如:
// 自定义线程池 ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), // 你可以根据你的需要自己实现 Handler,此处为简写使用现成的 Handler,此处捕获的是流量过载异常 new ThreadPoolExecutor.AbortPolicy()); // 通过自定义线程池使用 CompletableFuture CompletableFuture.runAsync(() -> { System.out.println("我的业务代码"); }, executorService);
使用示例
为简洁代码,睡眠模拟任务运行耗时均使用下列函数,后续不再重复说明
public static void sleep(Long sleepTime) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } }
请时刻注意要进行异常处理,意味着你的 completableFuture 链路得保证调用过下列代码,进行异常处理(链路最后一个使用,则全链路可捕获异常),后续为简洁代码已省略
completableFuture.exceptionally((Throwable ex) -> { ex.printStackTrace(); // 取决于 completableFuture 有无返回值,类型是啥。此处实例是 completableFuture 为 CompletableFuture<Long> return -1L; });
将常规线程任务转化为 CompletableFuture 对象
CompletableFuture completableFuture = new CompletableFuture(); Long startTs = System.currentTimeMillis(); new Thread(() -> { sleep(200L); completableFuture.complete("完成");// 设置 completableFuture 结果并将状态设置为已完成 }).start(); while (!completableFuture.isDone()) { // 非阻塞式获取结果,如果当前未执行完成则返回入参字符串"我还没完成" // 执行到此处恰好任务完成,然后在执行 while 循环判断跳出,你在循环内最多输出一次 “完成” System.out.println(completableFuture.getNow("未完")); } System.out.println("最终结果:" + completableFuture.getNow("未完") + " " + (System.currentTimeMillis() - startTs) + " ms");
阻塞到任意一个任务完成
public static void 阻塞到任意一个完成() throws IOException { // 模拟一个耗时 20 L 的任务 CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> { sleep(20L); System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs)); return 20L; }); // 模拟一个耗时 10 L 的任务 CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> { sleep(10L); System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs)); return 10L; }); // applyToEitherAsync 代表另起一个线程去执行第二个入参的代码块,这里其实没啥影响,我就不加 Async 了 CompletableFuture result = completableFuture1.applyToEither(completableFuture2, fasterOne -> { System.out.println(fasterOne); return fasterOne; }); // 除了写回调函数方法外的另一种获取最快值的方法 System.out.println("最快的为:" + result.join()); // 调用读取行阻塞住,防止异步任务还未完成就退出了 System.in.read(); }
遇到特别多任务的情况下,你可以尝试数组
CompletableFuture[] array = new CompletableFuture[2]; array[0] = completableFuture1; array[1] = completableFuture2; CompletableFuture fasterOne = CompletableFuture.anyOf(array); System.out.println("最快的为:" + fasterOne.join());
阻塞到全部任务完成
public static void 阻塞到全部完成() { Long startTs = System.currentTimeMillis(); // 模拟一个耗时 20 L 的任务 CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> { sleep(20L); System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs)); return 20L; }); // 模拟一个耗时 10 L 的任务 CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> { sleep(10L); System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs)); return 10L; }); // thenAcceptBothAsync 代表另起一个线程去执行第二个入参的代码块,这里其实没啥影响,我就不加 Async 了 CompletableFuture result = completableFuture1.thenAcceptBoth(completableFuture2, (r1, r2) -> { System.out.println("completableFuture1 :" + r1); System.out.println("completableFuture2 :" + r2); }); // thenAcceptBoth 是没有返回值的,所以这里是 null ,但这句代码还是有作用的,相当于阻塞到全部任务都完成 System.out.println("返回这个 null 之后意味着全部任务已完成:" + result.join()); }
遇到特别多任务的情况下,你可以尝试数组
CompletableFuture[] array = new CompletableFuture[2]; array[0] = completableFuture1; array[1] = completableFuture2; CompletableFuture<Void> all = CompletableFuture.allOf(array); System.out.println("返回这个 null 之后意味着全部任务已完成:" + all.join());
合并任务
合并任务会涉及到 Compose、Combine,他们区别在于合并的逻辑不同:
- Compose: 合并的两个任务间是同步阻塞执行的,后一个任务需要阻塞等待第一个任务执行完成。你需要传入一个函数 —— 已知第一个任务的返回值,返回合并之后的 CompletableFuture 对象
- Combine: 合并的两个任务间是异步执行的。你需要传入另一个任务、一个函数 —— 已知两个任务的返回值,合并成最终返回值
public static void 合并() { Long startTs = System.currentTimeMillis(); // 模拟一个耗时 100 L 的任务 CompletableFuture<Long> completableFuture1 = CompletableFuture.supplyAsync(() -> { sleep(100L); System.out.println("completableFuture1 完成" + " --" + (System.currentTimeMillis() - startTs)); return 100L; }); // 模拟一个耗时 100 L 的任务 CompletableFuture<Long> completableFuture2 = CompletableFuture.supplyAsync(() -> { sleep(100L); System.out.println("completableFuture2 完成" + " --" + (System.currentTimeMillis() - startTs)); return 100L; }); // 模拟 completableFuture1 合并一个耗时 120 L 的任务,返回值为两个任务总工时 // thenCombineAsync 代表另起一个线程去执行第二个入参的代码块,这里其实没啥影响,我就不加 Async 了 CompletableFuture<Long> completableFuture3 = completableFuture1.thenCombine(CompletableFuture.supplyAsync(() -> { sleep(120L); return 120L; }), (x, y) -> x + y); // 模拟 completableFuture2 合并一个耗时 50 L 的任务,返回值为两个任务总工时 // thenComposeAsync 代表另起一个线程去执行第一个入参的代码块,这里其实没啥影响,我就不加 Async 了 CompletableFuture<Long> completableFuture4 = completableFuture2.thenCompose(r2 -> { CompletableFuture<Long> temp = CompletableFuture.completedFuture(r2); return temp.thenApply(rTemp -> { sleep(50L); return temp.join() + 50L; }); }); boolean printFlag3 = true; boolean printFlag4 = true; String completableFuture3Info = null; String completableFuture4Info = null; while (!completableFuture3.isDone() || !completableFuture4.isDone()) { if (completableFuture3.isDone()) { if (printFlag3) { printFlag3 = false; completableFuture3Info = "completableFuture3 完成:" + completableFuture3.join() + " --" + (System.currentTimeMillis() - startTs); } } if (completableFuture4.isDone()) { if (printFlag4) { printFlag4 = false; completableFuture4Info = "completableFuture4 完成:" + completableFuture4.join() + " --" + (System.currentTimeMillis() - startTs); } } } System.out.println(completableFuture3Info != null ? completableFuture3Info : "completableFuture3 actual:" + completableFuture3.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs)); System.out.println(completableFuture4Info != null ? completableFuture4Info : "completableFuture4 actual:" + completableFuture4.getNow(-100L) + " --" + (System.currentTimeMillis() - startTs)); }
你会观测到总工时更长的反而实际结束时间点更早,completableFuture3 早于 completableFuture4
到此这篇关于Java多线程工具CompletableFuture详解的文章就介绍到这了,更多相关多线程工具CompletableFuture内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!