java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > CompletableFuture功能创建

CompletableFuture创建及功能使用全面详解

作者:小成都人

这篇文章主要为大家介绍了CompletableFuture创建及功能使用全面详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

FutureTask对于get()方法容易造成阻塞,所以在其基础上诞生了CompletableFuture。他们的关系就像i和i++的关系,FutureTask能做的,CompletableFuture也能做,并且更加高效,功能更加扩展。

创建CompletableFuture

在CompletableFuture源码注释中,作者并不希望开发人员直接使用实例化去创建CompletableFuture,而是使用四大静态方法。

实例化创建示例:

CompletableFuture completableFuture = new CompletableFuture();

CompletableFuture的四大静态方法

supplyAsync(Supplier<U> supplier) ----有返回值

创建带有返回值的异步任务,类似方法ExecutorServicesubmit(Callable<T> task)方法

CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
  System.out.println(Thread.currentThread().getName() + " come in.....");
    int result = ThreadLocalRandom.current().nextInt(10);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result;
});

supplyAsync(Supplier<U> supplier,Executor executor)

ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 如果自己没有创建线程池,则使用默认的ForkJoinPool线程池
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
  System.out.println(Thread.currentThread().getName() + " come in.....");
    int result = ThreadLocalRandom.current().nextInt(10);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result;
}, threadPool);

runAsync(Runnable runnable)----没有返回值

创建没有返回值的异步任务,类似ExecutorService submit(Runnable task)方法

CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in.....");
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
System.out.println(runAsync.get());

runAsync(Runnable runnable,Executor executor)

ExecutorService threadPool = Executors.newFixedThreadPool(3);
// 如果没有指定线程池,则使用默认的ForkJoinPool线程池
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " come in.....");
    try {
        TimeUnit.MILLISECONDS.sleep(300);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}, threadPool);
System.out.println(runAsync.get());

CompletableFuture获取值

get()

最直接的获值方法,但会抛出异常。源码如下

/**
 * Waits if necessary for this future to complete, and then
 * returns its result.
 * 等待线程计算完成以后获取结果
 * @return the result value
 * @throws CancellationException if this future was cancelled
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 */
public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

get(long timeout, TimeUnit unit)

如上,但可以加入等待时间,超过等待时间,抛出Timeout异常。源码如下

/**
 * Waits if necessary for at most the given time for this future
 * to complete, and then returns its result, if available.
 * 最多等待给定的时间以完成此计算,然后返回其结果(如果可用)
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return the result value
 * @throws CancellationException if this future was cancelled
 * @throws ExecutionException if this future completed exceptionally
 * @throws InterruptedException if the current thread was interrupted
 * while waiting
 * @throws TimeoutException if the wait timed out
 */
public T get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    Object r;
    long nanos = unit.toNanos(timeout);
    return reportGet((r = result) == null ? timedGet(nanos) : r);
}

join()

直接获取值,类似于get(),但与get()不同的是,join()不需要手动抛出异常。代码如下

/**
 * Returns the result value when complete, or throws an
 * (unchecked) exception if completed exceptionally. To better
 * conform with the use of common functional forms, if a
 * computation involved in the completion of this
 * CompletableFuture threw an exception, this method throws an
 * (unchecked) {@link CompletionException} with the underlying
 * exception as its cause.
 * 完成时返回结果值,如果异常完成则抛出(未选中)异常。
 * 为了更好地符合通用函数形式的使用,如果完成此CompletableFuture所涉及的计算引发异常,
 * 则此方法将引发(未选中){@link CompletionException},其原因是基础异常。
 *
 * @return the result value
 * @throws CancellationException if the computation was cancelled
 * @throws CompletionException if this future completed
 * exceptionally or a completion computation threw an exception
 */
public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

getNow(T valueIfAbsent)

和join()相同,不需要手动抛出异常,在执行过程中会返回遇见的异常。不同的是可以给定默认值,如果执行错误或未完成,返回给定的默认值。源码如下

/**
 * Returns the result value (or throws any encountered exception)
 * if completed, else returns the given valueIfAbsent.
 * 如果计算完成,则返回结果值(或引发任何遇到的异常),否则返回给定的默认值
 * @param valueIfAbsent the value to return if not completed
 * @return the result value, if completed, else the given valueIfAbsent
 * @throws CancellationException if the computation was cancelled
 * @throws CompletionException if this future completed
 * exceptionally or a completion computation threw an exception
 */
