type
Post
status
Published
date
Apr 19, 2023
slug
summary
tags
开发
category
技术分享
icon
password
1.什么是CompletableFuture2.使用场景3.并行加载的实现方式3.1同步模型3.2NIO异步模型3.3比较4.CompletableFuture使用与原理4.1解决的问题4.2CompletableFuture的定义4.3CompletableFuture的使用4.3.1零依赖(创建)4.3.2一元依赖4.3.3二元依赖:依赖两个CF4.3.4多元依赖:依赖多个CF4.4CompletableFuture原理4.4.1设计思想4.4.2流程总结
1.什么是CompletableFuture
CompletableFuture是jdk8引入的实现类。扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future,简单的来讲就是可以实现异步回调。
2.使用场景
CompletableFuture可以在很多场景下使用
- 异步执行任务并处理任务结果
- 处理多个异步任务的结果
- 对多个异步任务进行组合
- 处理异步任务的异常
(I/O密集型任务)当需要执行一些耗时的操作时,我们可以用CompletableFuture来实现异步回调,从而释放主线程继续执行其他任务。还可以同时处理多个异步任务的结果,对它们进行一些组合操作,或者处理它们的异常情况。例如需要对外提供功能接口,对内调度各个下游服务获取数据进行聚合。
3.并行加载的实现方式
3.1同步模型
从各个服务获取数据,顺序执行

- 简单直观
- 但是在同步调用的场景下,接口耗时长、性能差,接口响应时长T > (T1+T2+T3+……+Tn)
为了缩短接口的响应时间,一般会使用线程池的方式并行获取数据。

导致资源利用率比较低
- CPU资源大量浪费在阻塞等待上
- 会引入更多额外的线程池
同步模型下,会导致硬件资源无法充分利用,系统吞吐量容易达到瓶颈。
3.2NIO异步模型
- 通过RPC NIO异步调用的方式可以降低线程数,从而降低调度(上下文切换)开销,如Dubbo的异步调用
- 通过引入CompletableFuture(下文简称CF)对业务流程进行编排,降低依赖之间的阻塞。
3.3比较
业界广泛流行的解决方案,主要包括Future、CompletableFuture、RxJava、Reactor(响应式编程)Mono/Flux
ㅤ | Future | CompletableFuture | RxJava | Reactor |
Composable(可组合) | ❌ | ✔️ | ✔️ | ✔️ |
Asynchronous(异步) | ✔️ | ✔️ | ✔️ | ✔️ |
Operator fusion(操作融合) | ❌ | ❌ | ✔️ | ✔️ |
Lazy(延迟执行) | ❌ | ❌ | ✔️ | ✔️ |
Backpressure(回压) | ❌ | ❌ | ✔️ | ✔️ |
- 可组合:可以将多个依赖操作通过不同的方式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then开头的方法,这些方法就是对“可组合”特性的支持。
- 操作融合:将数据流中使用的多个操作符以某种方式结合起来,进而降低开销(时间、内存)。
- 延迟执行:操作不会立即执行,当收到明确指示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。
- 回压:某些异步阶段的处理速度跟不上,直接失败会导致大量数据的丢失,对业务来说是不能接受的,这时需要反馈上游生产者降低调用量。
4.CompletableFuture使用与原理
4.1解决的问题
CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。
- Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。
- CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
Futures和ListenableFuture实现方式
private static void testListeningExecutorService() { // pool ExecutorService executorService = Executors.newFixedThreadPool(5); // guava decorate ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); // 任务1 ListenableFuture<String> future1 = listeningExecutorService.submit(() -> { System.out.println("步骤一:选车..."); return "选车完成"; }); // 任务2 ListenableFuture<String> future2 = listeningExecutorService.submit(() -> { System.out.println("步骤二:支付..."); return "支付完成"; }); // allOf ListenableFuture<List<String>> listListenableFuture = Futures.allAsList(future1, future2); Futures.addCallback(listListenableFuture, new FutureCallback<List<String>>() { @Override public void onSuccess(@Nullable List<String> strings) { System.out.println("操作完成:" + strings); ListenableFuture<String> future3 = listeningExecutorService.submit(() -> { System.out.println("步骤三:提车..."); return "提车完成"; }); Futures.addCallback(future3, new FutureCallback<String>() { @Override public void onSuccess(@Nullable String s) { System.out.println(s); } @Override public void onFailure(Throwable throwable) { System.out.println("操作失败:" + throwable.getCause()); } }, listeningExecutorService); } @Override public void onFailure(Throwable throwable) { System.out.println("操作失败 " + throwable.getCause()); } }, listeningExecutorService); }
步骤一:选车... 步骤二:支付... 操作完成:[选车完成, 支付完成] 步骤三:提车... 提车完成
CompletableFuture的实现方式
private static void testCompletableFuture() { ExecutorService executorService = Executors.newFixedThreadPool(5); CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println("步骤一:选车..."); return "选车完成"; }, executorService); CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> { System.out.println("步骤二:支付..."); return "支付完成"; }, executorService); cf1.thenCombine(cf2, (r1, r2) -> { System.out.println(r1 + " -> " + r2); System.out.println("步骤三:提车..."); return "提车完成"; }).thenAccept(System.out::println); }
步骤一:选车... 步骤二:支付... 选车完成 -> 支付完成 步骤三:提车... 提车完成
4.2CompletableFuture的定义

CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。
- Future表示异步计算的结果
- CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。
从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。
4.3CompletableFuture的使用
使用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的完成会触发另外一系列依赖它的CompletableFuture的执行

一个业务接口的流程,其中包括CF1\CF2\CF3\CF4\CF5共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次RPC调用、一次数据库操作或者是一次本地方法调用等
根据依赖数量,可以分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。
4.3.1零依赖(创建)

