抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Spring WebFlux是Spring 5引入的响应式Web框架,基于Reactor库实现,支持非阻塞IO和背压机制。本文详细介绍WebFlux的核心概念和使用方法。

什么是响应式编程

响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。

传统命令式 vs 响应式

1
2
3
4
5
6
7
8
9
// 命令式:主动拉取数据
List<User> users = userRepository.findAll(); // 阻塞等待
for (User user : users) {
process(user);
}

// 响应式:数据推送过来时处理
userRepository.findAll() // 立即返回,不阻塞
.subscribe(user -> process(user)); // 数据到达时回调

响应式流规范(Reactive Streams)

响应式流定义了4个核心接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}

public interface Subscription {
void request(long n); // 背压:请求n个元素
void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
1
2
3
4
5
6
7
8
Publisher ──────────────────────▶ Subscriber
onSubscribe(subscription)
◀── request(n) ───
─── onNext(item) ─▶
─── onNext(item) ─▶
◀── request(n) ───
─── onNext(item) ─▶
─── onComplete() ─▶

WebFlux vs Spring MVC

1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────────────────┐
│ Spring Web │
├────────────────────────┬────────────────────────────────┤
│ Spring MVC │ Spring WebFlux │
├────────────────────────┼────────────────────────────────┤
│ Servlet API │ Reactive Streams │
│ 同步阻塞 │ 异步非阻塞 │
│ 一个请求一个线程 │ 少量线程处理大量请求 │
│ Tomcat/Jetty │ Netty/Undertow │
│ JDBC │ R2DBC │
│ RestTemplate │ WebClient │
└────────────────────────┴────────────────────────────────┘

线程模型对比

Spring MVC(阻塞模型):

1
2
3
4
5
6
7
请求1 ──▶ 线程1 ──▶ [处理中...等待DB...] ──▶ 响应
请求2 ──▶ 线程2 ──▶ [处理中...等待DB...] ──▶ 响应
请求3 ──▶ 线程3 ──▶ [处理中...等待DB...] ──▶ 响应
...
请求200 ──▶ 线程200 ──▶ [处理中...]

每个请求占用一个线程,线程在IO等待时空闲

WebFlux(非阻塞模型):

1
2
3
4
5
6
7
请求1 ──┐
请求2 ──┼──▶ 线程1 ──▶ [处理] ──▶ 等待IO(让出线程)
请求3 ──┤ ↑
... │ │
请求N ──┘ IO完成时继续处理

少量线程处理大量请求,IO等待时不占用线程

Mono和Flux

WebFlux基于Project Reactor,核心是两个发布者类型:

Mono:0或1个元素

1
2
3
4
5
6
7
8
9
10
11
// 包含一个元素
Mono<String> mono = Mono.just("Hello");

// 空的Mono
Mono<String> empty = Mono.empty();

// 延迟创建
Mono<User> user = Mono.fromSupplier(() -> userService.findById(1L));

// 从Callable创建
Mono<String> fromCallable = Mono.fromCallable(() -> heavyComputation());
1
2
Mono<T>: ────○────|
0或1个元素 完成

Flux:0到N个元素

1
2
3
4
5
6
7
8
9
10
11
// 多个元素
Flux<String> flux = Flux.just("A", "B", "C");

// 从集合创建
Flux<Integer> fromList = Flux.fromIterable(Arrays.asList(1, 2, 3));

// 范围
Flux<Integer> range = Flux.range(1, 10); // 1到10

// 间隔发射
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 每秒发射
1
2
Flux<T>: ────○────○────○────○────|
元素1 元素2 元素3 元素4 完成

创建Mono/Flux

静态工厂方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Mono创建方式
Mono.just(value) // 包装一个值
Mono.empty() // 空Mono
Mono.error(throwable) // 错误Mono
Mono.fromSupplier(supplier) // 延迟计算
Mono.fromCallable(callable) // 从Callable
Mono.fromFuture(future) // 从CompletableFuture
Mono.defer(() -> Mono.just(x))// 延迟创建Mono本身

// Flux创建方式
Flux.just(v1, v2, v3) // 多个值
Flux.fromIterable(list) // 从集合
Flux.fromArray(array) // 从数组
Flux.range(start, count) // 范围
Flux.interval(duration) // 定时发射
Flux.generate(generator) // 同步生成
Flux.create(sink -> {...}) // 异步生成

使用generate同步生成

1
2
3
4
5
6
7
8
9
10
11
Flux<Integer> flux = Flux.generate(
() -> 0, // 初始状态
(state, sink) -> {
sink.next(state);
if (state == 10) {
sink.complete();
}
return state + 1; // 新状态
}
);
// 输出:0, 1, 2, ... 10

使用create异步生成

1
2
3
4
5
6
Flux<String> flux = Flux.create(sink -> {
// 可以在任何线程调用sink.next()
someAsyncAPI.onData(data -> sink.next(data));
someAsyncAPI.onComplete(() -> sink.complete());
someAsyncAPI.onError(e -> sink.error(e));
});

常用操作符

转换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// map:转换每个元素
Flux.just(1, 2, 3)
.map(n -> n * 2)
// 2, 4, 6

// flatMap:一对多转换(异步)
Flux.just(1, 2, 3)
.flatMap(n -> Flux.range(1, n))
// 1, 1, 2, 1, 2, 3 (顺序可能不同)

// flatMapSequential:保持顺序的flatMap
Flux.just(1, 2, 3)
.flatMapSequential(n -> Flux.range(1, n))
// 1, 1, 2, 1, 2, 3 (保持顺序)

// concatMap:串行的flatMap
Flux.just(1, 2, 3)
.concatMap(n -> Flux.range(1, n))
// 1, 1, 2, 1, 2, 3 (严格顺序)

过滤操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// filter:过滤
Flux.range(1, 10)
.filter(n -> n % 2 == 0)
// 2, 4, 6, 8, 10

// take:取前n个
Flux.range(1, 100)
.take(5)
// 1, 2, 3, 4, 5

// skip:跳过前n个
Flux.range(1, 10)
.skip(3)
// 4, 5, 6, 7, 8, 9, 10

// distinct:去重
Flux.just(1, 2, 2, 3, 3, 3)
.distinct()
// 1, 2, 3

// distinctUntilChanged:连续去重
Flux.just(1, 1, 2, 2, 1, 1)
.distinctUntilChanged()
// 1, 2, 1

组合操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// concat:串行连接
Flux.concat(
Flux.just(1, 2),
Flux.just(3, 4)
)
// 1, 2, 3, 4

// merge:并行合并
Flux.merge(
Flux.interval(Duration.ofMillis(100)).take(3),
Flux.interval(Duration.ofMillis(150)).take(3)
)
// 交错输出

// zip:配对合并
Flux.zip(
Flux.just("A", "B", "C"),
Flux.just(1, 2, 3),
(letter, number) -> letter + number
)
// A1, B2, C3

// combineLatest:组合最新值
Flux.combineLatest(
flux1, flux2,
(a, b) -> a + b
)

错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// onErrorReturn:错误时返回默认值
Flux.just(1, 2, 0)
.map(n -> 10 / n)
.onErrorReturn(-1)
// 10, 5, -1

// onErrorResume:错误时切换到备用流
Flux.just(1, 2, 0)
.map(n -> 10 / n)
.onErrorResume(e -> Flux.just(-1, -2))
// 10, 5, -1, -2

// onErrorContinue:错误时跳过继续
Flux.just(1, 2, 0, 4)
.map(n -> 10 / n)
.onErrorContinue((e, val) -> log.warn("跳过: {}", val))
// 10, 5, 2(跳过了0)

// retry:重试
Flux.just(1)
.flatMap(n -> callUnstableService())
.retry(3) // 失败时重试3次

// retryWhen:自定义重试策略
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))