public T getNow(T valueIfAbsent) {
    Object r;
    return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

complete

通俗来说,在complete之前设定一个等待时间,如果在等待时间内没有计算出结果,则返回true,并返回complete给定的默认值。反之为false,返回原定计算的值。源码如下

/**
 * If not already completed, sets the value returned by {@link
 * #get()} and related methods to the given value.
 *
 * @param value the result value
 * @return {@code true} if this invocation caused this CompletableFuture
 * to transition to a completed state, else {@code false}
 */
public boolean complete(T value) {
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}

示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "123";
});
try {
    TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
    e.printStackTrace();
}
System.out.println(completableFuture.complete("给定的默认值") + "\t" + completableFuture.join());

返回结果:

true 给定的默认值

CompletableFuture的回调

其中,带Async后缀的函数表示需要连接的后置任务会被单独提交到线程池中,从而相对前置任务来说是异步运行的。除此之外,两者没有其他区别。

thenApply / thenAccept / thenRun互相依赖

thenApply()-----有入参有返回

表示获取上一个任务的执行结果作为新任务的执行参数,有返回值。但遇见错误时会终止后面所有的线程。

通俗来说,任务A执行完执行B,B需要A的结果,并且B有返回值

thenApply也是有三个方法重载

// 后一个任务与前一个任务在同一线程执行
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}
// 后一个任务与前一个任务在不同线程中执行
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(defaultExecutor(), fn);
}
//后一个任务使用自定义线程池执行
public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

ps:示例

ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + " come in.....");
    int result = ThreadLocalRandom.current().nextInt(10);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return result;
}, threadPool);
// 承接上一个多线程,有返回值
CompletableFuture<Integer> thenApply = supplyAsync.thenApply((result) -> {
    System.out.println("上一线程结果" + result);
    return ThreadLocalRandom.current().nextInt(10);
});
System.out.println(thenApply.get());
threadPool.shutdown();

如果没用使用thenApplyAsync()指定自己的线程池,线程池依旧使用的是默认的ForkJoinPool线程池。

thenAccept() ----有入参无返回

消费型回调。接受上一个任务的结果作为参数,但是没有返回值。

通俗来说,任务A执行完执行B,B需要A的结果,B没有返回值

// 后一个任务与前一个任务在同一线程执行
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}
// 后一个任务与前一个任务在不同线程中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(defaultExecutor(), action);
}
//后一个任务使用自定义线程池执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                               Executor executor) {
    return uniAcceptStage(screenExecutor(executor), action);
}

示例:

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<Void> async = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("111111");
        return 1;
    }, threadPool).thenApplyAsync((f) -> {
        // 手动创建异常
        System.out.println("222222");
        f += 2;
        return f;
    }, threadPool).thenAcceptAsync(f -> {
        f += 3;
        System.out.println("最终的值:" + f);
    }, threadPool);
    System.out.println(Thread.currentThread().getName() + " 先去做其他事情了");
    async.join();
    threadPool.shutdown();
}

ps:只消费

thenRun()----无入参无返回

thenRun 的方法没有入参,也没有返回值。

通俗来说,任务A执行完执行B,并且B不需要A的结果,B没有返回值

示例:

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "123";
    },threadPool);
    completableFuture.thenRunAsync(() -> {
        System.out.println("thenRun 无入参,无返回值");
    },threadPool);
    completableFuture.join();
    threadPool.shutdown();
}

小结

thenApply()、thenAccept()、thenRun()的区别,示例代码如下:

System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());
System.out.println("《-----------------------------》");
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenAccept(System.out::println).join());
System.out.println("《-----------------------------》");
System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenApply((f)-> "resultB").join());

结果:

null

《------------------------------------------》

resultA

null

《-------------------------------------------》

resultB

exceptionally()

如果执行任务出现异常,则执行 exceptionally 中的代码块,并且需要一个返回值。

示例:

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println(Thread.currentThread().getName() + " come in.....");
        int a = 10 / 0;
        return 1;
    });
    //whenComplete:参数列表 (T t, U u),如果执行过程正常完成,则执行该分支
    System.out.println(completableFuture.whenComplete((t, u) -> {
        if (u==null) {
            System.out.println(t);
        }
        //exceptionally :如果执行过程出现异常,则走该分支
    }).exceptionally((f) -> {
        System.out.println("异常详细信息:" + f.getMessage());
        return 500;
    }).get());
}

whenComplete()

whenComplete 算是 exceptionallythenApply的结合,将任务执行的结果和异常作为回到方法的参数,如果没有发生异常则异常参数为null。源码如下:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor) {
    return uniWhenCompleteStage(screenExecutor(executor), action);
}

示例:

