JDK 21正式发布了虚拟线程(Virtual Threads),这是Java并发编程的重大革新。虚拟线程让我们可以用简单的同步代码写出高吞吐量的并发程序,不再需要复杂的异步回调。本文详细介绍虚拟线程的原理和使用方法。
传统线程的问题
平台线程的代价
Java传统的线程(现在称为平台线程,Platform Thread)是对操作系统线程的薄包装:
1 2 3 4 5 6 7 8 9 10 11 12 13
| Java Thread (平台线程) │ │ 1:1 映射 ▼ OS Thread (操作系统线程) │ │ 占用资源 ▼ ┌─────────────────┐ │ 栈空间: ~1MB │ │ 内核资源 │ │ 上下文切换开销 │ └─────────────────┘
|
问题:
- 每个线程占用约1MB栈空间
- 创建/销毁开销大
- 上下文切换代价高
- 一台机器通常只能支持几千个线程
线程数量限制示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public static void main(String[] args) { int count = 0; try { while (true) { new Thread(() -> { try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) {} }).start(); count++; } } catch (OutOfMemoryError e) { System.out.println("最多创建线程数: " + count); } }
|
传统解决方案的困境
方案1:线程池
1 2
| ExecutorService executor = Executors.newFixedThreadPool(200);
|
方案2:异步编程
1 2 3 4 5
| CompletableFuture.supplyAsync(() -> fetchUser(id)) .thenCompose(user -> fetchOrders(user)) .thenCompose(orders -> calculateTotal(orders)) .thenAccept(total -> sendEmail(total));
|
方案3:响应式编程(WebFlux)
1 2 3 4 5
| Mono.fromCallable(() -> fetchUser(id)) .flatMap(user -> fetchOrders(user)) .flatMap(orders -> calculateTotal(orders)) .subscribe(total -> sendEmail(total));
|
虚拟线程是什么
虚拟线程是JDK实现的轻量级线程,不与操作系统线程一一绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ┌─────────────────────────────────────────────────────┐ │ JVM │ │ │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │VT 1 │ │VT 2 │ │VT 3 │ │VT 4 │ │VT 5 │ │VT 6 │ │ ← 百万级虚拟线程 │ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ │ │ │ │ │ │ │ │ │ │ └───────┼───────┼───────┼───────┼───────┘ │ │ │ │ │ │ │ ┌──▼──┐ ┌──▼──┐ ┌──▼──┐ │ │ │PT 1 │ │PT 2 │ │PT 3 │ ← 少量平台线程 │ │ └──┬──┘ └──┬──┘ └──┬──┘ (载体线程) │ │ │ │ │ │ └─────────────┼───────┼───────┼───────────────────────┘ │ │ │ ┌──▼──┐ ┌──▼──┐ ┌──▼──┐ │OS T1│ │OS T2│ │OS T3│ ← 操作系统线程 └─────┘ └─────┘ └─────┘
|
核心特性
| 特性 |
平台线程 |
虚拟线程 |
| 创建数量 |
数千个 |
数百万个 |
| 内存占用 |
~1MB/线程 |
~几KB/线程 |
| 创建开销 |
高(系统调用) |
低(JVM内部) |
| 调度 |
操作系统 |
JVM |
| 阻塞代价 |
高(浪费OS线程) |
低(自动让出) |
创建虚拟线程
方式1:Thread.startVirtualThread()
1 2 3 4
| Thread vt = Thread.startVirtualThread(() -> { System.out.println("运行在虚拟线程: " + Thread.currentThread()); }); vt.join();
|
方式2:Thread.ofVirtual()
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Thread vt = Thread.ofVirtual() .name("my-virtual-thread") .start(() -> { System.out.println("Hello from virtual thread"); });
Thread vt2 = Thread.ofVirtual() .name("vt-", 0) .unstarted(() -> { System.out.println("Not started yet"); }); vt2.start();
|
方式3:Executors.newVirtualThreadPerTaskExecutor()
1 2 3 4 5 6 7 8 9 10
| try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (int i = 0; i < 100_000; i++) { executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return fetchData(); }); } }
|
方式4:使用ThreadFactory
1 2 3 4 5 6 7 8
| ThreadFactory factory = Thread.ofVirtual() .name("worker-", 0) .factory();
Thread vt = factory.newThread(() -> { System.out.println(Thread.currentThread().getName()); }); vt.start();
|
虚拟线程 vs 平台线程对比
创建100万个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class ThreadComparison { public static void main(String[] args) throws Exception { long start = System.currentTimeMillis();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (int i = 0; i < 1_000_000; i++) { executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return "done"; }); } }
System.out.println("虚拟线程耗时: " + (System.currentTimeMillis() - start) + "ms"); } }
|
用平台线程尝试同样的操作:
1 2 3 4 5 6 7 8 9 10
| try (var executor = Executors.newCachedThreadPool()) { for (int i = 0; i < 1_000_000; i++) { executor.submit(() -> { Thread.sleep(Duration.ofSeconds(1)); return "done"; }); } }
|
判断是否为虚拟线程
1 2 3 4 5 6 7
| Thread current = Thread.currentThread();
if (current.isVirtual()) { System.out.println("运行在虚拟线程"); } else { System.out.println("运行在平台线程"); }
|
工作原理
载体线程(Carrier Thread)
虚拟线程运行在载体线程(平台线程)上。JVM维护一个ForkJoinPool作为载体线程池:
挂载与卸载
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| 虚拟线程执行流程:
VT开始执行 │ ▼ ┌─────────────┐ │ 挂载到载体线程 │ VT "骑上" 平台线程 └──────┬──────┘ │ ▼ 执行用户代码 │ ▼ 遇到阻塞操作? │ │ 否 是 │ │ │ ▼ │ ┌─────────────┐ │ │ 从载体线程卸载 │ VT "跳下" 平台线程 │ └──────┬──────┘ │ │ │ ▼ │ 载体线程执行 │ 其他虚拟线程 │ │ │ ▼ │ 阻塞操作完成 │ │ │ ▼ │ ┌─────────────┐ │ │ 重新挂载到 │ 可能是不同的载体线程 │ │ 载体线程 │ │ └──────┬──────┘ │ │ └────┬────┘ │ ▼ 继续执行...
|
自动让出示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Thread.startVirtualThread(() -> { System.out.println("开始, 载体: " + carrierThread());
Thread.sleep(Duration.ofMillis(100));
System.out.println("恢复, 载体: " + carrierThread()); });
String carrierThread() { return Thread.currentThread().toString(); }
|
适用场景
场景1:高并发IO密集型应用
1 2 3 4 5 6 7 8 9
| try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { ServerSocket server = new ServerSocket(8080); while (true) { Socket socket = server.accept(); executor.submit(() -> handleRequest(socket)); } }
|
场景2:批量API调用
1 2 3 4 5 6 7 8 9 10 11 12 13
| List<String> urls = loadUrls();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { List<Future<String>> futures = urls.stream() .map(url -> executor.submit(() -> fetchUrl(url))) .toList();
for (Future<String> future : futures) { String result = future.get(); process(result); } }
|
场景3:数据库批量操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| List<Long> userIds = getAllUserIds();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { List<Future<User>> futures = userIds.stream() .map(id -> executor.submit(() -> { return userRepository.findById(id); })) .toList();
List<User> users = futures.stream() .map(f -> f.get()) .toList(); }
|
Spring Boot集成
Spring Boot 3.2+
1 2 3 4 5
| spring: threads: virtual: enabled: true
|
启用后,所有Web请求自动使用虚拟线程处理。
手动配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Configuration public class VirtualThreadConfig {
@Bean public TomcatProtocolHandlerCustomizer<?> protocolHandlerCustomizer() { return protocolHandler -> { protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); }; }
@Bean public AsyncTaskExecutor applicationTaskExecutor() { return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()); } }
|
注意事项和限制
1. 避免线程池包装虚拟线程
1 2 3 4 5 6 7 8 9 10
| ExecutorService pool = Executors.newFixedThreadPool(100); Thread.startVirtualThread(() -> { pool.submit(() -> doWork()); });
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { executor.submit(() -> doWork()); }
|
2. 避免长时间占用载体线程
某些操作会”钉住”(pin)载体线程,导致虚拟线程无法卸载:
1 2 3 4 5 6 7 8 9 10 11 12 13
| synchronized (lock) { Thread.sleep(1000); }
ReentrantLock lock = new ReentrantLock(); lock.lock(); try { Thread.sleep(1000); } finally { lock.unlock(); }
|
3. ThreadLocal使用注意
1 2 3 4 5 6 7
| private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[1024 * 1024]);
|
4. 监控和调试
5. 不适合CPU密集型任务
1 2 3 4 5 6 7 8 9 10 11 12
| Thread.startVirtualThread(() -> { for (int i = 0; i < 1_000_000_000; i++) { compute(i); } });
ExecutorService cpuPool = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );
|
与其他技术的对比
| 特性 |
虚拟线程 |
CompletableFuture |
WebFlux |
| 编程模型 |
同步/阻塞 |
回调/链式 |
响应式流 |
| 代码复杂度 |
简单 |
中等 |
复杂 |
| 调试难度 |
简单 |
困难 |
困难 |
| 学习曲线 |
平缓 |
中等 |
陡峭 |
| 栈追踪 |
完整 |
不完整 |
不完整 |
| 适用场景 |
IO密集 |
异步组合 |
高吞吐流处理 |
最佳实践总结
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { }
ExecutorService cpuPool = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );
ReentrantLock lock = new ReentrantLock();
|
结构化并发(预览)
JDK 21还引入了结构化并发(预览特性),与虚拟线程配合使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Future<User> userFuture = scope.fork(() -> fetchUser(id)); Future<Order> orderFuture = scope.fork(() -> fetchOrder(id));
scope.join(); scope.throwIfFailed();
User user = userFuture.resultNow(); Order order = orderFuture.resultNow(); return new Response(user, order); }
|
总结
虚拟线程的核心价值:
- 简化并发编程:用熟悉的同步代码写高并发程序
- 高吞吐量:轻松支持百万级并发
- 低资源消耗:几KB内存/线程,创建销毁几乎无开销
- 完整栈追踪:调试体验与普通线程一致
- 无缝集成:现有代码几乎不需要修改
适用场景:
- IO密集型应用(Web服务、API网关、微服务)
- 大量并发连接(聊天服务器、推送服务)
- 批量数据处理(爬虫、数据同步)
不适用场景:
- CPU密集型计算
- 需要精确控制线程数的场景
- 大量使用synchronized的遗留代码(需要重构)