【Java并发】CompletableFuture异步编程详解
CompletableFuture是Java 8引入的异步编程工具,它解决了传统Future的诸多限制,支持链式调用、组合多个异步操作、异常处理等功能。本文详细介绍CompletableFuture的使用方法。
Future的局限性
Java 5引入的Future接口功能有限:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ExecutorService executor = Executors.newFixedThreadPool(10);
Future<String> future = executor.submit(() -> { Thread.sleep(1000); return "结果"; });
String result = future.get();
|
CompletableFuture解决了这些问题。
创建CompletableFuture
方式1:supplyAsync(有返回值)
1 2 3 4 5 6 7 8 9 10 11 12
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { sleep(1000); return "Hello"; });
ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { return "Hello"; }, executor);
|
方式2:runAsync(无返回值)
1 2 3 4
| CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("执行异步任务"); sleep(1000); });
|
方式3:completedFuture(直接创建已完成的)
1 2 3
| CompletableFuture<String> future = CompletableFuture.completedFuture("已完成"); System.out.println(future.get());
|
方式4:手动创建和完成
1 2 3 4 5 6 7 8 9 10 11
| CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> { sleep(1000); future.complete("手动完成的结果"); }).start();
String result = future.get();
|
获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
String result1 = future.get();
String result2 = future.get(1, TimeUnit.SECONDS);
String result3 = future.join();
String result4 = future.getNow("默认值");
boolean done = future.isDone(); boolean cancelled = future.isCancelled(); boolean completedExceptionally = future.isCompletedExceptionally();
|
结果转换
thenApply:转换结果(有返回值)
1 2 3 4 5 6
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "hello") .thenApply(s -> s.toUpperCase()) .thenApply(s -> s + " WORLD");
System.out.println(future.join());
|
1 2
| supplyAsync ──▶ thenApply ──▶ thenApply ──▶ 结果 "hello" toUpperCase + " WORLD" "HELLO WORLD"
|
thenAccept:消费结果(无返回值)
1 2 3 4
| CompletableFuture.supplyAsync(() -> "Hello") .thenApply(String::toUpperCase) .thenAccept(result -> System.out.println("结果: " + result));
|
thenRun:执行操作(不关心结果)
1 2 3
| CompletableFuture.supplyAsync(() -> "Hello") .thenRun(() -> System.out.println("任务完成"));
|
三者对比
| 方法 |
接收参数 |
返回值 |
用途 |
| thenApply |
上一步结果 |
有 |
转换数据 |
| thenAccept |
上一步结果 |
无 |
消费数据 |
| thenRun |
无 |
无 |
执行后续动作 |
异步版本(Async后缀)
每个方法都有对应的Async版本,在不同线程执行:
1 2 3 4
| CompletableFuture.supplyAsync(() -> "Hello") .thenApply(s -> s.toUpperCase()) .thenApplyAsync(s -> s + " WORLD") .thenApplyAsync(s -> s + "!", executor);
|
1 2 3 4 5
| 方法 线程 ────────────────────────────── thenApply() 可能复用当前线程 thenApplyAsync() 使用ForkJoinPool thenApplyAsync(executor) 使用指定线程池
|
组合两个CompletableFuture
thenCompose:串行组合(flatMap)
当第二个任务依赖第一个任务的结果时使用:
1 2 3 4 5 6
| CompletableFuture<Order> future = CompletableFuture .supplyAsync(() -> getUser(userId)) .thenCompose(user -> CompletableFuture.supplyAsync( () -> getOrderByUser(user) ));
|
1 2
| getUser ──▶ user ──▶ getOrderByUser ──▶ order 异步1 结果传递 异步2
|
thenCombine:并行组合(合并结果)
当两个任务相互独立,需要合并结果时使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleep(1000); return "Hello"; });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleep(1000); return "World"; });
CompletableFuture<String> combined = future1.thenCombine( future2, (result1, result2) -> result1 + " " + result2 );
System.out.println(combined.join());
|
1 2 3 4
| future1 ──────┐ ├──▶ thenCombine ──▶ "Hello World" future2 ──────┘ (并行执行)
|
thenAcceptBoth:并行组合(消费结果)
1 2 3 4
| future1.thenAcceptBoth(future2, (result1, result2) -> { System.out.println(result1 + " " + result2); });
|
runAfterBoth:两个都完成后执行
1 2 3 4
| future1.runAfterBoth(future2, () -> { System.out.println("两个任务都完成了"); });
|
applyToEither:任一完成即处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleep(1000); return "结果1"; });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleep(500); return "结果2"; });
CompletableFuture<String> fastest = future1.applyToEither( future2, result -> "最快的: " + result );
System.out.println(fastest.join());
|
acceptEither / runAfterEither
1 2 3 4 5
| future1.acceptEither(future2, result -> System.out.println(result));
future1.runAfterEither(future2, () -> System.out.println("有一个完成了"));
|
组合多个CompletableFuture
allOf:等待所有完成
1 2 3 4 5 6 7 8 9 10 11 12
| CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "结果1"); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "结果2"); CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "结果3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3); allFutures.join();
List<String> results = Stream.of(f1, f2, f3) .map(CompletableFuture::join) .collect(Collectors.toList());
|
实用封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) { return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }
List<CompletableFuture<String>> futures = urls.stream() .map(url -> CompletableFuture.supplyAsync(() -> fetch(url))) .collect(Collectors.toList());
CompletableFuture<List<String>> allResults = allOf(futures); List<String> results = allResults.join();
|
anyOf:任一完成即返回
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { sleep(1000); return "慢结果"; });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { sleep(500); return "快结果"; });
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(f1, f2); System.out.println(anyFuture.join());
|
异常处理
exceptionally:捕获异常并恢复
1 2 3 4 5 6 7 8 9 10 11
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true) throw new RuntimeException("出错了!"); return "正常结果"; }) .exceptionally(ex -> { System.out.println("捕获异常: " + ex.getMessage()); return "默认值"; });
System.out.println(future.join());
|
1 2
| supplyAsync ──▶ 异常 ──▶ exceptionally ──▶ "默认值" 捕获并恢复
|
handle:处理结果或异常
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("随机错误"); } return "成功"; }) .handle((result, ex) -> { if (ex != null) { return "处理异常: " + ex.getMessage(); } return "处理结果: " + result; });
|
whenComplete:完成时回调(不改变结果)
1 2 3 4 5 6 7 8 9 10
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "Hello") .whenComplete((result, ex) -> { if (ex != null) { System.out.println("异常: " + ex); } else { System.out.println("结果: " + result); } });
|
三者对比
| 方法 |
接收参数 |
能否恢复 |
用途 |
| exceptionally |
只有异常 |
能 |
异常降级 |
| handle |
结果和异常 |
能 |
统一处理 |
| whenComplete |
结果和异常 |
不能 |
记录日志 |
异常传播
1 2 3 4 5 6 7
| CompletableFuture.supplyAsync(() -> { throw new RuntimeException("步骤1错误"); }) .thenApply(r -> r + " 步骤2") .thenApply(r -> r + " 步骤3") .exceptionally(ex -> "异常恢复") .thenApply(r -> r + " 步骤4");
|
超时控制(Java 9+)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { sleep(5000); return "结果"; }) .orTimeout(2, TimeUnit.SECONDS);
CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> { sleep(5000); return "结果"; }) .completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
|
实际应用场景
场景1:并行调用多个服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public UserProfile getUserProfile(Long userId) { CompletableFuture<User> userFuture = CompletableFuture .supplyAsync(() -> userService.getUser(userId));
CompletableFuture<List<Order>> ordersFuture = CompletableFuture .supplyAsync(() -> orderService.getOrders(userId));
CompletableFuture<List<Address>> addressFuture = CompletableFuture .supplyAsync(() -> addressService.getAddresses(userId));
return CompletableFuture.allOf(userFuture, ordersFuture, addressFuture) .thenApply(v -> new UserProfile( userFuture.join(), ordersFuture.join(), addressFuture.join() )) .join(); }
|
1 2 3 4 5 6
| ┌─ userService.getUser() ────────┐ │ │ userId ───┼─ orderService.getOrders() ─────┼──▶ UserProfile │ │ └─ addressService.getAddresses() ┘ (并行执行)
|
场景2:批量异步处理
1 2 3 4 5 6 7 8 9 10 11 12
| public List<Product> getProductDetails(List<Long> productIds) { List<CompletableFuture<Product>> futures = productIds.stream() .map(id -> CompletableFuture.supplyAsync( () -> productService.getProduct(id), executor )) .collect(Collectors.toList());
return futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); }
|
场景3:带重试的异步操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public <T> CompletableFuture<T> retryAsync( Supplier<T> task, int maxRetries, long delayMs) {
return CompletableFuture.supplyAsync(task) .handle((result, ex) -> { if (ex == null) { return CompletableFuture.completedFuture(result); } if (maxRetries <= 0) { return CompletableFuture.<T>failedFuture(ex); } sleep(delayMs); return retryAsync(task, maxRetries - 1, delayMs); }) .thenCompose(Function.identity()); }
CompletableFuture<String> result = retryAsync( () -> callUnstableService(), 3, 1000 );
|
场景4:带超时的服务调用
1 2 3 4 5 6 7 8 9 10
| public String callServiceWithFallback(String param) { return CompletableFuture .supplyAsync(() -> slowService.call(param)) .completeOnTimeout("默认响应", 2, TimeUnit.SECONDS) .exceptionally(ex -> { log.error("服务调用失败", ex); return "降级响应"; }) .join(); }
|
线程池最佳实践
默认线程池的问题
1 2 3 4 5 6 7
| CompletableFuture.supplyAsync(() -> doSomething());
|
推荐:使用自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private static final ExecutorService IO_EXECUTOR = new ThreadPoolExecutor( 10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder() .setNameFormat("async-io-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() );
CompletableFuture.supplyAsync(() -> doIO(), IO_EXECUTOR);
|
不同任务类型使用不同线程池
1 2 3 4 5 6 7 8 9 10 11 12 13
| private static final ExecutorService IO_POOL = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * 10 );
private static final ExecutorService CPU_POOL = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );
CompletableFuture.supplyAsync(() -> readFromDB(), IO_POOL); CompletableFuture.supplyAsync(() -> heavyComputation(), CPU_POOL);
|
常见陷阱
陷阱1:忘记处理异常
1 2 3 4 5 6 7 8 9 10 11 12 13
| CompletableFuture.supplyAsync(() -> { throw new RuntimeException("错误"); });
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("错误"); }) .exceptionally(ex -> { log.error("任务失败", ex); return null; });
|
陷阱2:在回调中阻塞
1 2 3 4 5 6 7 8 9 10 11 12
| CompletableFuture.supplyAsync(() -> getData()) .thenApply(data -> { return anotherSlowService.process(data); });
CompletableFuture.supplyAsync(() -> getData()) .thenCompose(data -> CompletableFuture.supplyAsync( () -> anotherSlowService.process(data) ));
|
陷阱3:get()不设超时
1 2 3 4 5 6 7 8
| String result = future.get();
String result = future.get(5, TimeUnit.SECONDS);
future.orTimeout(5, TimeUnit.SECONDS);
|
API速查表
| 方法 |
作用 |
返回值 |
| supplyAsync |
异步执行有返回值任务 |
CompletableFuture |
| runAsync |
异步执行无返回值任务 |
CompletableFuture |
| thenApply |
转换结果 |
CompletableFuture |
| thenAccept |
消费结果 |
CompletableFuture |
| thenRun |
执行后续动作 |
CompletableFuture |
| thenCompose |
串行组合 |
CompletableFuture |
| thenCombine |
并行组合 |
CompletableFuture |
| allOf |
等待所有完成 |
CompletableFuture |
| anyOf |
任一完成 |
CompletableFuture |
| exceptionally |
异常恢复 |
CompletableFuture |
| handle |
处理结果或异常 |
CompletableFuture |
| whenComplete |
完成时回调 |
CompletableFuture |
| orTimeout |
超时异常(Java9) |
CompletableFuture |
| completeOnTimeout |
超时默认值(Java9) |
CompletableFuture |
总结
CompletableFuture的核心价值:
- 链式调用:流畅的API,避免回调地狱
- 组合能力:轻松组合多个异步操作
- 异常处理:完善的异常捕获和恢复机制
- 灵活控制:可以手动完成、取消、超时
使用建议:
- 简单场景用虚拟线程(JDK 21+),代码更简洁
- 需要复杂组合逻辑时用CompletableFuture
- 始终使用自定义线程池
- 不要忘记处理异常
- 阻塞操作要设置超时