CompletableFuture Start
上篇已经介绍了 Future
,因为 Future
有如下局限性:
- 将多个异步计算的结果合并成一个
- 等待
Future
集合中的所有任务都完成 Future
完成事件(即,任务完成以后触发执行动作)CompletableFuture
是Java8
提供的新特性,提供了函数式编程的能力(面向对象编程->抽象数据,函数式编程->抽象行为),可以通过回调的方式处理计算结果。
CompletableFuture 特点
部分源码
/**
* .....
* @author Doug Lea
* @since 1.8
*/
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
......
}
CompletableFuture
实现了 Future
和 CompletionStage
CompletionStage
CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段- 一个阶段的计算执行可以是一个
Function
,Consumer
或者Runnable
。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
CompletableFuture
- 在
Java8
中,CompletableFuture
提供了非常强大的Future
的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture
的方法。 - 它可能代表一个明确完成的
Future
,也有可能代表一个完成阶段(CompletionStage
),它支持在计算完成以后触发一些函数或执行某些动作。
CompletableFuture 基本用法
源码提供一下创建 CompletableFuture
的方法
/**
* Creates a new incomplete CompletableFuture.
*/
public CompletableFuture() {
}
/**
* Creates a new complete CompletableFuture with given encoded result.
*/
private CompletableFuture(Object r) {
this.result = r;
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor with the value obtained
* by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} after
* it runs the given action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the given executor after it runs the given
* action.
*
* @param runnable the action to run before completing the
* returned CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
/**
* Returns a new CompletableFuture that is already completed with
* the given value.
*
* @param value the value
* @param <U> the type of the value
* @return the completed CompletableFuture
*/
public static <U> CompletableFuture<U> completedFuture(U value) {
return new CompletableFuture<U>((value == null) ? NIL : value);
}
thenApply
当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync
默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。thenApply
相当于回调函数(callback
).源码如下:
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
- 例1 一个简单的链式执行例子
@Test
public void testCompletable(){
CompletableFuture.supplyAsync(()->1)
.thenApply(i -> i+1)
.thenApply(i -> 1 * i)
.whenComplete((r,e)-> System.out.println(r+","+e));
}
- 例2 两个耗时异步计算同时执行
/**
* 具体的计算类
**/
public class SquareCalculator {
private ExecutorService executor
= Executors.newFixedThreadPool(4);
public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(2000);
return input * input;
});
}
public CompletableFuture<Integer> completableFutureCalculate(Integer input, Long sleep){
CompletableFuture<Integer> future = new CompletableFuture<>();
executor.execute(() -> {
try{
Thread.sleep(sleep);
Integer result = input * input;
future.complete(result);
} catch (InterruptedException e){
System.out.println(e);
}
});
return future;
}
}
/**
** CompletableFuture 处理两个异步计算
**/
@Test
public void testFuture() throws Exception{
SquareCalculator sqa = new SquareCalculator();
System.out.println("Calculating ... ");
long start = System.currentTimeMillis();
CompletableFuture<Integer> reslt = sqa.completableFutureCalculate(11, 2000L);
CompletableFuture<Integer> reslt1 = sqa.completableFutureCalculate(22, 4000L);
CompletableFuture.allOf(reslt,reslt1).join();
System.out.println(reslt.get());
System.out.println(reslt1.get());
System.out.println(System.currentTimeMillis() - start);
}
/**
一个计算结果
Calculating ...
121
484
4058
**/
thenAccept 与 thenRun
-
可以看到,
thenAccept
和thenRun
都是无返回值的。如果说thenApply
是不停的输入输出的进行生产,那么thenAccept
和thenRun
就是在进行消耗。它们是整个计算的最后两个阶段。 -
同样是执行指定的动作,同样是消耗,二者也有区别:
thenAccept
接收上一阶段的输出作为本阶段的输入thenRun
根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数
-
例3
thenAccept
的一个简单的例子
@Test
public void testCompletable3() {
CompletableFuture.supplyAsync(() -> 8)
.thenApply(i -> i + 1)
.thenAccept(i -> System.out.println(i * i));
}
- 例4
thenRun
的一个简单的例子
@Test
public void testCompletable4() {
CompletableFuture.supplyAsync(()-> 8)
.thenRun(() -> System.out.println("Hello World"));
}
thenCombine 整合两个计算结果
- 源码
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
- 例5 整合两个计算结果
@Test
public void testCompletable5() {
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " Word ")
.thenApply(String::toUpperCase)
.thenCombine(CompletableFuture.completedFuture("Java"),
(s1,s2)->s1+s2)
.thenAccept(System.out::println);
}
whenComplete
- 等待多个异步任务完成后执行相应的处理,这里要着重理解
whenComplete
返回的是一个新的Future
,如果需要回调需要按顺序执行那么同样需要用CompletableFuture.allOf
重新组合新他们。 - 要等待
Group
里面的Future
执行完成,需要调用join()
或get()
,两者的区别下面会说明
@Test
public void testCompletable9() {
Runnable dummyTask = () -> {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
}
};
CompletableFuture<Void> f1 = CompletableFuture.runAsync(dummyTask);
CompletableFuture<Void> f2 = CompletableFuture.runAsync(dummyTask);
f1 = f1.whenComplete((aVoid, throwable) -> System.out.println("Completed f1"));
f2 = f2.whenComplete((aVoid, throwable) -> System.out.println("Completed f2"));
CompletableFuture[] all = {f1, f2};
CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
allOf.whenComplete((aVoid, throwable) -> {
System.out.println("Completed allOf");
});
allOf.join();
System.out.println("Joined");
}
join 与 get 区别联系
两者的作用相同,都是等待 Future
完成然后返回结果。不同点是 join
不会被 interrupt
,但是 get
可能会被中断,使用时需要捕捉异常,接口定义如下:
V get() throws InterruptedException, ExecutionException;
两个实际场景
多个异步请求全部完成
多个异步请求全部完成获取所有结果
/** controller 代码**/
@RequestMapping(method = RequestMethod.GET, value = "/test")
public long test() {
long sleep = randomSleep();
return sleep;
}
private long randomSleep(){
long time = (int)(1+Math.random()*(1000-1+1));
try{
System.out.println("Sleep " + time + "ms");
Thread.sleep(time);
}catch (InterruptedException e){
e.printStackTrace();
}
return time;
}
/** okhttp3 client获取异步消息代码**/
public class AsynchronousMessage {
private static ExecutorService executor = Executors.newFixedThreadPool(4);
private static OkHttpClient okHttpClient = (new OkHttpClient.Builder()).connectionPool(new ConnectionPool(5, 1, TimeUnit.MINUTES)).
retryOnConnectionFailure(true).connectTimeout(5000, TimeUnit.MILLISECONDS).readTimeout(5000, TimeUnit.MILLISECONDS).build();
public static CompletableFuture<Long> getAsynchronousMessage() {
CompletableFuture<Long> future = new CompletableFuture<>();
Request req = new Request.Builder().url("http://10.201.0.39:8080/datanode/test/test").get().build();
executor.execute(() -> {
Response response = null;
try {
response = okHttpClient.newCall(req).execute();
if (response.isSuccessful()) {
String res = response.body().string();
if (res != null) {
future.complete(Long.valueOf(res));
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (null != response) {
response.close();
}
}
});
return future;
}
}
/** CompletableFuture 调用处理所有结果**/
@Test
public void testCompletable12() {
long start = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
CompletableFuture fut1 = AsynchronousMessage.getAsynchronousMessage();
CompletableFuture fut2 = AsynchronousMessage.getAsynchronousMessage();
CompletableFuture fut3 = AsynchronousMessage.getAsynchronousMessage();
CompletableFuture futures = CompletableFuture.allOf(fut1,fut2,fut3);
System.out.println(fut1.join() + "," + fut2.join() + "," + fut3.join());
System.out.println("Process Time:" + (System.currentTimeMillis() - start));
}
多个异步请求其中之一完成
多个异步请求全部完成获取完成的结果
/**
共通代码参考上面的例子
**/
@Test
public void testCompletable13() {
long start = System.currentTimeMillis();
CompletableFuture fut1 = AsynchronousMessage.getAsynchronousMessage();
CompletableFuture fut2 = AsynchronousMessage.getAsynchronousMessage();
CompletableFuture fut3 = AsynchronousMessage.getAsynchronousMessage();
CompletableFuture futures = CompletableFuture.anyOf(fut1,fut2,fut3);
System.out.println(futures.join());//anyOf 能取到完成任务的结果,而 allOf 不行
System.out.println("Process Time:" + (System.currentTimeMillis() - start));
}
参考资料
- Future 各种用法
https://juejin.im/post/5abc9e59f265da239f077460
本文由 zealzhangz 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2019/02/19 23:20