springboot CompletableFuture异步线程池详解
作者:青衣画白扇
这篇文章主要介绍了springboot CompletableFuture异步线程池的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
初始化异步的4种方法
1、继承Thread
2、实现Runnable
3、实现Callable接口+Future (可以拿到返回结果,可以处理异常 jdk1.5之后)
4、线程池【ExecutorService】(实际开发中使用)
- 方式1和2 :主线程无法获取线程的运算结果。
- 方式3:主线程可以获取线程的运算结果,但是不利于控制服务器中的线程资源。可以导致服务器资源耗尽。
- 方式4:线程池性能稳定,控制资源,也可以获取执行结果,并捕获异常。
线程池七大参数
- corePoolSize :[5] 核心线程数【一直存在除非 (allowCoreThreadTimeOut会回收)】;线程池,创建好后就准备好5个线程 Thread thread = new Thread() ;并没有启动 。只有往线程池提交任务后才会执行 thread.start();
- maximumPoolSize : [200] 最大线程数;控制资源
- keepAliveTime :存活时间。如果当前的线程数量大于core(核心)数量。释放空闲的线程(maximumPoolSize-corePoolSize)。只要线程空闲大于指定的keepAliveTime 。
- unit :时间单位。
- BlockingQueue workQueue:阻塞队列。如果任务有很多,就会将目前多的任务放在队列里面。只要有线程空闲,就回去队列里面取出新的任务继续执行。
new LinedBlockingDeque<>() :默认是Integer的最大值 会造成内存不足 - threadFactory:线程的创建工厂
- RejectedExecutionHandler handler:拒绝策列,如果队列满了执行相应的拒绝策略
7.1 DiscardOldestPolicy :新任务进来时丢弃掉没有执行的旧任务
7.2 CallerRunsPolicy:直接调用run方法同步执行
7.3 AbortPolicy:直接丢弃新任务并抛出异常
7.4 DiscardPolicy:直接丢弃不抛出异常
工作顺序
- 线程池创建,准备好core数量的核心线程,准备接受任务
- core满了,就会将再进来的任务放在阻塞队列中,空闲的core就会自己去阻塞队列获取任务执行。
- 阻塞队列满了,就会直接开新线程执行,最大只能开到max指定的数量。
- max满了就用RejectedExecutionHandler 策略拒绝任务。
- max都执行完成,有很多空闲,在指定的时间keepAliveTime以后,释放max-core这些线程
Executors
- newCachedThreadPool() core是0,所有都可以回收
- newFixedThreadPool() 固定大小 core= max ;都不可回收
- newScheduledThreadPllo() 定时任务的线程池
- newSingleThreadExecutor() 单线程的线程池,后台获取到任务去挨个执行
1、创建异步对象
CompletableFuture 提供了四个静态方法
//可以获取到返回值,可传入自定义线程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) //没有返回值,可传入自定义线程池 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
测试
public static ExecutorService service = Executors.newFixedThreadPool(10); //没有返回值 CompletableFuture.runAsync(()->{ System.out.println("异步任务成功完成了"); },service); //空入参只有返回值 CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> { return "返回值"; }, service);
2、计算完成时回调方法
//上一个任务完成和上一个任务用同一个线程 public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); //交给线程池重新启动一个线程 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); //任务执行完成后(只能感知) public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); //感知异常同时修改返回值 public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
测试代码
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 10; }, service).whenComplete((res,excption)->{ //虽然能得到异常消息但是不能修改返回结果 System.out.println("异步任务成功完成了"+res+"或者异常:"+excption); }).exceptionally(throwable ->{ //处理异常并可以数据修改返回值 return 0; });
3、handle 方法(异常时处理并返回)
//和上一个任务使用同一个线程执行 public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) //默认线程池开启线程执行 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) //指定自己的线程池开启线程执行 public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
4、handle 测试
//方法执行完成后的处理不论成功还是失败 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return 10; }, service).handle((res,thr)->{ //未出现异常 当然这里可以不写 为了掩饰 if(res!=null){ return 0; } //出现异常 if(thr!=null){ return 1; } return 0; });
5、 线程串行化方法(B任务需要A任务的执行结果后才能执行)
//A-->B-->C 感知上一步结果并返回最后一次的结果 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor) //B可以感知到A的返回值 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) //A->完成后(有Async开启新的线程没有就是和A一个线程)感知不到上一步的执行结果 public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
测试
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { //任务A return 10; }, service).thenRunAsync(() -> { //感知不到 任务A的结果 },service);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { //任务A return 10; }, service).thenAcceptAsync((res) -> { //可以感知到任务A的结果,但是不能返回数据 int i = res / 2; },service);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { //任务A return 10; }, service).thenApplyAsync((res) -> { //返回值以最后一次返回为准 return 10 + res; }, service);
6、两个任务组合 - 都要完成
以下三种方式 两个任务都必须完成,才触发该任务 //组合两个future,获取两个future 任务的返回结果,并返回当前任务的返回值 public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) //组合两个future,获取两个future 任务的返回结果,然后处理任务,没有返回值。 public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) //组合两个future ,不需要获取future的结果,只需要两个 future处理完成后处理该任务 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor)
测试
CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> { //任务A System.out.println(Thread.currentThread().getId()); System.out.println("任务一结束"); return 10/2; }, service); CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> { //任务A System.out.println(Thread.currentThread().getId()); System.out.println("任务二结束"); return "future02"; }, service); //不能感知到结果 CompletableFuture<Void> future03void = future01.runAfterBothAsync(future02, () -> { System.out.println("任务三执行结束"); }, service); //可以感知获取到前两个任务结果 CompletableFuture<Void> future03No = future01.thenAcceptBothAsync(future02, (res1, res2) -> { System.out.println("任务三执行结束"); }, service); CompletableFuture<String> future03 = future01.thenCombineAsync(future02, (res1, res2) -> { System.out.println("任务三执行结束"); return res1 + "-" + res2; }, service);
7、两个任务一个完成
//两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值 public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T>other,Function<? super T, U> fn,Executor executor) //两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值 public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor) //两个任务有一个执行完成,不需要获取future的结果,处理任务 ,也没有返回值 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action) public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
测试
/** * 两个任务有一个完成就执行三 * */ CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> { //任务A try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getId()); System.out.println("任务一结束"); return 10/2; }, service); CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> { //任务A System.out.println(Thread.currentThread().getId()); System.out.println("任务二结束"); return "future02"; }, service); //不感知结果,自己也没有返回值 future01.runAfterEitherAsync(future02,()->{ System.out.println("任务三执行结束"); },service); //感知到结果,自己没有返回值 future01.acceptEitherAsync(future02,(res)->{ //感知到线程已经处理完成的结果 System.out.println("任务三执行结束"+res); },service); //感知到结果并返回自己的返回值 CompletableFuture<String> stringCompletableFuture = future01.applyToEitherAsync(future02, res -> { return "任务三结果" + res; }, service);
8、多任务组合
//等待所有任务完成 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) //只要有一个任务完成 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
测试
CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> { //任务A System.out.println(Thread.currentThread().getId()); System.out.println("任务一结束"); return 10/2; }, service); CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> { //任务A System.out.println(Thread.currentThread().getId()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务二结束"); return "future02"; }, service); //此方法是阻塞式等待不建议在这使用 //future01.get(); //future02.get(); //等待所有任务完成不会阻塞 CompletableFuture<Void> allOf= CompletableFuture.allOf(future01, future02); //等待所有的都完成(两个任务同时执行) allOf.get(); log.info("end"); System.out.println(future02.get()+""+future01.get()); //只有一个完成 CompletableFuture<Object> anyOf= CompletableFuture.anyOf(future01, future02); anyOf.get(); System.out.println(anyOf.get());
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。