type
status
date
slug
summary
tags
category
icon
password
🐵 背景
CompletableFuture结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。CompletableFuture是由Java8引入的,在Java8之前我们一般通过Future实现异步。Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java8之前若要设置回调一般会使用guava的ListenableFuture。 CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
名词解释
- CF:代表CompletableFuture
- CS:代表CompletionStage
🐝 CompletableFuture 核心接口API介绍
Future 简介
API
方法名 | 返回值 | 描述 |
cancel (boolean mayInterruptIfRunning) | boolean | 尝试取消执行此任务。 |
get() | V | 如果需要等待计算完成,然后检索其结果。 |
get(long timeout, TimeUnit unit) | V | 如果需要,最多等待计算完成的给定时间,然后检索其结果(如果可用)。 |
isCancelled() | boolean | 如果此任务在正常完成之前取消,则返回 true 。 |
isDone() | boolean | 如果此任务完成,则返回 true 。 |
局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
CompletableFuture 简介
实现
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { }
JDK1.8 才新加入的一个实现类CompletableFuture,而CompletableFuture实现了两个接口(如上面代码所示):Future<T>、CompletionStage<T>,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。
Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤Stage,这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
- CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,
- CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
CompletionStage 简介
API

CompletionStage<T>接口提供了更多方法来更好的实现异步编排,并且大量的使用了JDK8引入的函数式编程概念。由stage执行的计算可以表示为Function,Consumer或Runnable(使用名称分别包括apply 、accept或run的方法 ),具体取决于它是否需要参数和/或产生结果。 例如:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println());
📢 使用CompletableFuture场景
应用场景
1️⃣ 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度;
2️⃣ 使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常;
3️⃣ 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个。
举个常见的案例,在APP查询首页信息的时候,一般会涉及到不同的RPC远程调用来获取很多用户相关信息数据,比如:商品banner轮播图信息、用户message消息信息、用户权益信息、用户优惠券信息 等,假设每个rpc invoke()耗时是250ms,那么基于同步的方式获取到话,算下来接口的RT至少大于1s,这响应时长对于首页来说是万万不能接受的,因此,我们这种场景就可以通过多线程异步的方式去优化。

CompletableFuture依赖链分析

