Java并发编程中的CompletableFuture使用详解
作者:万里顾—程
引言
并行,并发
并发:一个实体上,多个任务有序执行
并行:多个实体上,多个任务同时执行
用户线程
用户线程是系统的工作线程,会完成程序需要完成的业务操作
守护线程
是一种特殊的线程,为其他线程服务的,在后台默默的完成一些系统性的服务,如GC线程;
如果用户线程全部结束,意味着程序需要完成的业务操作已经结束了,守护线程就没有必要继续运行了。所以当系统只剩下守护线程的时候,java虚拟机会自动退出。
1、Future
Future接口(实现类FutureTask)定义了操作异步任务执行的一些方法,如获取异步任务执行的结果、取消任务的执行、判断任务是否被取消,判断任务是否执行完毕等。
Future接口可以为主线程开一个分支线程,专门为主线程处理耗时和费力的复杂业务
Future接口方法
取消任务:
boolean cancel(boolean mayInterruptIfRunning);
判断任务是否被取消
boolean isCancelled();
断任务是否执行完毕
boolean isDone();
获取异步任务执行的结果:
V get() throws InterruptedException, ExecutionException;
获取异步任务执行的结果(限定时间内没获取到结果就抛出异常):
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
注:get方法获取执行结果会出现程序阻塞,所以一般放到程序最后调用
示例代码
//固定大小线程池 ExecutorService threadPool = Executors.newFixedThreadPool(3); FutureTask<String> futureTask1 = new FutureTask<>(()->{ TimeUnit.MICROSECONDS.sleep(500); return "callable1"; }); threadPool.submit(futureTask1); FutureTask<String> futureTask2 = new FutureTask<>(()->{ TimeUnit.SECONDS.sleep(3); return "callable2"; }); threadPool.submit(futureTask2); //获取异步任务执行结果 while (true){ if (futureTask1.isDone()){ System.out.println(futureTask1.get()); break; } } System.out.println(futureTask2.get(2,TimeUnit.SECONDS)); //关闭线程池 threadPool.shutdown();
2、CompletableFuture
CompletableFuture提供了Future的扩展功能,提供了函数式编程能力,可以在任务执行完后通过回调的方式处理计算结果。
CompletableFuture的创建
方法:
- runAsync 无返回值
- supplyAsync 有返回值
ExecutorService threadPool = Executors.newFixedThreadPool(3); //无返回值 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.MICROSECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }, threadPool);//不指定线程池就会使用默认的线程池 System.out.println(future.get());//null //有返回值 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); try { TimeUnit.MICROSECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "supplyAsync"; }, threadPool); System.out.println(future1.get()); threadPool.shutdown();
CompletableFuture示例
一个阶段的完成可能会触发另一个阶段
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture.supplyAsync(() -> { int nextInt = new Random().nextInt(10); // int a = 10 /0; return nextInt; }, threadPool).whenComplete((v,e)->{//获得上一步执行完返回的结果 v;可能出现的异常 e if (e==null){ System.out.println("成功获得结果:"+v); } }).exceptionally(e->{//发生异常后自动调用 e.printStackTrace(); System.out.println("发生异常:"+e.getMessage()); return null; }); threadPool.shutdown(); //todo 主线程执行任务 (注意:主线程结束后默认线程池会自动关闭,推荐使用自定义线程池) }
CompletableFuture常用方法
getNow和complete
//getNow:立即获取结果,若没获取到就使用备选结果 System.out.println(future1.getNow("xxxxx")); //complete:如果操作未执行完就将get获得的值改为给定的值,然后返回true,反之get获得的值就是操作执行完返回的值,然后返回false System.out.println(future1.complete("beixuan") + "\t" + future1.get());
thenApply:计算结果存在传递关系,发生异常时后面步骤停止运行
CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(v -> {//当这一步发生异常时,后续操作不执行,直接跳到最后打印异常信息 //int i = 10/0; return v + 2; }).thenApply(v -> { return v + 3; }).whenComplete((v,e)->{ if (e==null){ System.out.println("thenApply:"+v);//6 } }).exceptionally(e->{ e.printStackTrace(); return null; });
handle:和thenApply类似,但发生异常时后续操作可以正常执行
CompletableFuture.supplyAsync(() -> { System.out.println(1); return 1; }).handle((v,e) -> {//第一步发生异常停止运行,但后面可以正常运行,直至最后把异常打印出来 int i = 10/0; System.out.println(3); return v + 2; }).handle((v,e) -> {//这里正常输出 System.out.println(6); return v + 3; }).whenComplete((v,e)->{ if (e==null){ System.out.println("handle:"+v); } }).exceptionally(e->{ e.printStackTrace(); return null; });
thenAccept:接收上一步执行完的结果,没有返回值
CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(v -> { return v + 2; }).thenAccept(v -> { System.out.println(v); });
thenCombine:对两个异步操作的结果进行合并,先完成的操作要等另一个慢的操作
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();} return 10; }); CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} return 20; }); System.out.println(futureA.thenCombine(futureB, (a, b) -> { System.out.println("结果开始合并"); return a + b; }).join());//30 //======================================================================== System.out.println(CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();} return 10; }).thenCombine(CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} return 20; }), (x, y) -> { System.out.println("结果开始合并1"); return x + y; }).thenCombine(CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} return 30; }), (x, y) -> { System.out.println("结果开始合并2"); return x + y; }).join());//60
CompletableFuture案例
比较多个商城中同一物品的价格
package com.cheng.juc; import lombok.Data; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; public class GoodsPriceDemo { static List<NetMall> malls = Arrays.asList(new NetMall("JD"),new NetMall("TB"),new NetMall("DD")); /** * 轮流查询价格 * @param malls * @param goodsName * @return */ public static List<String> getPrice(List<NetMall> malls,String goodsName){ return malls.stream() .map(m -> String.format(goodsName + " in %s price is %.2f", m.getNetMallName(), m.calcPrice(goodsName))) .collect(Collectors.toList()); } /** * 使用异步任务查询价格 * @param malls * @param goodsName * @return */ public static List<String> getPricePlus(List<NetMall> malls,String goodsName){ ExecutorService threadPool = Executors.newFixedThreadPool(3); return malls.stream() //为每一个商城开启一个异步任务,然后同时查询价格 .map(m-> CompletableFuture.supplyAsync(()-> String.format(goodsName + " in %s price is %.2f", m.getNetMallName(), m.calcPrice(goodsName)),threadPool)) .collect(Collectors.toList()) .stream().map(d->d.join()) .collect(Collectors.toList()); } public static void main(String[] args) { long l1 = System.currentTimeMillis(); // List<String> price = getPrice(malls,"mysql");// 3s List<String> price = getPricePlus(malls,"mysql");// 1s price.stream().forEach(System.out::println); long l2 = System.currentTimeMillis(); System.out.println("耗时:"+(l2 - l1)); } } @Data class NetMall{ private String netMallName; public NetMall(String netMallName){ this.netMallName = netMallName; } //计算价格 public BigDecimal calcPrice(String goodsName){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } BigDecimal result = BigDecimal.valueOf(ThreadLocalRandom.current().nextDouble() * 2 + goodsName.charAt(0)); return result; } }
比较结果:
mysql in JD price is 110.37
mysql in TB price is 110.58
mysql in DD price is 109.48
到此这篇关于Java并发编程中的CompletableFuture使用详解的文章就介绍到这了,更多相关Java的CompletableFuture内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!