From Future to CompletableFuture

CompletableFuture Start

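The previous post introduced Future. A plain Future has several limitations: it offers no direct way to do the following:

  1. Combine the results of several asynchronous computations into one
  2. Wait for all tasks in a collection of Futures to complete
  3. React to a Future's completion event (that is, trigger an action once the task has finished)

CompletableFuture, added in Java 8, addresses these gaps. It brings a functional programming style to asynchronous code (object-oriented programming abstracts data, functional programming abstracts behavior), so that results can be handled through callbacks.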
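To make the contrast concrete, here is a minimal sketch that adds two asynchronously computed numbers, first with plain Future and then with CompletableFuture. The class and method names (`FutureVsCompletableFuture`, `sumWithFuture`, `sumWithCompletableFuture`) and the values 20 and 22 are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative only.
class FutureVsCompletableFuture {

    // With plain Future, combining two results means blocking on get() for each one.
    static int sumWithFuture() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> a = pool.submit(() -> 20);
            Future<Integer> b = pool.submit(() -> 22);
            return a.get() + b.get(); // the calling thread waits here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // With CompletableFuture, the combination is declared up front as a callback.
    static int sumWithCompletableFuture() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 22);
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum);
        return sum.join(); // block once, only to hand the value back to the caller
    }

    public static void main(String[] args) {
        System.out.println(sumWithFuture());
        System.out.println(sumWithCompletableFuture());
    }
}
```

In the second method the addition is attached before either input is known to be ready, which is the callback style the list above says a plain Future lacks.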

CompletableFuture characteristics

Partial source

/**
 * .....
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ......
}

CompletableFuture implements both Future and CompletionStage.

CompletionStage

CompletableFuture

Basic usage of CompletableFuture

The source provides the following ways to create a CompletableFuture:

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Creates a new complete CompletableFuture with given encoded result.
     */
    private CompletableFuture(Object r) {
        this.result = r;
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor after it runs the given
     * action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

    /**
     * Returns a new CompletableFuture that is already completed with
     * the given value.
     *
     * @param value the value
     * @param <U> the type of the value
     * @return the completed CompletableFuture
     */
    public static <U> CompletableFuture<U> completedFuture(U value) {
        return new CompletableFuture<U>((value == null) ? NIL : value);
    }
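
The creation methods listed above can be exercised side by side. This is a small sketch, not taken from the original post: the class name `CreateFutures`, the method `demo`, and the string values are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class showing each public way to obtain a CompletableFuture.
class CreateFutures {

    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Runs the Supplier on ForkJoinPool.commonPool()
            CompletableFuture<String> supplied = CompletableFuture.supplyAsync(() -> "a");
            // Runs the Supplier on the executor passed in
            CompletableFuture<String> onPool = CompletableFuture.supplyAsync(() -> "b", pool);
            // Runs a Runnable; the future carries no value (Void)
            CompletableFuture<Void> ran = CompletableFuture.runAsync(() -> { });
            // Already completed with the given value
            CompletableFuture<String> done = CompletableFuture.completedFuture("c");
            // Created incomplete, then completed by hand
            CompletableFuture<String> manual = new CompletableFuture<>();
            manual.complete("d");

            ran.join();
            return supplied.join() + onPool.join() + done.join() + manual.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "abcd"
    }
}
```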

thenApply

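The function passed to thenApply runs after the current stage completes normally, and the result of the current stage becomes the input of the next stage. thenApply itself works like a callback: it runs on whichever thread completes the current stage, or on the calling thread if the stage is already complete. thenApplyAsync runs the function asynchronously, meaning it is handed to an executor (ForkJoinPool.commonPool() by default, or the one you pass in) instead of running on the current thread. The source is as follows: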

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

    @Test
    public void testCompletable(){
        CompletableFuture.supplyAsync(()->1)
                .thenApply(i -> i+1)
                .thenApply(i -> 1 * i)
                .whenComplete((r,e)-> System.out.println(r+","+e))
                .join(); // wait for the chain, otherwise the test may return before anything is printed
    }
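
To see where the callbacks actually run, the following sketch records the thread name inside each function. It assumes an already completed future and a single-thread executor whose thread is named `demo-pool`; the class name `ThenApplyThreads` and that thread name are invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; "demo-pool" is an arbitrary thread name.
class ThenApplyThreads {

    // Returns the names of the threads that ran the callbacks: {thenApply, thenApplyAsync}.
    static String[] callbackThreads() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool"));
        try {
            CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);
            // The stage is already complete, so thenApply runs the function on the calling thread.
            String sync = done.thenApply(i -> Thread.currentThread().getName()).join();
            // thenApplyAsync hands the function to the given executor instead.
            String async = done.thenApplyAsync(i -> Thread.currentThread().getName(), pool).join();
            return new String[] {sync, async};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = callbackThreads();
        System.out.println("thenApply ran on:      " + names[0]);
        System.out.println("thenApplyAsync ran on: " + names[1]);
    }
}
```

When the previous stage is still running, thenApply instead executes on the thread that later completes it, so the first name depends on timing in that case.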

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * The concrete calculator class
 */
public class SquareCalculator {
    private final ExecutorService executor
            = Executors.newFixedThreadPool(4);

    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(2000);
            return input * input;
        });
    }

    public CompletableFuture<Integer> completableFutureCalculate(Integer input, Long sleep){
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executor.execute(() -> {
            try{
                Thread.sleep(sleep);
                future.complete(input * input);
            } catch (InterruptedException e){
                Thread.currentThread().interrupt();
                // Fail the future so callers blocked in join()/get() are released
                future.completeExceptionally(e);
            }
        });
        return future;
    }
}

/**
 * CompletableFuture handling two asynchronous computations
 */
    @Test
    public void testFuture() throws Exception{
        SquareCalculator sqa = new SquareCalculator();
        System.out.println("Calculating ... ");
        long start = System.currentTimeMillis();
        CompletableFuture<Integer> result1 = sqa.completableFutureCalculate(11, 2000L);
        CompletableFuture<Integer> result2 = sqa.completableFutureCalculate(22, 4000L);
        // Wait for both; total time is close to the slower task (4 s), not the sum
        CompletableFuture.allOf(result1, result2).join();
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(System.currentTimeMillis() - start);
    }
    /**
     Output of one run
     Calculating ... 
     121
     484
     4058
    **/

thenAccept and thenRun

    @Test
    public void testCompletable3() {
        // thenAccept consumes the previous result and returns nothing
        CompletableFuture.supplyAsync(() -> 8)
                .thenApply(i -> i + 1)
                .thenAccept(i -> System.out.println(i * i))
                .join();
    }
    @Test
    public void testCompletable4() {
        // thenRun ignores the previous result and just runs the action
        CompletableFuture.supplyAsync(()-> 8)
                .thenRun(() -> System.out.println("Hello World"))
                .join();
    }

thenCombine: combining two results

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

    @Test
    public void testCompletable5() {
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " World ")
                .thenApply(String::toUpperCase)
                .thenCombine(CompletableFuture.completedFuture("Java"),
                        (s1,s2)->s1+s2)
                .thenAccept(System.out::println)
                .join(); // wait for the chain before the test returns
    }

whenComplete

    @Test
    public void testCompletable9() {
        Runnable dummyTask = () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
            }
        };

        CompletableFuture<Void> f1 = CompletableFuture.runAsync(dummyTask);
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(dummyTask);
        f1 = f1.whenComplete((aVoid, throwable) -> System.out.println("Completed f1"));
        f2 = f2.whenComplete((aVoid, throwable) -> System.out.println("Completed f2"));
        CompletableFuture[] all = {f1, f2};
        CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
        allOf.whenComplete((aVoid, throwable) -> {
            System.out.println("Completed allOf");
        });
        allOf.join();
        System.out.println("Joined");
    }
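
The example above only exercises the success path. As a complement, here is a small sketch of what whenComplete sees when the stage fails: the result argument is null, the throwable is non-null, and the failure still propagates to whoever joins the returned future. The class name `WhenCompleteDemo` and the message "boom" are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; the exception and its message are invented.
class WhenCompleteDemo {

    static String observe() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<Integer> failing = CompletableFuture
                .<Integer>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                // whenComplete observes the outcome but does not swallow the exception
                .whenComplete((r, e) ->
                        log.append("result=").append(r).append(", failed=").append(e != null));
        try {
            failing.join();
        } catch (CompletionException e) {
            // join() rethrows the failure wrapped in an unchecked CompletionException
            log.append(", cause=").append(e.getCause().getMessage());
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "result=null, failed=true, cause=boom"
    }
}
```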

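join vs. get: differences and similarities

Both do the same job: they wait for the Future to complete and then return its result. The difference is that join cannot be interrupted and only throws unchecked exceptions (a failure surfaces as CompletionException), whereas get may be interrupted and declares checked exceptions that the caller has to catch. get is declared on the Future interface as follows: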

V get() throws InterruptedException, ExecutionException;
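
The practical consequence shows up in the calling code. The sketch below fails a future on purpose and reads it both ways; the class name `JoinVsGet`, its helper methods, and the "bad input" message are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class comparing the exception handling of get() and join().
class JoinVsGet {

    // A future that has already failed with an illustrative exception.
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalArgumentException("bad input"));
        return f;
    }

    // get() forces the caller to handle two checked exceptions.
    static String viaGet(CompletableFuture<?> f) {
        try {
            f.get();
            return "ok";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "ExecutionException: " + e.getCause().getMessage();
        }
    }

    // join() throws only unchecked exceptions, so the try/catch is optional.
    static String viaJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return "ok";
        } catch (CompletionException e) {
            return "CompletionException: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet(failed()));  // prints "ExecutionException: bad input"
        System.out.println(viaJoin(failed())); // prints "CompletionException: bad input"
    }
}
```

Because join needs no checked-exception handling, it is the more convenient choice inside lambdas and stream pipelines.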

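Two practical scenarios

All of several asynchronous requests complete

Wait until all of the asynchronous requests have completed, then collect every result.

/** controller code **/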
    @RequestMapping(method = RequestMethod.GET, value = "/test")
    public long test() {
        long sleep = randomSleep();
        return sleep;
    }

    private long randomSleep(){
        long time = (int)(1+Math.random()*(1000-1+1));
        try{
            System.out.println("Sleep " + time + "ms");
            Thread.sleep(time);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return time;
    }
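/** okhttp3 client code that fetches the message asynchronously **/
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class AsynchronousMessage {
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);
    private static final OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .connectionPool(new ConnectionPool(5, 1, TimeUnit.MINUTES))
            .retryOnConnectionFailure(true)
            .connectTimeout(5000, TimeUnit.MILLISECONDS)
            .readTimeout(5000, TimeUnit.MILLISECONDS)
            .build();

    public static CompletableFuture<Long> getAsynchronousMessage() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        Request req = new Request.Builder().url("http://10.201.0.39:8080/datanode/test/test").get().build();
        executor.execute(() -> {
            // try-with-resources closes the response on every path
            try (Response response = okHttpClient.newCall(req).execute()) {
                if (response.isSuccessful()) {
                    future.complete(Long.valueOf(response.body().string()));
                } else {
                    future.completeExceptionally(new IOException("Unexpected HTTP status " + response.code()));
                }
            } catch (IOException | RuntimeException e) {
                // Always complete the future; otherwise join() on it would block forever
                future.completeExceptionally(e);
            }
        });
        return future;
    }
}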
/** Calling code: CompletableFuture processes all the results **/
    @Test
    public void testCompletable12() {
        long start = System.currentTimeMillis();

        CompletableFuture<Long> fut1 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture<Long> fut2 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture<Long> fut3 = AsynchronousMessage.getAsynchronousMessage();
        // allOf returns CompletableFuture<Void>; it signals completion but carries no values
        CompletableFuture<Void> all = CompletableFuture.allOf(fut1, fut2, fut3);
        all.join();
        // Every future is complete here, so these join() calls return immediately
        System.out.println(fut1.join() + "," + fut2.join() + "," + fut3.join());
        System.out.println("Process Time:" + (System.currentTimeMillis() - start));
    }
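
Since allOf yields a CompletableFuture<Void>, the individual results have to be read back from the original futures. One common way to package that is a small helper that turns a list of futures into a future of a list. The sketch below is an assumption-laden illustration, not part of the original code: `AllOfResults`, `allAsList`, and the sample values are invented, and local suppliers stand in for the HTTP calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of the JDK or of the original post.
class AllOfResults {

    // Completes with all results, in input order, once every future has completed.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // All inputs are complete at this point, so join() does not block
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<Long> demo() {
        // Stand-ins for the three HTTP requests
        CompletableFuture<Long> a = CompletableFuture.supplyAsync(() -> 1L);
        CompletableFuture<Long> b = CompletableFuture.supplyAsync(() -> 2L);
        CompletableFuture<Long> c = CompletableFuture.supplyAsync(() -> 3L);
        return allAsList(Arrays.asList(a, b, c)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[1, 2, 3]"
    }
}
```

If any input fails, the future returned by `allAsList` fails as well, so the caller deals with errors in one place.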

Any one of several asynchronous requests completes

Return as soon as any one of the asynchronous requests has completed, and take its result.

/**
    The shared code is the same as in the previous example
**/
    @Test
    public void testCompletable13() {
        long start = System.currentTimeMillis();
        CompletableFuture<Long> fut1 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture<Long> fut2 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture<Long> fut3 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture<Object> first = CompletableFuture.anyOf(fut1, fut2, fut3);
        System.out.println(first.join()); // anyOf yields the result of the task that finished first; allOf yields no result
        System.out.println("Process Time:" + (System.currentTimeMillis() - start));
    }
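
Timing over HTTP is not predictable, so here is a deterministic sketch of the same behavior using futures that are completed by hand. The class name `AnyOfDemo` and the values "fast" and "slow" are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; futures are completed manually to control the order.
class AnyOfDemo {

    static Object firstCompleted() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();

        // anyOf is typed CompletableFuture<Object> because the inputs may differ in type
        CompletableFuture<Object> any = CompletableFuture.anyOf(slow, fast);

        fast.complete("fast");       // the first completion decides the outcome
        Object winner = any.join();
        slow.complete("slow");       // later completions do not change it
        return winner;
    }

    public static void main(String[] args) {
        System.out.println(firstCompleted()); // prints "fast"
    }
}
```

Because the result is typed as Object, callers usually need a cast or an instanceof check before using it.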

References

https://juejin.im/post/5abc9e59f265da239f077460