CompletableFuture
是 Java 8 引入的增强版异步编程工具,它在 Future
的基础上做了重大改进,用于更优雅、更灵活地处理异步任务、回调链式操作、组合多个异步任务等。
为什么会冒出个 CompletableFuture
先来回答一个关键问题:为什么会有 CompletableFuture
?既然 Java 专门搞了这么个新工具,肯定是因为老的方式不够好用,甚至在实际开发中会让人抓狂。
在 Java 8 之前,我们处理异步任务主要靠 Future
接口,比如在线程池中提交一个任务,代码大概是这样的:
1
2
| Future<String> future = executor.submit(() -> "hello");
String result = future.get(); // 阻塞等待结果
|
看着挺朴素的代码,其实问题不少:
- get() 是阻塞的:任务没执行完,调用方就只能干等,完全不是我们想要的“异步”
- 没有回调机制:任务完成之后,不能自动触发后续操作,只能主动去问它“你好了没”
- 不能链式组合多个异步任务:比如 A 完成后继续执行 B,Future 根本做不到
- 异常处理很笨重:要靠开发者自己 try-catch,还没法像流水线一样优雅地传递异常
- 没法聚合多个 Future 的结果:比如等 A 和 B 都完成后再执行 C?不好意思,不支持
于是,Java 8 推出了 CompletableFuture
,一次对异步能力的全面升级。从命名上就能看出它的野心:这是一个可以“被主动完成”的未来,不再是等待,而是掌控。
它不仅支持非阻塞调用,还有完善的回调机制、链式任务组合、异常处理流转,以及多个任务的聚合能力。
简而言之,CompletableFuture
把异步这件事,从“能用”提升到了“好用”,不止是更强大,而是更适合写出清晰、优雅、可维护的异步代码。
CompletableFuture 的方法
CompletableFuture
对象的方法非常非常多,如何了解和记忆是个老大难了,这里推荐使用分组、对比的方式进行记忆。
创建方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| // 创建一个已完成的 Future
public static <U> CompletableFuture<U> completedFuture(U value) {
}
// 无返回值、默认线程池(ForkJoinPool.commonPool())
public static CompletableFuture<Void> runAsync(Runnable runnable) {
}
// 无返回值、自定义线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
}
// 有返回值、默认线程池(ForkJoinPool.commonPool())
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
}
// 有返回值、自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
}
|
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| CompletableFuture<String> completedFuture =
CompletableFuture
.completedFuture("CompletableFuture (with the given value)");
CompletableFuture<Void> runAsyncFuture =
CompletableFuture.runAsync(() -> {
System.out.println("Running async task (with default executor)");
});
CompletableFuture<Void> runAsyncFuture =
CompletableFuture.runAsync(() -> {
System.out.println("Running async task with custom executor");
}, executor);
CompletableFuture<String> supplyAsyncFuture =
CompletableFuture.supplyAsync(() -> {
return "Result of the computation (with default executor)";
});
CompletableFuture<String> supplyAsyncFuture =
CompletableFuture.supplyAsync(() -> {
return "Result of the computation with custom executor";
}, executor);
|
结果处理对比
转换类方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // 同步转换结果
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
}
// 异步转换结果(默认线程池)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
}
// 异步转换结果(指定线程池)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
}
|
示例:
1
2
3
4
5
6
| CompletableFuture<Integer> lengthFuture =
supplyAsyncFuture.thenApply(s -> s.length());
CompletableFuture<Integer> lengthFuture =
supplyAsyncFuture.thenApplyAsync(s -> s.length());
CompletableFuture<Integer> lengthFuture =
supplyAsyncFuture.thenApplyAsync(s -> s.length(), executor);
|
消费类方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| // 同步消费结果
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
}
// 异步消费结果(默认线程池)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
}
// 异步消费结果(指定线程池)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
}
// 同步执行无参操作
public CompletableFuture<Void> thenRun(Runnable action) {
}
// 异步执行无参操作(默认线程池)
public CompletableFuture<Void> thenRunAsync(Runnable action) {
}
// 异步执行无参操作(指定线程池)
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
}
|
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| supplyAsyncFuture.thenAccept(result ->
System.out.println("Received: " + result)
);
supplyAsyncFuture.thenAcceptAsync(result ->
System.out.println("Received: " + result)
);
supplyAsyncFuture.thenAcceptAsync(result ->
System.out.println("Received: " + result), executor
);
supplyAsyncFuture.thenRun(() ->
System.out.println("Task completed")
);
supplyAsyncFuture.thenRunAsync(() ->
System.out.println("Task completed")
);
supplyAsyncFuture.thenRunAsync(() ->
System.out.println("Task completed"), executor
);
|
组合方法对比
双 Future 组合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| // 合并两个结果,并返回新的 CompletableFuture
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
}
// 消费两个 Future 的结果
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
}
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
}
// 两个都完成后执行操作
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
}
|
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| CompletableFuture<String> supplyAsyncFuture1 =
CompletableFuture.supplyAsync(() -> {
return "R1";
});
CompletableFuture<String> supplyAsyncFuture2 =
CompletableFuture.supplyAsync(() -> {
return "R2";
});
CompletableFuture<String> result = supplyAsyncFuture1.thenCombine(supplyAsyncFuture2,
(r1, r2) -> String.format("%s | %s", r1, r2)
);
result.thenAccept(System.out::println);
supplyAsyncFuture1.thenAcceptBoth(supplyAsyncFuture2,
(r1, r2) -> System.out.println("%s | %s", r1, r2)
);
CompletableFuture<String> runAsyncFuture1 =
CompletableFuture.runAsync(() -> {
System.out.println("run task 1")
});
CompletableFuture<String> runAsyncFuture2 =
CompletableFuture.runAsync(() -> {
System.out.println("run task 2")
});
runAsyncFuture1.runAfterBoth(runAsyncFuture2,
() -> System.out.println("run task. ")
);
|
多 Future 组合
1
2
3
4
5
6
7
| // 所有 Future 完成后完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
}
// 任意一个 Future 完成后完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
}
|
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| // 创建三个异步任务
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "R1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "R2");
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> "R3");
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(task1, task2, task3);
// 所有完成后获取结果
allFutures.thenRun(() -> {
try {
System.out.println("T1 Result: " + task1.get());
System.out.println("T2 Result: " + task2.get());
System.out.println("T3 Result: " + task3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// 模拟不同响应时间的服务
CompletableFuture<String> fastService = CompletableFuture.supplyAsync(() -> {
sleep(100); // 模拟100ms延迟
return "Fast service response";
});
CompletableFuture<String> mediumService = CompletableFuture.supplyAsync(() -> {
sleep(300);
return "Medium service response";
});
CompletableFuture<String> slowService = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Slow service response";
});
// 获取最先响应的服务
CompletableFuture<Object> firstResponse = CompletableFuture.anyOf(
fastService, mediumService, slowService
);
firstResponse.thenAccept(result ->
System.out.println("The first results: " + result)
);
// 可能输出: "The first results: Fast service response"
|
异常处理对比
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| // 捕获异常并返回默认值
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
}
// 无论成功失败都会执行
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
}
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
}
// 无论成功失败都会执行,不影响结果
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
}
|
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Error");
}
return "Success";
})
.exceptionally(ex -> {
System.out.println("Exception: " + ex.getMessage());
return "Recovered";
})
.thenAccept(System.out::println);
CompletableFuture.supplyAsync(() -> {
return "Result";
})
.handle((result, ex) -> {
if (ex != null) {
return "Default value";
}
return result;
});
CompletableFuture.supplyAsync(() -> {
if (new Random().nextBoolean()) {
throw new RuntimeException("Error");
}
return "Success";
})
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Error", ex);
metrics.increment("failures");
} else {
log.info("Result: {}", result);
metrics.increment("success");
}
})
.thenAccept(System.out::println);
|
总结
方法名中的 Async
后缀,代表异步执行
参数列表中的 Executor
表示可以自定义线程池,如果不自定义,则使用默认线程池(ForkJoinPool.commonPool()
)
创建阶段的两个方法:[run|supply](Async)
结果处理阶段三个关键字 then[Apply|Accept|Run](Async)
apply
: Function<T,R>
接收一个参数,处理后返回结果accept
: Consumer<T>
接收一个参数,不返回run
: Runnable
不接受参数,不返回
组合多个分为两类,两个组合和多个组合
异常处理分为三部分 exceptionally
handle(Async)
和 whenComplete(Async)
exceptionally
: Function<Throwable, T>
接收异常为参数,处理后返回一个值handle
: BiFunction<T, Throwable, U>
可以得到异常和结果后,重新返回一个结果whenComplete
: BiConsumer<T, Throwable>
可以得到异常和返回值,不对结果进行处理
注意: handle
和 whenComplete
比较特殊,无论是否发生异常,都会处理(类似 finally
)