ExecutorService threadPool = Executors.newFixedThreadPool(3);
        // 自定义线程池,可以防止主线程立刻结束,导致守护线程forkjoin的问题
        try {
            CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() ->{
                System.out.println(Thread.currentThread().getName() + " come in.....");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return result;
            }, threadPool).whenComplete((v,e) -> {
                if (e==null) { 
                    System.out.println(Thread.currentThread().getName() + " come in.....");
                    System.out.println("上一个线程执行的结果:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("上一个线程执行的异常" + e);
                return null;
            });
            System.out.println(Thread.currentThread().getName() + " 忙其他的去了.....");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

执行结果:

handle()

表示获取上一个任务的执行结果作为新任务的执行参数,有返回值。相对于thenApply(),handle()可以将异常带入下一个线程处理。源码如下

public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

示例:

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("111111");
        return 1;
    }, threadPool).handle((f, n) -> {
        // 手动创建异常,跳过此线程后面的动作
        int i = 10/0;
        System.out.println("222222");
        f += 2;
        return f;
    }).handle((f, n) -> {
        // 如果检测到上一线程有异常,可以处理
        if (n!=null) {
            f=10;
        }
        System.out.println("333333");
        f += 3;
        return f;
    }).whenComplete((f,e) -> {
        if (e==null) {
            System.out.println("计算结果为:" + f);
        }
    }).exceptionally(e -> {
        e.printStackTrace();
        System.out.println(e.getMessage());
        return null;
    });
    System.out.println(Thread.currentThread().getName() + " 先去做其他事情了");
    handle.join();
    threadPool.shutdown();
}

运行结果:

main 先去做其他事情了
111111
333333
计算结果为:13

CompletableFuture对计算速度的选用

使用applyToEither可对两个线程执行速度进行比较,获取速度最快的执行结果。源码如下

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(null, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(asyncPool, other, fn);
}
public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor) {
    return orApplyStage(screenExecutor(executor), other, fn);
}

示例:

CompletableFuture<String> supplyAsyncA = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "playA";
});
CompletableFuture<String> supplyAsyncB = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "playB";
});
CompletableFuture<String> applyToEither = supplyAsyncA.applyToEither(supplyAsyncB, f -> {
    return f + " is winner!";
});
System.out.println(Thread.currentThread().getName() + "----" + applyToEither.join());

applyToEitheracceptEitherrunAfterEither的区别与前面回调函数的区别一致,在于是否有返回值

thenAcceptBoth

当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗

applyToEither

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。

acceptEither

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

runAfterEither

两个CompletionStage,任何一个完成了都会执行下一步的操作(Runnable)

runAfterBoth

两个CompletionStage,都完成了计算才会执行下一步的操作(Runnable)

CompletableFuture多任务合并

thenCombine

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。,源码如下

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
    return biApplyStage(screenExecutor(executor), other, fn);
}

示例:

CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程一。。。。。。启动");
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return 10;
});
CompletableFuture<Integer> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程二。。。。。。启动");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return 20;
});
CompletableFuture<Integer> result = integerCompletableFuture1.thenCombine(integerCompletableFuture2, (x, y) -> {
    System.out.println("开始合并。。。。。。。。");
    return x + y;
});
System.out.println(result.join());

结果:

线程一。。。。。。启动
线程二。。。。。。启动
开始合并。。。。。。。。
30

allof

等待所有任务完成。源码如下:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

示例:

ExecutorService threadPool = Executors.newFixedThreadPool(2);
 CompletableFuture<String> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("线程一。。。。。。启动");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "华为";
        },threadPool);
        CompletableFuture<String> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("线程二。。。。。。启动");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "小米";
        },threadPool);
        CompletableFuture<Void> allOf = CompletableFuture.allOf(integerCompletableFuture1, integerCompletableFuture2);
        System.out.println(integerCompletableFuture1.join());
        System.out.println(integerCompletableFuture2.join());
        System.out.println("main..........end.........");
        threadPool.shutdown();

因为allof需要等待所有线程执行完毕,所以会先打印线程二。并等待线程一执行完毕。

结果:

线程二。。。。。。启动
线程一。。。。。。启动
华为
小米
main..........end.........

anyof

等待其中一个任务完成。源码如下:

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

示例(在任务合并时不同,将allof改为anyof):

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(integerCompletableFuture1, integerCompletableFuture2);
System.out.println(anyOf.join());
System.out.println("main..........end.........");
threadPool.shutdown();

因为线程二的执行比线程一快,所以直接打印线程二。anyof返回的CompletableFuture,存储的时先完成线程的返回结果。

结果:

线程二。。。。。。启动
小米
main..........end.........
线程一。。。。。。启动

以上就是CompletableFuture创建及功能使用全面详解的详细内容,更多关于CompletableFuture功能创建的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文