抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

CompletableFuture是Java 8引入的异步编程工具,它解决了传统Future的诸多限制,支持链式调用、组合多个异步操作、异常处理等功能。本文详细介绍CompletableFuture的使用方法。

Future的局限性

Java 5引入的Future接口功能有限:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executor = Executors.newFixedThreadPool(10);

Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "结果";
});

// 问题1:get()是阻塞的
String result = future.get(); // 阻塞等待

// 问题2:无法手动完成
// 问题3:无法链式处理结果
// 问题4:无法组合多个Future
// 问题5:异常处理不便

CompletableFuture解决了这些问题。

创建CompletableFuture

方式1:supplyAsync(有返回值)

1
2
3
4
5
6
7
8
9
10
11
12
// 使用默认的ForkJoinPool
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
sleep(1000);
return "Hello";
});

// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
}, executor);

方式2:runAsync(无返回值)

1
2
3
4
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("执行异步任务");
sleep(1000);
});

方式3:completedFuture(直接创建已完成的)

1
2
3
// 创建一个已完成的Future
CompletableFuture<String> future = CompletableFuture.completedFuture("已完成");
System.out.println(future.get()); // 立即返回"已完成"

方式4:手动创建和完成

1
2
3
4
5
6
7
8
9
10
11
CompletableFuture<String> future = new CompletableFuture<>();

// 在其他地方完成它
new Thread(() -> {
sleep(1000);
future.complete("手动完成的结果"); // 正常完成
// 或者
// future.completeExceptionally(new RuntimeException("出错了")); // 异常完成
}).start();

String result = future.get();

获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

// 方式1:阻塞获取
String result1 = future.get(); // 可能抛出检查异常

// 方式2:阻塞获取(带超时)
String result2 = future.get(1, TimeUnit.SECONDS);

// 方式3:阻塞获取(不抛检查异常)
String result3 = future.join(); // 抛出非检查异常

// 方式4:立即获取(不阻塞)
String result4 = future.getNow("默认值"); // 未完成则返回默认值

// 方式5:检查状态
boolean done = future.isDone();
boolean cancelled = future.isCancelled();
boolean completedExceptionally = future.isCompletedExceptionally();

结果转换

thenApply:转换结果(有返回值)

1
2
3
4
5
6
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "hello")
.thenApply(s -> s.toUpperCase()) // "HELLO"
.thenApply(s -> s + " WORLD"); // "HELLO WORLD"

System.out.println(future.join()); // HELLO WORLD
1
2
supplyAsync ──▶ thenApply ──▶ thenApply ──▶ 结果
"hello" toUpperCase + " WORLD" "HELLO WORLD"

thenAccept:消费结果(无返回值)

1
2
3
4
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(String::toUpperCase)
.thenAccept(result -> System.out.println("结果: " + result));
// 输出:结果: HELLO

thenRun:执行操作(不关心结果)

1
2
3
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("任务完成"));
// 不接收上一步的结果

三者对比

方法 接收参数 返回值 用途
thenApply 上一步结果 转换数据
thenAccept 上一步结果 消费数据
thenRun 执行后续动作

异步版本(Async后缀)

每个方法都有对应的Async版本,在不同线程执行:

1
2
3
4
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s.toUpperCase()) // 可能在同一线程
.thenApplyAsync(s -> s + " WORLD") // 在新线程执行
.thenApplyAsync(s -> s + "!", executor); // 在指定线程池执行
1
2
3
4
5
方法              线程
──────────────────────────────
thenApply() 可能复用当前线程
thenApplyAsync() 使用ForkJoinPool
thenApplyAsync(executor) 使用指定线程池

组合两个CompletableFuture

thenCompose:串行组合(flatMap)

当第二个任务依赖第一个任务的结果时使用:

1
2
3
4
5
6
// 场景:先查用户,再根据用户ID查订单
CompletableFuture<Order> future = CompletableFuture
.supplyAsync(() -> getUser(userId))
.thenCompose(user -> CompletableFuture.supplyAsync(
() -> getOrderByUser(user)
));
1
2
getUser ──▶ user ──▶ getOrderByUser ──▶ order
异步1 结果传递 异步2

thenCombine:并行组合(合并结果)

当两个任务相互独立,需要合并结果时使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "World";
});

// 两个任务并行执行,都完成后合并结果
CompletableFuture<String> combined = future1.thenCombine(
future2,
(result1, result2) -> result1 + " " + result2
);

System.out.println(combined.join()); // Hello World(只需1秒多)
1
2
3
4
future1 ──────┐
├──▶ thenCombine ──▶ "Hello World"
future2 ──────┘
(并行执行)

