CompletableFuture创建及功能使用全面详解
作者:小成都人
引言
FutureTask对于get()方法容易造成阻塞,所以在其基础上诞生了CompletableFuture。他们的关系就像i和i++的关系,FutureTask能做的,CompletableFuture也能做,并且更加高效,功能更加扩展。
创建CompletableFuture
在CompletableFuture源码注释中,作者并不希望开发人员直接使用实例化去创建CompletableFuture,而是使用四大静态方法。
实例化创建示例:
CompletableFuture completableFuture = new CompletableFuture();
CompletableFuture的四大静态方法
supplyAsync(Supplier<U> supplier) ----有返回值
创建带有返回值的异步任务,类似方法ExecutorService
的 submit(Callable<T> task)
方法
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in....."); int result = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; });
supplyAsync(Supplier<U> supplier,Executor executor)
ExecutorService threadPool = Executors.newFixedThreadPool(3); // 如果自己没有创建线程池,则使用默认的ForkJoinPool线程池 CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in....."); int result = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; }, threadPool);
runAsync(Runnable runnable)----没有返回值
创建没有返回值的异步任务,类似ExecutorService
submit(Runnable task)
方法
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in....."); try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(runAsync.get());
runAsync(Runnable runnable,Executor executor)
ExecutorService threadPool = Executors.newFixedThreadPool(3); // 如果没有指定线程池,则使用默认的ForkJoinPool线程池 CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in....."); try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } }, threadPool); System.out.println(runAsync.get());
CompletableFuture获取值
get()
最直接的获值方法,但会抛出异常。源码如下
/** * Waits if necessary for this future to complete, and then * returns its result. * 等待线程计算完成以后获取结果 * @return the result value * @throws CancellationException if this future was cancelled * @throws ExecutionException if this future completed exceptionally * @throws InterruptedException if the current thread was interrupted * while waiting */ public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); }
get(long timeout, TimeUnit unit)
如上,但可以加入等待时间,超过等待时间,抛出Timeout异常。源码如下
/** * Waits if necessary for at most the given time for this future * to complete, and then returns its result, if available. * 最多等待给定的时间以完成此计算,然后返回其结果(如果可用) * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the result value * @throws CancellationException if this future was cancelled * @throws ExecutionException if this future completed exceptionally * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Object r; long nanos = unit.toNanos(timeout); return reportGet((r = result) == null ? timedGet(nanos) : r); }
join()
直接获取值,类似于get(),但与get()不同的是,join()不需要手动抛出异常。代码如下
/** * Returns the result value when complete, or throws an * (unchecked) exception if completed exceptionally. To better * conform with the use of common functional forms, if a * computation involved in the completion of this * CompletableFuture threw an exception, this method throws an * (unchecked) {@link CompletionException} with the underlying * exception as its cause. * 完成时返回结果值,如果异常完成则抛出(未选中)异常。 * 为了更好地符合通用函数形式的使用,如果完成此CompletableFuture所涉及的计算引发异常, * 则此方法将引发(未选中){@link CompletionException},其原因是基础异常。 * * @return the result value * @throws CancellationException if the computation was cancelled * @throws CompletionException if this future completed * exceptionally or a completion computation threw an exception */ public T join() { Object r; return reportJoin((r = result) == null ? waitingGet(false) : r); }
getNow(T valueIfAbsent)
和join()相同,不需要手动抛出异常,在执行过程中会返回遇见的异常。不同的是可以给定默认值,如果执行错误或未完成,返回给定的默认值。源码如下
/** * Returns the result value (or throws any encountered exception) * if completed, else returns the given valueIfAbsent. * 如果计算完成,则返回结果值(或引发任何遇到的异常),否则返回给定的默认值 * @param valueIfAbsent the value to return if not completed * @return the result value, if completed, else the given valueIfAbsent * @throws CancellationException if the computation was cancelled * @throws CompletionException if this future completed * exceptionally or a completion computation threw an exception */ public T getNow(T valueIfAbsent) { Object r; return ((r = result) == null) ? valueIfAbsent : reportJoin(r); }
complete
通俗来说,在complete之前设定一个等待时间,如果在等待时间内没有计算出结果,则返回true,并返回complete给定的默认值。反之为false,返回原定计算的值。源码如下
/** * If not already completed, sets the value returned by {@link * #get()} and related methods to the given value. * * @param value the result value * @return {@code true} if this invocation caused this CompletableFuture * to transition to a completed state, else {@code false} */ public boolean complete(T value) { boolean triggered = completeValue(value); postComplete(); return triggered; }
示例:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } return "123"; }); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } System.out.println(completableFuture.complete("给定的默认值") + "\t" + completableFuture.join());
返回结果:
true 给定的默认值
CompletableFuture的回调
其中,带Async
后缀的函数表示需要连接的后置任务会被单独提交到线程池中
,从而相对前置任务来说是异步运行
的。除此之外,两者没有其他区别。
thenApply / thenAccept / thenRun互相依赖
thenApply()-----有入参有返回
表示获取上一个任务的执行结果作为新任务的执行参数,有返回值。但遇见错误时会终止后面所有的线程。
通俗来说,任务A执行完执行B,B需要A的结果,并且B有返回值
thenApply
也是有三个方法重载
// 后一个任务与前一个任务在同一线程执行 public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } // 后一个任务与前一个任务在不同线程中执行 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(defaultExecutor(), fn); } //后一个任务使用自定义线程池执行 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn, Executor executor) { return uniApplyStage(screenExecutor(executor), fn); }
ps:示例
ExecutorService threadPool = Executors.newFixedThreadPool(3); CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in....."); int result = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; }, threadPool); // 承接上一个多线程,有返回值 CompletableFuture<Integer> thenApply = supplyAsync.thenApply((result) -> { System.out.println("上一线程结果" + result); return ThreadLocalRandom.current().nextInt(10); }); System.out.println(thenApply.get()); threadPool.shutdown();
如果没用使用thenApplyAsync()指定自己的线程池,线程池依旧使用的是默认的ForkJoinPool线程池。
thenAccept() ----有入参无返回
消费型回调。接受上一个任务的结果作为参数,但是没有返回值。
通俗来说,任务A执行完执行B,B需要A的结果,B没有返回值
// 后一个任务与前一个任务在同一线程执行 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } // 后一个任务与前一个任务在不同线程中执行 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(defaultExecutor(), action); } //后一个任务使用自定义线程池执行 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); }
示例:
public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(2); CompletableFuture<Void> async = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } System.out.println("111111"); return 1; }, threadPool).thenApplyAsync((f) -> { // 手动创建异常 System.out.println("222222"); f += 2; return f; }, threadPool).thenAcceptAsync(f -> { f += 3; System.out.println("最终的值:" + f); }, threadPool); System.out.println(Thread.currentThread().getName() + " 先去做其他事情了"); async.join(); threadPool.shutdown(); }
ps:只消费
thenRun()----无入参无返回
thenRun
的方法没有入参,也没有返回值。
通俗来说,任务A执行完执行B,并且B不需要A的结果,B没有返回值
示例:
public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(2); CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } return "123"; },threadPool); completableFuture.thenRunAsync(() -> { System.out.println("thenRun 无入参,无返回值"); },threadPool); completableFuture.join(); threadPool.shutdown(); }
小结
thenApply()、thenAccept()、thenRun()的区别,示例代码如下:
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join()); System.out.println("《-----------------------------》"); System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join()); System.out.println("《-----------------------------》"); System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((f)-> "resultB").join());
结果:
null
《------------------------------------------》
resultA
null
《-------------------------------------------》
resultB
exceptionally()
如果执行任务出现异常,则执行 exceptionally 中的代码块,并且需要一个返回值。
示例:
public static void main(String[] args) throws Exception { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in....."); int a = 10 / 0; return 1; }); //whenComplete:参数列表 (T t, U u),如果执行过程正常完成,则执行该分支 System.out.println(completableFuture.whenComplete((t, u) -> { if (u==null) { System.out.println(t); } //exceptionally :如果执行过程出现异常,则走该分支 }).exceptionally((f) -> { System.out.println("异常详细信息:" + f.getMessage()); return 500; }).get()); }
whenComplete()
whenComplete
算是 exceptionally
和thenApply
的结合,将任务执行的结果和异常作为回到方法的参数,如果没有发生异常则异常参数为null。源码如下:
public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); } public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); } public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); }
示例:
ExecutorService threadPool = Executors.newFixedThreadPool(3); // 自定义线程池,可以防止主线程立刻结束,导致守护线程forkjoin的问题 try { CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() ->{ System.out.println(Thread.currentThread().getName() + " come in....."); int result = ThreadLocalRandom.current().nextInt(10); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return result; }, threadPool).whenComplete((v,e) -> { if (e==null) { System.out.println(Thread.currentThread().getName() + " come in....."); System.out.println("上一个线程执行的结果:" + v); } }).exceptionally(e -> { e.printStackTrace(); System.out.println("上一个线程执行的异常" + e); return null; }); System.out.println(Thread.currentThread().getName() + " 忙其他的去了....."); } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); }
执行结果:
handle()
表示获取上一个任务的执行结果作为新任务的执行参数,有返回值。相对于thenApply(),handle()可以将异常带入下一个线程处理。源码如下
public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(null, fn); }
示例:
public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(2); CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } System.out.println("111111"); return 1; }, threadPool).handle((f, n) -> { // 手动创建异常,跳过此线程后面的动作 int i = 10/0; System.out.println("222222"); f += 2; return f; }).handle((f, n) -> { // 如果检测到上一线程有异常,可以处理 if (n!=null) { f=10; } System.out.println("333333"); f += 3; return f; }).whenComplete((f,e) -> { if (e==null) { System.out.println("计算结果为:" + f); } }).exceptionally(e -> { e.printStackTrace(); System.out.println(e.getMessage()); return null; }); System.out.println(Thread.currentThread().getName() + " 先去做其他事情了"); handle.join(); threadPool.shutdown(); }
运行结果:
main 先去做其他事情了
111111
333333
计算结果为:13
CompletableFuture对计算速度的选用
使用applyToEither
可对两个线程执行速度进行比较,获取速度最快的执行结果。源码如下
public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) { return orApplyStage(null, other, fn); } public <U> CompletableFuture<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn) { return orApplyStage(asyncPool, other, fn); } public <U> CompletableFuture<U> applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) { return orApplyStage(screenExecutor(executor), other, fn); }
示例:
CompletableFuture<String> supplyAsyncA = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } return "playA"; }); CompletableFuture<String> supplyAsyncB = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } return "playB"; }); CompletableFuture<String> applyToEither = supplyAsyncA.applyToEither(supplyAsyncB, f -> { return f + " is winner!"; }); System.out.println(Thread.currentThread().getName() + "----" + applyToEither.join());
applyToEither
、acceptEither
、runAfterEither
的区别与前面回调函数的区别一致,在于是否有返回值
thenAcceptBoth
当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗
applyToEither
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。
acceptEither
两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。
runAfterEither
两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)
runAfterBoth
两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)
CompletableFuture多任务合并
thenCombine
thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。,源码如下
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); }
示例:
CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println("线程一。。。。。。启动"); try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } return 10; }); CompletableFuture<Integer> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println("线程二。。。。。。启动"); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } return 20; }); CompletableFuture<Integer> result = integerCompletableFuture1.thenCombine(integerCompletableFuture2, (x, y) -> { System.out.println("开始合并。。。。。。。。"); return x + y; }); System.out.println(result.join());
结果:
线程一。。。。。。启动
线程二。。。。。。启动
开始合并。。。。。。。。
30
allof
等待所有任务完成。源码如下:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { return andTree(cfs, 0, cfs.length - 1); }
示例:
ExecutorService threadPool = Executors.newFixedThreadPool(2); CompletableFuture<String> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("线程一。。。。。。启动"); } catch (Exception e) { e.printStackTrace(); } return "华为"; },threadPool); CompletableFuture<String> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); System.out.println("线程二。。。。。。启动"); } catch (Exception e) { e.printStackTrace(); } return "小米"; },threadPool); CompletableFuture<Void> allOf = CompletableFuture.allOf(integerCompletableFuture1, integerCompletableFuture2); System.out.println(integerCompletableFuture1.join()); System.out.println(integerCompletableFuture2.join()); System.out.println("main..........end........."); threadPool.shutdown();
因为allof
需要等待所有线程执行完毕,所以会先打印线程二。并等待线程一执行完毕。
结果:
线程二。。。。。。启动
线程一。。。。。。启动
华为
小米
main..........end.........
anyof
等待其中一个任务完成。源码如下:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { return orTree(cfs, 0, cfs.length - 1); }
示例(在任务合并时不同,将allof改为anyof):
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(integerCompletableFuture1, integerCompletableFuture2); System.out.println(anyOf.join()); System.out.println("main..........end........."); threadPool.shutdown();
因为线程二的执行比线程一快,所以直接打印线程二。anyof返回的CompletableFuture,存储的时先完成线程的返回结果。
结果:
线程二。。。。。。启动
小米
main..........end.........
线程一。。。。。。启动
以上就是CompletableFuture创建及功能使用全面详解的详细内容,更多关于CompletableFuture功能创建的资料请关注脚本之家其它相关文章!