副作用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// doOnNext:每个元素执行操作
Flux.just(1, 2, 3)
.doOnNext(n -> log.info("处理: {}", n))
.subscribe();

// doOnError:错误时执行
.doOnError(e -> log.error("出错", e))

// doOnComplete:完成时执行
.doOnComplete(() -> log.info("完成"))

// doOnSubscribe:订阅时执行
.doOnSubscribe(sub -> log.info("开始订阅"))

// doFinally:最终执行(无论成功失败)
.doFinally(signalType -> log.info("结束: {}", signalType))

聚合操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// reduce:归约
Flux.range(1, 5)
.reduce((a, b) -> a + b)
// Mono<15>

// collect:收集到集合
Flux.just(1, 2, 3)
.collectList()
// Mono<List<Integer>>

// count:计数
Flux.range(1, 100)
.count()
// Mono<100L>

WebFlux使用方式

方式1:注解式(与Spring MVC类似)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@RestController
@RequestMapping("/users")
public class UserController {

@Autowired
private UserRepository userRepository;

// 返回单个对象
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userRepository.findById(id);
}

// 返回多个对象
@GetMapping
public Flux<User> getAllUsers() {
return userRepository.findAll();
}

// 创建
@PostMapping
public Mono<User> createUser(@RequestBody Mono<User> userMono) {
return userMono.flatMap(userRepository::save);
}