thenAcceptBoth:并行组合(消费结果)

1
2
3
4
future1.thenAcceptBoth(future2, (result1, result2) -> {
System.out.println(result1 + " " + result2);
});
// 无返回值

runAfterBoth:两个都完成后执行

1
2
3
4
future1.runAfterBoth(future2, () -> {
System.out.println("两个任务都完成了");
});
// 不关心结果

applyToEither:任一完成即处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "结果1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "结果2";
});

// 哪个先完成用哪个
CompletableFuture<String> fastest = future1.applyToEither(
future2,
result -> "最快的: " + result
);

System.out.println(fastest.join()); // 最快的: 结果2

acceptEither / runAfterEither

1
2
3
4
5
// acceptEither:任一完成后消费
future1.acceptEither(future2, result -> System.out.println(result));

// runAfterEither:任一完成后执行
future1.runAfterEither(future2, () -> System.out.println("有一个完成了"));

组合多个CompletableFuture

allOf:等待所有完成

1
2
3
4
5
6
7
8
9
10
11
12
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "结果2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "结果3");

// 等待所有完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3);
allFutures.join();

// 获取所有结果
List<String> results = Stream.of(f1, f2, f3)
.map(CompletableFuture::join)
.collect(Collectors.toList());

实用封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 封装一个更好用的allOf
public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}

// 使用
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> fetch(url)))
.collect(Collectors.toList());

CompletableFuture<List<String>> allResults = allOf(futures);
List<String> results = allResults.join();

anyOf:任一完成即返回

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "慢结果";
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "快结果";
});

// 返回最先完成的结果
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(f1, f2);
System.out.println(anyFuture.join()); // 快结果

异常处理

exceptionally:捕获异常并恢复

1
2
3
4
5
6
7
8
9
10
11
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了!");
return "正常结果";
})
.exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return "默认值"; // 返回降级结果
});

System.out.println(future.join()); // 默认值
1
2
supplyAsync ──▶ 异常 ──▶ exceptionally ──▶ "默认值"
捕获并恢复

handle:处理结果或异常

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机错误");
}
return "成功";
})
.handle((result, ex) -> {
if (ex != null) {
return "处理异常: " + ex.getMessage();
}
return "处理结果: " + result;
});

whenComplete:完成时回调(不改变结果)

1
2
3
4
5
6
7
8
9
10
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("异常: " + ex);
} else {
System.out.println("结果: " + result);
}
// 注意:不能修改结果,只能观察
});

三者对比

方法 接收参数 能否恢复 用途
exceptionally 只有异常 异常降级
handle 结果和异常 统一处理
whenComplete 结果和异常 不能 记录日志

异常传播

1
2
3
4
5
6
7
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("步骤1错误");
})
.thenApply(r -> r + " 步骤2") // 不会执行
.thenApply(r -> r + " 步骤3") // 不会执行
.exceptionally(ex -> "异常恢复") // 在这里捕获
.thenApply(r -> r + " 步骤4"); // 会执行,收到"异常恢复"

超时控制(Java 9+)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
sleep(5000); // 模拟慢操作
return "结果";
})
.orTimeout(2, TimeUnit.SECONDS); // 2秒超时,抛出TimeoutException

// 或者超时返回默认值
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
sleep(5000);
return "结果";
})
.completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);

实际应用场景

场景1:并行调用多个服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public UserProfile getUserProfile(Long userId) {
// 并行调用三个服务
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> userService.getUser(userId));

CompletableFuture<List<Order>> ordersFuture = CompletableFuture
.supplyAsync(() -> orderService.getOrders(userId));

CompletableFuture<List<Address>> addressFuture = CompletableFuture
.supplyAsync(() -> addressService.getAddresses(userId));

// 等待所有完成并组装结果
return CompletableFuture.allOf(userFuture, ordersFuture, addressFuture)
.thenApply(v -> new UserProfile(
userFuture.join(),
ordersFuture.join(),
addressFuture.join()
))
.join();
}
1
2
3
4
5
6
          ┌─ userService.getUser() ────────┐
│ │
userId ───┼─ orderService.getOrders() ─────┼──▶ UserProfile
│ │
└─ addressService.getAddresses() ┘
(并行执行)

场景2:批量异步处理

1
2
3
4
5
6
7
8
9
10
11
12
public List<Product> getProductDetails(List<Long> productIds) {
List<CompletableFuture<Product>> futures = productIds.stream()
.map(id -> CompletableFuture.supplyAsync(
() -> productService.getProduct(id),
executor
))
.collect(Collectors.toList());

return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}

