为什么需要使用这个技术?
因为目前我们公司出现出现个别接口变慢的情况,有些是为了执行大量 IO 操作和调用其他的服务,具有 I/O 密集型特点。
执行 IO 操作会导致程序堵塞,CPU 可能会处于空闲状态,因为 CPU 在等待数据到来或者写入的过程中没有其他计算任务。
如下图
上面请求完成需要至少 300ms + 300ms + 200ms = 800ms
如果改用并行方式大概需要 max(300ms, 300ms, 200ms) = 300ms 左右(因为CPU有上下文切换的消耗)
CompletableFuture 与 Future 关系是什么?
简单理解:其实他们的关系就差个 Completable
Completable:这个词强调了 CompletableFuture
与普通 Future
的不同之处。"Completable" 意味着这个对象不仅可以代表一个异步操作的结果,还可以在操作完成时附加额外的动作或计算。
它提供了更丰富的操作,比如可以附加多个回调函数(通过
thenApply
,thenAccept
,thenRun
等方法),这些回调会在异步操作完成时执行。它支持更复杂的组合和链接操作,可以通过
thenCompose
方法将多个CompletableFuture
对象串联起来,形成一个复杂的异步处理流程。它还提供了异常处理的能力,可以通过
exceptionally
和handle
方法来处理异步操作中发生的异常。
快速开始
无依赖任务
描述
无依赖任务是指多个任务间不存在依赖关系即每个任务输入均不依赖与其他任务的输出。
实现
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
return "我是任务1返回值";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
return "我是任务2返回值";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "我是任务3返回值";
});
// 它能够等待所有传入的 CompletableFuture 任务完成
CompletableFuture.allOf(task1, task2, task3).thenAccept(v -> {
System.out.println("任务1返回值:" + task1.join());
System.out.println("任务2返回值:" + task2.join());
System.out.println("任务3返回值:" + task3.join());
}).join();
或者
public static String getTask(String task) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "我是任务" + task + "返回值";
}
public static void main(String[] args) {
List<String> allResult = Stream.of(1, 2, 3)
.map(i -> CompletableFuture.supplyAsync(() -> getTask(String.valueOf(i))))
.collect(Collectors.toList())
.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(allResult);
}
存在依赖任务
存在依赖任务是指多个任务间依赖关系即某一个或某几个任务的输出是另外的任务的输入
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务1");
return "我是任务1返回值";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务3");
return "我是任务3返回值";
});
CompletableFuture<String> task2 = task1.thenApply(res -> {
System.out.println("任务2");
return "我是任务2返回值";
});
// task4
CompletableFuture.allOf(task2, task3)
.thenAccept((v) -> {
System.out.println("任务1返回值:" + task1.join());
System.out.println("任务2返回值:" + task2.join());
System.out.println("任务3返回值:" + task3.join());
}).join();
CompletableFuture 原因&原理
CompletableFuture 出现原因
Future 用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8 之前若要设置回调一般会使用 guava 的 ListenableFuture,回调的引入又会导致臭名昭著的回调地狱。
CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
CompletableFuture 原理
CompletableFuture 实现了两个接口(如上图所示):Future、CompletionStage。Future 表示异步计算的结果,CompletionStage 用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个 CompletionStage 触发的,随着当前步骤的完成,也可能会触发其他一系列 CompletionStage 的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage 接口正是定义了这样的能力,我们可以通过其提供的 thenAppy、thenCompose 等函数式编程方法来组合编排这些步骤。
异常处理
由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过 try\catch 捕获异常。
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
log.info("任务1")
return "我是任务1返回值";
}).exceptionally(e -> {
// 通过 exceptionally 捕获异常,打印日志并返回默认值
log.error("任务1异常:", e)
return "";
});
使用时注意点
线程死锁问题
在父级任务和子级任务用同一个线程池即: 线程池循环引用会导致死锁.
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任务
}, threadPool1);
当父任务同时来了 10 个, 这时候没有空闲线程而子任务因没有可执行线程被堵塞, 父任务就会一直等待.