Java中的CompletableFuture详解
作者:苦 糖 果
1.Future
1.1 Future接口理论知识复习
Future接口(FutueTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。 比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后, 主线程就去做其他事情了,忙其它事情或者先执行完,过了一会才去获取子任务的执行结果或变更的任务状态。
Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
Future接口能干什么?
Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。 如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
代码说话: Runnable接口Callable接口
Future接口和FutureTask实现类
目的:异步多线程任务执行且返回有结果,三个特点:多线程/有返回/异步任务
1.2 FutureTask架构
绿色虚线:表示实现的关系,实现一个接口 绿色实线:表示接口之间的继承 蓝色实线:表示类之间的继承
1.3 Future编码实战
public class FutureTaskTest { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(3); long start = System.currentTimeMillis(); FutureTask<String> task2 = new FutureTask<>(() -> { TimeUnit.SECONDS.sleep(2); return "2"; }); FutureTask<String> task1 = new FutureTask<>(() -> { TimeUnit.SECONDS.sleep(1); return "1"; }); FutureTask<String> task3 = new FutureTask<>(() -> { TimeUnit.SECONDS.sleep(3); return "3"; }); executorService.submit(task1); executorService.submit(task2); executorService.submit(task3); System.out.println(task1.get()); System.out.println(task2.get(3,TimeUnit.SECONDS)); while (true){ if(task3.isDone()){ System.out.println(task3.get()); break; }else { TimeUnit.MILLISECONDS.sleep(200); } } System.out.println("执行耗时:"+(System.currentTimeMillis()-start)); executorService.shutdown(); } }
1
2
3
执行耗时:3066
优缺点分析
优点
- future+线程池异步多线程任务配合,能显著提高程序的执行效率。
缺点
- 一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞 isDone()轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果. 如果想要异步获取结果,通常都会以轮询的方式去获取结果尽量不要阻塞
- Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。
- 将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值
- 将两个或多个异步计算合成一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果。
- 对计算速度选最快:当Future集合中某个任务最快结束时,返回结果,返回第一名处理
对于简单的业务场景使用Future完全OK,但想完成上述一些复杂的任务,使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求。Future能干的,CompletableFuture都能干。
2. CompletableFuture
2.1 CompletableFuture对Future的改进
CompletableFuture异步线程发生异常,不会影响主线程,用来记录日志特别方便。
CompletableFuture为什么出现 get()方法在Future 计算完成之前会一直处在阻塞状态下,isDone()方法容易耗费CPU资源, 对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。 CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。
CompletableFuture和CompletionStage
CompletionStage CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如: stage.thenApply(x -> square(x)).thenAccept(×->System.out.print(x)).thenRun(( ->systeh.out.println()) 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
CompletableFuture 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture 的方法。 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。 它实现了Future和CompletionStage接口
核心的四个静态方法,来创建一个异步任务
从Java8开始引入了CompletableFuture,它是Future的功能增强版。减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法
CompletableFuture的优点 异步任务结束时,会自动回调某个对象的方法; 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行 异步任务出错时,会自动回调某个对象的方法;
2.2 案例精讲
函数式编程已经主流 先说说join和get对比 说说你过去工作中的项目亮点?大厂业务需求说明 一波流Java8函数式编程带走-比价案例实战
Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程
案例精讲-从电商网站的比价需求讲起
需求说明 同一款产品,同时搜索出同款产品在各大电商平台的售价;
输出返回: 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List《mysql》in jd price is 88.05 《mysql》in dangdang price is 86.11 《mysql》in taobao price is 90.43
解决方案,比对同一个商品在各个平台上的价格,要求获得一个清单列表,
1 )step by step,按部就班,查完京东查淘宝,查完淘宝查天猫
2 )all in,万箭齐发,一口气多线程异步任务同时查询
public class CompletableFutureDemo { static List<NetMall> list = Arrays.asList( new NetMall("vip"), new NetMall("jd"), new NetMall("tb"), new NetMall("pdd") ); public static void main(String[] args) { long cur1 = System.currentTimeMillis(); getPrice("Phone").forEach(r-> System.out.println(r)); System.out.println("getPrice耗时"+(System.currentTimeMillis()-cur1)); long cur2 = System.currentTimeMillis(); getPriceByCompletableFuture("Phone").forEach(r-> System.out.println(r)); System.out.println("getPriceByCompletableFuture耗时"+(System.currentTimeMillis()-cur2)); } private static List<String> getPrice(String productName){ return list.stream() .map(r->String.format(productName+" in %s price is %.2f",r.getName(),r.calcPrice(productName))) .collect(Collectors.toList()); } private static List<String> getPriceByCompletableFuture(String productName){ return list.stream() .map(r-> CompletableFuture.supplyAsync(()->String.format(productName+" in %s price is %.2f",r.getName(),r.calcPrice(productName)))) .collect(Collectors.toList()) .stream() .map(s->s.join()) .collect(Collectors.toList()); } } class NetMall{ private String name; public double calcPrice(String productName){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return ThreadLocalRandom.current().nextDouble(100000000)+productName.hashCode(); } public NetMall(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
2.3 CompletableFuture常用方法
2.3.1 获得结果和触发计算
获得结果
public T get() 不见不散 public T get(long timeout,TimeUnit unit) 过时不候 public T join() public T getNow(T valuelfAbsent)
没有计算完成的情况下,给我一个替代结果。计算完,返回计算完成后的结果。立即获取结果不阻赛。没算完,返回设定 的valuelfAbsent值
主动触发计算 public bgolean complete(T value) 是否打断get方法立即返回括号值
public class CompletableFutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello CompletableFuture"; }); System.out.println(completableFuture.getNow("心急吃不了热豆腐")); System.out.println(completableFuture.get()); System.out.println(completableFuture.get(1500, TimeUnit.MILLISECONDS)); System.out.println(completableFuture.join()); System.out.println(completableFuture.complete("未雨绸缪")+"\t"+completableFuture.join()); } }
2.3.2 对计算结果进行处理
thenApply 计算结果存在依赖关系,这两个线程串行化 异常相关:由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。
public class CompletableFutureTest2 { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 6; },executorService).thenApply((r)-> { int i=2/0; return r * 5; }).thenApply((r)-> { System.out.println(r); return r - 2; }).whenComplete((v, e) -> { System.out.println("计算结果:"+v); }).exceptionally(e -> { System.out.println(e.getMessage()); System.out.println(e); return null; }); System.out.println("============主线程=========="); executorService.shutdown(); } }
发生异常后进入exceptionally代码块,但是thenApply中的代码不会执行,whenComplete依旧会执行
============主线程==========
计算结果:null
java.lang.ArithmeticException: / by zero
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
handle
计算结果存在依赖关系,这两个线程串行化 异常相关:有异常也可以往下一步走,根据带的异常参数可以进步处理
发生异常后进入exceptionally代码块,但是handle和whenComplete依旧会执行
============主线程==========
null
计算结果:null
java.lang.NullPointerException
java.util.concurrent.CompletionException: java.lang.NullPointerException
2.3.3 对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果thenAccept
public class CompletableFutureTest3 { public static void main(String[] args) { CompletableFuture.supplyAsync(()->{ return 3; }).thenApply(r->{ return r*8; }).thenApply(r->{ return r/2; }).thenAccept(r-> System.out.println(r)); System.out.println(CompletableFuture.supplyAsync(()->"6666").thenRun(()->{}).join()); System.out.println(CompletableFuture.supplyAsync(()->"6666").thenAccept(r-> System.out.println(r)).join()); System.out.println(CompletableFuture.supplyAsync(()->"6666").thenApply(r->r+"9999").join()); } }
12
null
6666
null
66669999
completableFuture和线程池说明
以thenRun和thenRunAsync为例,有什么区别?
没有传入自定义线程池,都用默认线程池ForkJoinPool; 传入了一个自定义线程池, 如果你执行第一个任务的时候,传入了一个自定义线程池: 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
有可能处理太快,系统优化切换原则,直接使用main线程处理 其它如: thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是同理
2.3.4 对计算速度进行选用与对计算结果进行合并
applyToEither:谁快用谁 thenCombine:两个completionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其它分支任务
public class CompletableFutureTest4 { public static void main(String[] args) { CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "1号选手"; }); CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "2号选手"; }); CompletableFuture<String> result = first.applyToEither(second, r -> r + "is winner"); CompletableFuture<String> res = first.thenCombine(second, (x, y) -> x + y); System.out.println(result.join()); System.out.println(res.join()); } }
1号选手is winner
1号选手2号选手
到此这篇关于Java中的CompletableFuture详解的文章就介绍到这了,更多相关CompletableFuture详解内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!