场景3:带重试的异步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public <T> CompletableFuture<T> retryAsync(
Supplier<T> task,
int maxRetries,
long delayMs) {

return CompletableFuture.supplyAsync(task)
.handle((result, ex) -> {
if (ex == null) {
return CompletableFuture.completedFuture(result);
}
if (maxRetries <= 0) {
return CompletableFuture.<T>failedFuture(ex);
}
sleep(delayMs);
return retryAsync(task, maxRetries - 1, delayMs);
})
.thenCompose(Function.identity());
}

// 使用
CompletableFuture<String> result = retryAsync(
() -> callUnstableService(),
3, // 最多重试3次
1000 // 每次间隔1秒
);

场景4:带超时的服务调用

1
2
3
4
5
6
7
8
9
10
public String callServiceWithFallback(String param) {
return CompletableFuture
.supplyAsync(() -> slowService.call(param))
.completeOnTimeout("默认响应", 2, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("服务调用失败", ex);
return "降级响应";
})
.join();
}

线程池最佳实践

默认线程池的问题

1
2
3
4
5
6
7
// 默认使用ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> doSomething());

// 问题:
// 1. 所有CompletableFuture共享,可能相互影响
// 2. 线程数 = CPU核心数 - 1,IO密集型任务可能不够
// 3. 难以监控和调优

推荐:使用自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建专用线程池
private static final ExecutorService IO_EXECUTOR = new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(1000), // 任务队列
new ThreadFactoryBuilder()
.setNameFormat("async-io-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

// 使用自定义线程池
CompletableFuture.supplyAsync(() -> doIO(), IO_EXECUTOR);

不同任务类型使用不同线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
// IO密集型任务
private static final ExecutorService IO_POOL = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 10
);

// CPU密集型任务
private static final ExecutorService CPU_POOL = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);

// 使用
CompletableFuture.supplyAsync(() -> readFromDB(), IO_POOL);
CompletableFuture.supplyAsync(() -> heavyComputation(), CPU_POOL);

常见陷阱

陷阱1:忘记处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
// 错误:异常被吞掉
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("错误");
}); // 没有join也没有异常处理,异常丢失

// 正确:始终处理异常
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("错误");
})
.exceptionally(ex -> {
log.error("任务失败", ex);
return null;
});

陷阱2:在回调中阻塞

1
2
3
4
5
6
7
8
9
10
11
12
// 错误:在回调中进行阻塞操作
CompletableFuture.supplyAsync(() -> getData())
.thenApply(data -> {
// 这会阻塞ForkJoinPool的线程!
return anotherSlowService.process(data);
});

// 正确:使用thenCompose或thenApplyAsync
CompletableFuture.supplyAsync(() -> getData())
.thenCompose(data -> CompletableFuture.supplyAsync(
() -> anotherSlowService.process(data)
));

陷阱3:get()不设超时

1
2
3
4
5
6
7
8
// 错误:可能永远阻塞
String result = future.get();

// 正确:设置超时
String result = future.get(5, TimeUnit.SECONDS);

// 或使用orTimeout (Java 9+)
future.orTimeout(5, TimeUnit.SECONDS);

API速查表

方法 作用 返回值
supplyAsync 异步执行有返回值任务 CompletableFuture
runAsync 异步执行无返回值任务 CompletableFuture
thenApply 转换结果 CompletableFuture
thenAccept 消费结果 CompletableFuture
thenRun 执行后续动作 CompletableFuture
thenCompose 串行组合 CompletableFuture
thenCombine 并行组合 CompletableFuture
allOf 等待所有完成 CompletableFuture
anyOf 任一完成 CompletableFuture
exceptionally 异常恢复 CompletableFuture
handle 处理结果或异常 CompletableFuture
whenComplete 完成时回调 CompletableFuture
orTimeout 超时异常(Java9) CompletableFuture
completeOnTimeout 超时默认值(Java9) CompletableFuture

总结

CompletableFuture的核心价值:

  1. 链式调用:流畅的API,避免回调地狱
  2. 组合能力:轻松组合多个异步操作
  3. 异常处理:完善的异常捕获和恢复机制
  4. 灵活控制:可以手动完成、取消、超时

使用建议:

  • 简单场景用虚拟线程(JDK 21+),代码更简洁
  • 需要复杂组合逻辑时用CompletableFuture
  • 始终使用自定义线程池
  • 不要忘记处理异常
  • 阻塞操作要设置超时