// 更新
@PutMapping("/{id}")
public Mono<User> updateUser(@PathVariable Long id,
@RequestBody User user) {
return userRepository.findById(id)
.flatMap(existing -> {
existing.setName(user.getName());
return userRepository.save(existing);
});
}

// 删除
@DeleteMapping("/{id}")
public Mono<Void> deleteUser(@PathVariable Long id) {
return userRepository.deleteById(id);
}
}

方式2:函数式端点(RouterFunction)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Configuration
public class RouterConfig {

@Bean
public RouterFunction<ServerResponse> routes(UserHandler handler) {
return RouterFunctions.route()
.GET("/users", handler::getAllUsers)
.GET("/users/{id}", handler::getUser)
.POST("/users", handler::createUser)
.PUT("/users/{id}", handler::updateUser)
.DELETE("/users/{id}", handler::deleteUser)
.build();
}
}

@Component
public class UserHandler {

@Autowired
private UserRepository userRepository;

public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userRepository.findAll(), User.class);
}

public Mono<ServerResponse> getUser(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return userRepository.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}

public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(User.class)
.flatMap(userRepository::save)
.flatMap(saved -> ServerResponse
.created(URI.create("/users/" + saved.getId()))
.bodyValue(saved));
}

public Mono<ServerResponse> updateUser(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return request.bodyToMono(User.class)
.flatMap(user -> userRepository.findById(id)
.flatMap(existing -> {
existing.setName(user.getName());
return userRepository.save(existing);
}))
.flatMap(updated -> ServerResponse.ok().bodyValue(updated))
.switchIfEmpty(ServerResponse.notFound().build());
}

public Mono<ServerResponse> deleteUser(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return userRepository.deleteById(id)
.then(ServerResponse.noContent().build());
}
}

响应式数据访问

R2DBC(响应式数据库连接)

1
2
3
4
5
6
7
8
9
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
1
2
3
4
5
6
# application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: user
password: password
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 实体
@Table("users")
public class User {
@Id
private Long id;
private String name;
private String email;
// getters, setters
}

// Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {

Flux<User> findByName(String name);

@Query("SELECT * FROM users WHERE email = :email")
Mono<User> findByEmail(String email);
}

响应式MongoDB

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
1
2
3
public interface UserRepository extends ReactiveMongoRepository<User, String> {
Flux<User> findByAgeGreaterThan(int age);
}

响应式Redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class RedisConfig {

@Bean
public ReactiveRedisTemplate<String, User> reactiveRedisTemplate(
ReactiveRedisConnectionFactory factory) {
// 配置序列化...
return new ReactiveRedisTemplate<>(factory, context);
}
}