根据CompletableFuture依赖数量,可以分为以下几类:零依赖、单依赖、双重依赖和多重依赖 。
- Future1、Future2都是零依赖的体现;
- Future3、Future5都是单依赖的体现,分别依赖于Future1和Future2;
- Future4即为双重依赖的体现,同时依赖于Future1和Future2;
- Future6即为多重依赖的体现,同时依赖于Future3、Future4和Future5;
类似这种多重依赖的流程来说,结果依赖于三个步骤:Future3、Future4、Future5,这种多元依赖可以通过allOf()或anyOf()方法来实现,区别是当需要多个依赖全部完成时使用allOf(),当多个依赖中的任意一个完成即可时使用anyOf(),如下代码所示:
CompletableFuture<Void> Future6 = CompletableFuture.allOf(Future3, Future4, Future5); CompletableFuture<String> result = Future6.thenApply(v -> { //这里的join并不会阻塞,因为传给thenApply的函数是在Future3、Future4、Future5全部完成时,才会执行 。 result3 = Future3.join(); result4 = Future4.join(); result5 = Future5.join(); // 返回result3、result4、result5组装后结果 return assamble(result3, result4, result5); });
🐼 CompletableFuture异步编排
在分析CompletableFuture异步编排之前,我跟大家理清一下CompletionStage接口下 (thenRun、thenApply、thenAccept、thenCombine、thenCompose)、(handle、whenComplete、exceptionally) 相关方法的实际用法和它们之间的区别是什么? 带着你的想法往下看吧!!!
异步编排API
- thenRun:【执行】直接开启一个异步线程执行任务,不接收任何参数,回调方法没有返回值;
- thenApply:【提供】可以提供返回值,接收上一个任务的执行结果,作为下一个Future的入参,回调方法是有返回值的;
- thenAccept:【接收】可以接收上一个任务的执行结果,作为下一个Future的入参,回调方法是没有返回值的;
- thenCombine:【结合】可以结合不同的Future的返回值,做为下一个Future的入参,回调方法是有返回值的;
- thenCompose:【组成】将上一个Future实例结果传递给下一个实例中。
实例代码分析
自定义线程池
/** * 创建一个自定义的线程池 */ public class CustomThreadPool { private int corePoolSize = 5; private int maximumPoolSize = 10; private long keepAliveTime = 60; private TimeUnit unit = TimeUnit.SECONDS; private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10000); public CustomThreadPool() { } public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQueue) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.keepAliveTime = keepAliveTime; this.workQueue = workQueue; } public ThreadPoolExecutor createCustomThreadPool() { ThreadFactory threadFactory = new CustomThreadFactory("CustomThreadPool"); return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); } }
如果所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。通过自定义线程池 customThreadPoolExecutor ,后面不同的异步编排方法,我们可以通过指定对应的线程池。
完整的测试代码
public class CompletableFutureCompose { ThreadPoolExecutor customAsyncTaskExecutor = null; private void init() { CustomThreadPool customThreadPool = new CustomThreadPool(); customAsyncTaskExecutor = customThreadPool.createCustomThreadPool(); } public void thenRun() { System.out.println("------- thenRun start -------"); if (customAsyncTaskExecutor == null) { init(); } CompletableFuture.runAsync(() -> { System.out.println("thread name: " + Thread.currentThread().getName() + " first step..."); }, customAsyncTaskExecutor).thenRun(() -> { System.out.println("thread name: " + Thread.currentThread().getName() + " second step..."); }).thenRunAsync(() -> { System.out.println("thread name: " + Thread.currentThread().getName() + " third step..."); }); System.out.println("------- thenRun end -------"); } public void thenApply() { System.out.println("------- thenApply start -------"); if (customAsyncTaskExecutor == null) { init(); } CompletableFuture.supplyAsync(() -> { System.out.println("thread name:" + Thread.currentThread().getName() + " first step..."); return "hello"; }, customAsyncTaskExecutor).thenApply((result1) -> { String targetResult = result1 + " austin"; System.out.println("first step result: " + result1); System.out.println("thread name: " + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult); return targetResult; }); System.out.println("------- thenApply end -------"); } public void thenAccept() { System.out.println("------- thenAccept start -------"); if (customAsyncTaskExecutor == null) { init(); } CompletableFuture.supplyAsync(() -> { System.out.println("thread name: " + Thread.currentThread().getName() + " first step..."); return "hello"; }, customAsyncTaskExecutor).thenAccept((result1) -> { String targetResult = result1 + " austin"; System.out.println("first step result: " + result1); System.out.println("thread name: " + Thread.currentThread().getName() + " second step..., targetResult: " + targetResult); }); System.out.println("------- thenAccept end -------"); } public void thenCombine() { System.out.println("------- thenCombine start -------"); if (customAsyncTaskExecutor == null) { init(); } CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("thread name: " + Thread.currentThread().getName() + " 执行future1开始..."); return "Hello"; }, customAsyncTaskExecutor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("thread name: " + Thread.currentThread().getName() + " 执行future2开始..."); return "World"; }, customAsyncTaskExecutor); future1.thenCombine(future2, (result1, result2) -> { String result = result1 + " " + result2; System.out.println("thread name: " + Thread.currentThread().getName() + " 获取到future1、future2聚合结果:" + result); return result; }).thenAccept(result -> System.out.println(result)); System.out.println("------- thenCombine end -------"); } public void thenCompose() { System.out.println("------- thenCompose start -------"); if (customAsyncTaskExecutor == null) { init(); } CompletableFuture.supplyAsync(() -> { // 第一个Future实例结果 System.out.println("thread name: " + Thread.currentThread().getName() + " 执行future1开始..."); return "Hello"; }, customAsyncTaskExecutor).thenCompose(result1 -> CompletableFuture.supplyAsync(() -> { // 将上一个Future实例结果传到这里 System.out.println("thread name: " + Thread.currentThread().getName() + " 执行future2开始..., 第一个实例结果:" + result1); return result1 + " World"; })).thenCompose(result12 -> CompletableFuture.supplyAsync(() -> { // 将第一个和第二个实例结果传到这里 System.out.println("thread name: " + Thread.currentThread().getName() + " 执行future3开始..., 第一第二个实现聚合结果:" + result12); String targetResult = result12 + ", I am austin!"; System.out.println("最终输出结果:" + targetResult); return targetResult; })); System.out.println("------- thenCompose end -------"); } public static CompletableFuture handle(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b) .handle((result, ex) -> { if (null != ex) { System.out.println("handle error: " + ex.getMessage()); return 0; } else { return result; } }); } public static CompletableFuture whenComplete(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b) .whenComplete((result, ex) -> { if (null != ex) { System.out.println("whenComplete error: " + ex.getMessage()); } }); } public HomeVO homeIndex(String userId, String lang) { if (customAsyncTaskExecutor == null) { init(); } BuildTask buildTask = new BuildTask(); // 获取Banner轮播图信息 CompletableFuture<List<BannerVO>> future1 = CompletableFuture.supplyAsync(() -> buildTask.buildBanners(userId, lang), customAsyncTaskExecutor); // 获取用户message通知信息 CompletableFuture<List<NotificationVO>> future2 = CompletableFuture.supplyAsync(() -> buildTask.buildNotifications(userId, lang), customAsyncTaskExecutor); // 获取用户权益信息 CompletableFuture<List<BenefitVO>> future3 = CompletableFuture.supplyAsync(() -> buildTask.buildBenefits(userId, lang), customAsyncTaskExecutor); // 获取优惠券信息 CompletableFuture<List<CouponVO>> future4 = CompletableFuture.supplyAsync(() -> buildTask.buildCoupons(userId), customAsyncTaskExecutor); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2, future3, future4); HomeVO finalHomeVO = new HomeVO(); CompletableFuture<HomeVO> resultFuture = allOfFuture.thenApply(v -> { try { finalHomeVO.setBanners(future1.get()); finalHomeVO.setNotifications(future2.get()); finalHomeVO.setBenefits(future3.get()); finalHomeVO.setCoupons(future4.get()); return finalHomeVO; } catch (Exception e) { System.out.println("[Error] assemble homeVO data error: {}" + e); throw new RuntimeException(e); } }); HomeVO homeVO = resultFuture.join(); return homeVO; } public static void main(String[] args) { CompletableFutureCompose completableFutureCompose = new CompletableFutureCompose(); completableFutureCompose.thenRun(); completableFutureCompose.thenApply(); completableFutureCompose.thenAccept(); completableFutureCompose.thenCombine(); completableFutureCompose.thenCompose(); try { System.out.println("success: " + handle(10, 5).get()); System.out.println("fail: " + handle(10, 0).get()); } catch (Exception e) { System.out.println("catch exception= " + e.getMessage()); } System.out.println("------------------------------------------------------------------"); try { System.out.println("success: " + whenComplete(10, 5).get()); System.out.println("fail: " + whenComplete(10, 0).get()); } catch (Exception e) { System.out.println("catch exception=" + e.getMessage()); } completableFutureCompose.homeIndex("1", "10"); } }
总结
- thenAccept()和thenApply()的用法实际上基本上一致,区别在于thenAccept()回调方法是没有返回值的,而thenApply()回调的带返回值的。
- thenCombine() VS thenCompose(),两者之间的区别
thenCombine结合的两个CompletableFuture没有依赖关系,且第二个CompletableFuture不需要等第一个CompletableFuture执行完成才开始。 thenCompose() 可以两个 CompletableFuture 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。 thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
- CompletableFuture有两种方式实现异步,一种是supply开头的方法,一种是run开头的方法:
- supply 开头:该方法可以返回异步线程执行之后的结果;
- run 开头:该方法不会返回结果,就只是执行线程任务。
- 获取CompletableFuture结果
public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent) public T join() public CompletableFuture<Object> allOf() public CompletableFuture<Object> anyOf()
- get():阻塞式获取执行结果,如果任务还没有完成则会阻塞等待知直到任务执行完成
- get(long timeout, TimeUnit unit):带超时的阻塞式获取执行结果
- getNow():如果已完成,立刻返回执行结果,否则返回给定的valueIfAbsent
- join():该方法和get()方法作用一样, 不抛异常的阻塞式获取异步执行结果
- allOf():当给定的所有CompletableFuture都完成时,返回一个新的CompletableFuture
- anyOf():当给定的其中一个CompletableFuture完成时,返回一个新的CompletableFuture
join()和get()方法都是 阻塞式 调用它们的线程(通常为主线程)来获取CompletableFuture异步之后的返回值。 两者的区别在于join()返回计算的结果或者抛出一个unchecked异常CompletionException,而get()返回一个具体的异常。
- 结果处理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- 异常处理
使用CompletableFuture编写代码时,异常处理很重要,CompletableFuture提供了三种方法来处理它们:handle()、whenComplete() 和 exceptionly()。
- handle:返回一个新的CompletionStage,当该阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行,不会将内部异常抛出。
- whenComplete:返回与此阶段具有相同结果或异常的新CompletionStage,该阶段在此阶段完成时执行给定操作。与方法handle不同,会将内部异常往外抛出。
- exceptionally:返回一个新的CompletableFuture,CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try/catch。
handle() VS whenComplete(), 两者之间的区别
核心区别在于whenComplete不消费异常,而handle消费异常
Two method forms support processing whether the triggering stage completed normally or exceptionally:Method {whenComplete} allows injection of an action regardless of outcome, otherwise preserving the outcome in its completion.Method {handle} additionally allows the stage to compute a replacement result that may enable further processing by other dependent stages.
翻译过来就是:
两种方法形式支持处理触发阶段是否 正常完成 或 异常完成:
- whenComplete:可以访问当前CompletableFuture的 结果 和 异常 作为参数,使用它们并执行您想要的操作。此方法并不能转换完成的结果,会内部抛出异常。
- handle:当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数来执行。当此阶段完成时,以 该阶段的结果 和 该阶段的异常 作为参数调用给定函数,并且函数的结果用于完成返回的阶段,不会把异常外抛出来。
- 关于异步线程池(十分重要)
异步回调方法可以选择是否传递线程池参数Executor,这里为了实现线程池隔离,当不传递线程池时,默认会使用ForkJoinPool中的公共线程池CommonPool,这个线程池默认创建的线程数是CPU的核数,如果所有的异步回调共享一个线程池,核心与非核心业务都竞争同一个池中的线程,那么一旦有任务执行一些很慢的I/O 操作,就会导致线程池中所有线程都阻塞在I/O操作上,很容易成为系统瓶颈,影响整个系统的性能。因此, 建议强制传线程池,且根据实际情况做线程池隔离,减少不同业务之间的相互干扰。
🎖️ 异步化带来的性能提升
- 通过异步化改造,原本同步获取数据的API性能得到明显提升,大大减少了接口的响应时长(RT)。
- 接口的吞吐量大幅度提升。
🔗 引用文章
有关于博客的任何问题,请在下方留言,感谢~ 🤞🏻
- 作者:Sheamus
- 链接:https://www.sheamus.top/article/java/completableFuture
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
相关文章