- 使用runAsync或supplyAsync发起异步调用
- 直接创建一个已完成状态的CompletableFuture
- 先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
private static void testZeroDep() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "r1", executorService); cf1.join(); System.out.println(cf1.get()); CompletableFuture<String> cf2 = CompletableFuture.completedFuture("r2"); cf2.join(); System.out.println(cf2.get()); CompletableFuture<Object> cf3 = new CompletableFuture<>(); cf3.complete("r3"); cf3.join(); System.out.println(cf3.get()); }
将回调方法转为CompletableFuture,然后再依赖CompletableFure的能力进行调用编排
private static <T> CompletableFuture<T> toCompletableFuture(final AsyncCallback<T> callback, AsyncCall call) { // 新建一个未完成的CompletableFuture CompletableFuture<T> cf = new CompletableFuture<>(); callback.addObserver(new CompletableObserver<T>() { @Override public void onSuccess(T t) { cf.complete(t); } @Override public void onFailure(Throwable t) { cf.completeExceptionally(t); } @Override public void update(Observable o, Object arg) { } }); if (call != null) { try { call.invoke(); } catch (Exception e) { cf.completeExceptionally(e); } } return cf; } @FunctionalInterface public interface AsyncCall { // 回调方法 void invoke() throws Exception; } public static class AsyncCallback<T> extends Observable implements AsyncCall { @Override public void invoke() { } } public interface CompletableObserver<T> extends Observer { void onSuccess(T t); void onFailure(Throwable t); }
4.3.2一元依赖

CF3,CF5分别依赖于CF1和CF2,这种对于单个CompletableFuture的依赖可以通过thenApply、thenAccept、thenCompose等方法来实现
private static void testOneDep() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "CF1", executorService); CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "CF2", executorService); CompletableFuture<String> cf3 = cf1.thenApply(r1 -> { System.out.println(r1); System.out.println("CF3 doing..."); return "CF3"; }); cf3.join(); System.out.println(cf3.get()); CompletableFuture<String> cf5 = cf2.thenApply(r2 -> { System.out.println(r2); System.out.println("CF5 doing..."); return "CF5"; }); cf5.join(); System.out.println(cf5.get()); }
CF1 CF3 doing... CF3 CF1 CF5 doing... CF5
4.3.3二元依赖:依赖两个CF

CF4同时依赖于两个CF1和CF2,这种二元依赖可以通过thenCombine等回调来实现
private static void testTwoDep() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "CF1", executorService); CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "CF2", executorService); CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (r1, r2) -> { System.out.println(r1 + "," + r2); System.out.println("CF4 doing..."); return "CF4"; }); cf4.join(); System.out.println(cf4.get()); }
CF1,CF2 CF4 doing... CF4
4.3.4多元依赖:依赖多个CF

整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过allOf或anyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf
private static void testManyDep() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(5); CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> "CF3", executorService); CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> "CF4", executorService); CompletableFuture<String> cf5 = CompletableFuture.supplyAsync(() -> "CF5", executorService); CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5); CompletableFuture<String> r = cf6.thenApply(r6 -> { String r3 = cf3.join(); String r4 = cf4.join(); String r5 = cf5.join(); System.out.println(r3 + "," + r4 + "," + r5); System.out.println("CF6 doing..."); return "CF6"; }); r.join(); System.out.println(r.get()); }
CF3,CF4,CF5 CF6 doing... CF6
4.4CompletableFuture原理
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果。stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。

依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。
- UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
- BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。

4.4.1设计思想

被观察者
- 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。
- 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象。
观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
- 观察者中的dep属性:指向其对应的CompletableFuture
- 观察者中的src属性:指向其依赖的CompletableFuture
- 观察者Completion中的fn属性:用来存储具体的等待被回调的函数
4.4.2流程
一元依赖
static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }
- 将观察者Completion注册到CF1,此时CF1将Completion压栈
- 当CF1的操作运行完成时,会将结果赋值给CF1中的result属性
- 依次弹栈,通知观察者尝试运行

二元依赖
static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn) { super(executor, dep, src, snd); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } }
thenCombine操作表示依赖两个CompletableFuture
BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF完成时都会尝试触发观察者BiApply,BiApply会检查两个依赖是否都完成,如果完成则开始执行。

多元依赖
依赖多个CompletableFuture的回调方法包括
allOf、anyOf,allOf观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发
总结
同步方法
- 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行
- 如果注册时被依赖的操作还未执行完,则由回调线程执行
异步方法
- CompletableFuture默认使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈
- 异步回调传入自定义线程池,做好线程池隔离
private static void testSelectPool() { ExecutorService executorService = Executors.newFixedThreadPool(5); CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { System.out.println("supplyAsync thread: " + Thread.currentThread().getName()); return ""; }, executorService); cf1.thenApply(r1 -> { System.out.println("thenApply thread: " + Thread.currentThread().getName()); return ""; }); cf1.thenApplyAsync(r1 -> { System.out.println("thenApplyAsync thread: " + Thread.currentThread().getName()); return ""; }); cf1.thenApplyAsync(r1 -> { System.out.println("thenApplyAsync thread: " + Thread.currentThread().getName()); return ""; }, executorService); }
supplyAsync thread: pool-1-thread-1 thenApply thread: main thenApplyAsync thread: ForkJoinPool.commonPool-worker-9 thenApplyAsync thread: pool-1-thread-2
private static void testLock() { ExecutorService executorService = new ThreadPoolExecutor(1, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5)); CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> CompletableFuture.supplyAsync(() -> { System.out.println("inner"); return "inner"; }, executorService).join(), executorService); outer.join(); }