// 使用
@Service
public class UserCacheService {

@Autowired
private ReactiveRedisTemplate<String, User> redisTemplate;

public Mono<User> getUser(String id) {
return redisTemplate.opsForValue().get("user:" + id);
}

public Mono<Boolean> saveUser(User user) {
return redisTemplate.opsForValue()
.set("user:" + user.getId(), user, Duration.ofHours(1));
}
}

WebClient(响应式HTTP客户端)

WebClient是RestTemplate的响应式替代品。

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
WebClient client = WebClient.create("https://api.example.com");

// GET请求
Mono<User> user = client.get()
.uri("/users/{id}", 1)
.retrieve()
.bodyToMono(User.class);

// GET请求(多个结果)
Flux<User> users = client.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class);

// POST请求
Mono<User> created = client.post()
.uri("/users")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(new User("John", "john@example.com"))
.retrieve()
.bodyToMono(User.class);

配置WebClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class WebClientConfig {

@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer token")
.filter(logRequest())
.filter(logResponse())
.build();
}

private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(request -> {
log.info("Request: {} {}", request.method(), request.url());
return Mono.just(request);
});
}

private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(response -> {
log.info("Response: {}", response.statusCode());
return Mono.just(response);
});
}
}

错误处理

1
2
3
4
5
6
7
8
Mono<User> user = client.get()
.uri("/users/{id}", 1)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response ->
Mono.error(new UserNotFoundException("用户不存在")))
.onStatus(HttpStatusCode::is5xxServerError, response ->
Mono.error(new ServiceException("服务器错误")))
.bodyToMono(User.class);

并行调用多个API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Mono<UserProfile> getUserProfile(Long userId) {
Mono<User> userMono = client.get()
.uri("/users/{id}", userId)
.retrieve()
.bodyToMono(User.class);

Mono<List<Order>> ordersMono = client.get()
.uri("/users/{id}/orders", userId)
.retrieve()
.bodyToFlux(Order.class)
.collectList();

Mono<List<Address>> addressesMono = client.get()
.uri("/users/{id}/addresses", userId)
.retrieve()
.bodyToFlux(Address.class)
.collectList();

// 并行执行,合并结果
return Mono.zip(userMono, ordersMono, addressesMono)
.map(tuple -> new UserProfile(
tuple.getT1(),
tuple.getT2(),
tuple.getT3()
));
}

背压(Backpressure)

背压是响应式流的核心特性,允许消费者控制数据流速。

背压策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Flux.range(1, 1000)
// onBackpressureBuffer:缓冲
.onBackpressureBuffer(100) // 缓冲100个元素

// onBackpressureDrop:丢弃
.onBackpressureDrop(dropped -> log.warn("丢弃: {}", dropped))

// onBackpressureLatest:只保留最新
.onBackpressureLatest()

// onBackpressureError:抛异常
.onBackpressureError()

.subscribe();

控制请求速率

1
2
3
4
5
Flux.range(1, 100)
.limitRate(10) // 每次只请求10个
.subscribe(n -> {
// 处理...
});

Server-Sent Events(SSE)

WebFlux天然支持SSE:

1
2
3
4
5
6
7
8
9
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> stream() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<String>builder()
.id(String.valueOf(seq))
.event("message")
.data("数据 " + seq)
.build());
}

客户端:

1
2
3
4
const eventSource = new EventSource('/stream');
eventSource.onmessage = (event) => {
console.log(event.data);
};

WebSocket支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class WebSocketConfig {

@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/chat", new ChatWebSocketHandler());

SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}

public class ChatWebSocketHandler implements WebSocketHandler {

@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(msg -> "Echo: " + msg)
.map(session::textMessage);

return session.send(output);
}
}

测试

使用WebTestClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class UserControllerTest {

@Autowired
private WebTestClient webTestClient;

@Test
void shouldGetUser() {
webTestClient.get()
.uri("/users/1")
.exchange()
.expectStatus().isOk()
.expectBody(User.class)
.value(user -> {
assertThat(user.getId()).isEqualTo(1L);
});
}

@Test
void shouldGetAllUsers() {
webTestClient.get()
.uri("/users")
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class)
.hasSize(3);
}

@Test
void shouldCreateUser() {
User newUser = new User("John", "john@example.com");

webTestClient.post()
.uri("/users")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(newUser)
.exchange()
.expectStatus().isCreated()
.expectBody(User.class)
.value(user -> {
assertThat(user.getId()).isNotNull();
assertThat(user.getName()).isEqualTo("John");
});
}
}

使用StepVerifier测试Mono/Flux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Test
void testFlux() {
Flux<Integer> flux = Flux.just(1, 2, 3)
.map(n -> n * 2);

StepVerifier.create(flux)
.expectNext(2)
.expectNext(4)
.expectNext(6)
.verifyComplete();
}

@Test
void testMono() {
Mono<String> mono = Mono.just("hello")
.map(String::toUpperCase);

StepVerifier.create(mono)
.expectNext("HELLO")
.verifyComplete();
}

@Test
void testError() {
Flux<Integer> flux = Flux.just(1, 2, 0)
.map(n -> 10 / n);

StepVerifier.create(flux)
.expectNext(10)
.expectNext(5)
.expectError(ArithmeticException.class)
.verify();
}

适用场景

适合WebFlux

  • 高并发IO密集型应用
  • 流式数据处理
  • 实时推送(SSE、WebSocket)
  • 微服务网关
  • 需要背压控制的场景

不适合WebFlux

  • CPU密集型应用
  • 使用阻塞库(JDBC、部分SDK)
  • 团队不熟悉响应式编程
  • 简单的CRUD应用

WebFlux vs 虚拟线程

特性 WebFlux 虚拟线程(JDK 21)
编程模型 响应式/函数式 命令式/同步
学习曲线 陡峭 平缓
代码复杂度
调试难度
生态成熟度 一般 复用传统生态
背压支持 原生支持 需要额外实现
适用场景 流式处理、背压 通用高并发IO

选择建议

  • 新项目 + JDK 21+ → 优先考虑虚拟线程
  • 需要流式处理/背压 → WebFlux
  • 已有WebFlux项目 → 继续使用

常见陷阱

陷阱1:阻塞调用

1
2
3
4
5
6
7
8
9
10
11
12
13
// 错误:在响应式链中阻塞
Mono.just(1)
.map(n -> {
Thread.sleep(1000); // 阻塞!
return n;
});

// 正确:使用subscribeOn切换到阻塞调度器
Mono.just(1)
.flatMap(n -> Mono.fromCallable(() -> {
Thread.sleep(1000);
return n;
}).subscribeOn(Schedulers.boundedElastic()));

陷阱2:忘记订阅

1
2
3
4
5
6
// 错误:没有订阅,不会执行
userRepository.save(user); // 什么都不会发生!

// 正确:必须订阅
userRepository.save(user).subscribe();
// 或者在Controller返回Mono,框架会订阅

陷阱3:多次订阅

1
2
3
4
5
6
7
8
9
10
11
// 可能有问题:每次订阅都执行一次
Mono<User> userMono = Mono.fromSupplier(() -> {
log.info("查询数据库");
return userRepository.findById(1L).block();
});

userMono.subscribe(); // 执行一次
userMono.subscribe(); // 又执行一次

// 使用cache()缓存结果
Mono<User> cached = userMono.cache();

总结

WebFlux核心要点:

  1. Mono和Flux是核心类型,分别表示0-1和0-N个元素
  2. 操作符丰富,支持转换、过滤、组合、错误处理
  3. 非阻塞是关键,避免在响应式链中阻塞
  4. 背压控制数据流速,防止生产者过快
  5. 必须订阅才会执行,或由框架自动订阅

适用建议:

  • 需要高并发IO → 考虑WebFlux或虚拟线程
  • 需要流式处理 → WebFlux
  • 需要背压机制 → WebFlux
  • 简单CRUD → Spring MVC即可
  • JDK 21+新项目 → 优先虚拟线程