Spring WebFlux是Spring 5引入的响应式Web框架,基于Reactor库实现,支持非阻塞IO和背压机制。本文详细介绍WebFlux的核心概念和使用方法。
什么是响应式编程
响应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。
传统命令式 vs 响应式
1 2 3 4 5 6 7 8 9
| List<User> users = userRepository.findAll(); for (User user : users) { process(user); }
userRepository.findAll() .subscribe(user -> process(user));
|
响应式流规范(Reactive Streams)
响应式流定义了4个核心接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); }
public interface Subscriber<T> { void onSubscribe(Subscription subscription); void onNext(T item); void onError(Throwable throwable); void onComplete(); }
public interface Subscription { void request(long n); void cancel(); }
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
|
1 2 3 4 5 6 7 8
| Publisher ──────────────────────▶ Subscriber onSubscribe(subscription) ◀── request(n) ─── ─── onNext(item) ─▶ ─── onNext(item) ─▶ ◀── request(n) ─── ─── onNext(item) ─▶ ─── onComplete() ─▶
|
WebFlux vs Spring MVC
1 2 3 4 5 6 7 8 9 10 11 12
| ┌─────────────────────────────────────────────────────────┐ │ Spring Web │ ├────────────────────────┬────────────────────────────────┤ │ Spring MVC │ Spring WebFlux │ ├────────────────────────┼────────────────────────────────┤ │ Servlet API │ Reactive Streams │ │ 同步阻塞 │ 异步非阻塞 │ │ 一个请求一个线程 │ 少量线程处理大量请求 │ │ Tomcat/Jetty │ Netty/Undertow │ │ JDBC │ R2DBC │ │ RestTemplate │ WebClient │ └────────────────────────┴────────────────────────────────┘
|
线程模型对比
Spring MVC(阻塞模型):
1 2 3 4 5 6 7
| 请求1 ──▶ 线程1 ──▶ [处理中...等待DB...] ──▶ 响应 请求2 ──▶ 线程2 ──▶ [处理中...等待DB...] ──▶ 响应 请求3 ──▶ 线程3 ──▶ [处理中...等待DB...] ──▶ 响应 ... 请求200 ──▶ 线程200 ──▶ [处理中...]
每个请求占用一个线程,线程在IO等待时空闲
|
WebFlux(非阻塞模型):
1 2 3 4 5 6 7
| 请求1 ──┐ 请求2 ──┼──▶ 线程1 ──▶ [处理] ──▶ 等待IO(让出线程) 请求3 ──┤ ↑ ... │ │ 请求N ──┘ IO完成时继续处理
少量线程处理大量请求,IO等待时不占用线程
|
Mono和Flux
WebFlux基于Project Reactor,核心是两个发布者类型:
Mono:0或1个元素
1 2 3 4 5 6 7 8 9 10 11
| Mono<String> mono = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<User> user = Mono.fromSupplier(() -> userService.findById(1L));
Mono<String> fromCallable = Mono.fromCallable(() -> heavyComputation());
|
1 2
| Mono<T>: ────○────| 0或1个元素 完成
|
Flux:0到N个元素
1 2 3 4 5 6 7 8 9 10 11
| Flux<String> flux = Flux.just("A", "B", "C");
Flux<Integer> fromList = Flux.fromIterable(Arrays.asList(1, 2, 3));
Flux<Integer> range = Flux.range(1, 10);
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1));
|
1 2
| Flux<T>: ────○────○────○────○────| 元素1 元素2 元素3 元素4 完成
|
创建Mono/Flux
静态工厂方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Mono.just(value) Mono.empty() Mono.error(throwable) Mono.fromSupplier(supplier) Mono.fromCallable(callable) Mono.fromFuture(future) Mono.defer(() -> Mono.just(x))
Flux.just(v1, v2, v3) Flux.fromIterable(list) Flux.fromArray(array) Flux.range(start, count) Flux.interval(duration) Flux.generate(generator) Flux.create(sink -> {...})
|
使用generate同步生成
1 2 3 4 5 6 7 8 9 10 11
| Flux<Integer> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next(state); if (state == 10) { sink.complete(); } return state + 1; } );
|
使用create异步生成
1 2 3 4 5 6
| Flux<String> flux = Flux.create(sink -> { someAsyncAPI.onData(data -> sink.next(data)); someAsyncAPI.onComplete(() -> sink.complete()); someAsyncAPI.onError(e -> sink.error(e)); });
|
常用操作符
转换操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Flux.just(1, 2, 3) .map(n -> n * 2)
Flux.just(1, 2, 3) .flatMap(n -> Flux.range(1, n))
Flux.just(1, 2, 3) .flatMapSequential(n -> Flux.range(1, n))
Flux.just(1, 2, 3) .concatMap(n -> Flux.range(1, n))
|
过滤操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| Flux.range(1, 10) .filter(n -> n % 2 == 0)
Flux.range(1, 100) .take(5)
Flux.range(1, 10) .skip(3)
Flux.just(1, 2, 2, 3, 3, 3) .distinct()
Flux.just(1, 1, 2, 2, 1, 1) .distinctUntilChanged()
|
组合操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| Flux.concat( Flux.just(1, 2), Flux.just(3, 4) )
Flux.merge( Flux.interval(Duration.ofMillis(100)).take(3), Flux.interval(Duration.ofMillis(150)).take(3) )
Flux.zip( Flux.just("A", "B", "C"), Flux.just(1, 2, 3), (letter, number) -> letter + number )
Flux.combineLatest( flux1, flux2, (a, b) -> a + b )
|
错误处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Flux.just(1, 2, 0) .map(n -> 10 / n) .onErrorReturn(-1)
Flux.just(1, 2, 0) .map(n -> 10 / n) .onErrorResume(e -> Flux.just(-1, -2))
Flux.just(1, 2, 0, 4) .map(n -> 10 / n) .onErrorContinue((e, val) -> log.warn("跳过: {}", val))
Flux.just(1) .flatMap(n -> callUnstableService()) .retry(3)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
副作用操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Flux.just(1, 2, 3) .doOnNext(n -> log.info("处理: {}", n)) .subscribe();
.doOnError(e -> log.error("出错", e))
.doOnComplete(() -> log.info("完成"))
.doOnSubscribe(sub -> log.info("开始订阅"))
.doFinally(signalType -> log.info("结束: {}", signalType))
|
聚合操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux.range(1, 5) .reduce((a, b) -> a + b)
Flux.just(1, 2, 3) .collectList()
Flux.range(1, 100) .count()
|
WebFlux使用方式
方式1:注解式(与Spring MVC类似)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @RestController @RequestMapping("/users") public class UserController {
@Autowired private UserRepository userRepository;
@GetMapping("/{id}") public Mono<User> getUser(@PathVariable Long id) { return userRepository.findById(id); }
@GetMapping public Flux<User> getAllUsers() { return userRepository.findAll(); }
@PostMapping public Mono<User> createUser(@RequestBody Mono<User> userMono) { return userMono.flatMap(userRepository::save); }
@PutMapping("/{id}") public Mono<User> updateUser(@PathVariable Long id, @RequestBody User user) { return userRepository.findById(id) .flatMap(existing -> { existing.setName(user.getName()); return userRepository.save(existing); }); }
@DeleteMapping("/{id}") public Mono<Void> deleteUser(@PathVariable Long id) { return userRepository.deleteById(id); } }
|
方式2:函数式端点(RouterFunction)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| @Configuration public class RouterConfig {
@Bean public RouterFunction<ServerResponse> routes(UserHandler handler) { return RouterFunctions.route() .GET("/users", handler::getAllUsers) .GET("/users/{id}", handler::getUser) .POST("/users", handler::createUser) .PUT("/users/{id}", handler::updateUser) .DELETE("/users/{id}", handler::deleteUser) .build(); } }
@Component public class UserHandler {
@Autowired private UserRepository userRepository;
public Mono<ServerResponse> getAllUsers(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(userRepository.findAll(), User.class); }
public Mono<ServerResponse> getUser(ServerRequest request) { Long id = Long.parseLong(request.pathVariable("id")); return userRepository.findById(id) .flatMap(user -> ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .bodyValue(user)) .switchIfEmpty(ServerResponse.notFound().build()); }
public Mono<ServerResponse> createUser(ServerRequest request) { return request.bodyToMono(User.class) .flatMap(userRepository::save) .flatMap(saved -> ServerResponse .created(URI.create("/users/" + saved.getId())) .bodyValue(saved)); }
public Mono<ServerResponse> updateUser(ServerRequest request) { Long id = Long.parseLong(request.pathVariable("id")); return request.bodyToMono(User.class) .flatMap(user -> userRepository.findById(id) .flatMap(existing -> { existing.setName(user.getName()); return userRepository.save(existing); })) .flatMap(updated -> ServerResponse.ok().bodyValue(updated)) .switchIfEmpty(ServerResponse.notFound().build()); }
public Mono<ServerResponse> deleteUser(ServerRequest request) { Long id = Long.parseLong(request.pathVariable("id")); return userRepository.deleteById(id) .then(ServerResponse.noContent().build()); } }
|
响应式数据访问
R2DBC(响应式数据库连接)
1 2 3 4 5 6 7 8 9
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> </dependency> <dependency> <groupId>io.r2dbc</groupId> <artifactId>r2dbc-postgresql</artifactId> </dependency>
|
1 2 3 4 5 6
| spring: r2dbc: url: r2dbc:postgresql://localhost:5432/mydb username: user password: password
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Table("users") public class User { @Id private Long id; private String name; private String email; }
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByName(String name);
@Query("SELECT * FROM users WHERE email = :email") Mono<User> findByEmail(String email); }
|
响应式MongoDB
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> </dependency>
|
1 2 3
| public interface UserRepository extends ReactiveMongoRepository<User, String> { Flux<User> findByAgeGreaterThan(int age); }
|
响应式Redis
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Configuration public class RedisConfig {
@Bean public ReactiveRedisTemplate<String, User> reactiveRedisTemplate( ReactiveRedisConnectionFactory factory) { return new ReactiveRedisTemplate<>(factory, context); } }
@Service public class UserCacheService {
@Autowired private ReactiveRedisTemplate<String, User> redisTemplate;
public Mono<User> getUser(String id) { return redisTemplate.opsForValue().get("user:" + id); }
public Mono<Boolean> saveUser(User user) { return redisTemplate.opsForValue() .set("user:" + user.getId(), user, Duration.ofHours(1)); } }
|
WebClient(响应式HTTP客户端)
WebClient是RestTemplate的响应式替代品。
基本使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| WebClient client = WebClient.create("https://api.example.com");
Mono<User> user = client.get() .uri("/users/{id}", 1) .retrieve() .bodyToMono(User.class);
Flux<User> users = client.get() .uri("/users") .retrieve() .bodyToFlux(User.class);
Mono<User> created = client.post() .uri("/users") .contentType(MediaType.APPLICATION_JSON) .bodyValue(new User("John", "john@example.com")) .retrieve() .bodyToMono(User.class);
|
配置WebClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Configuration public class WebClientConfig {
@Bean public WebClient webClient() { return WebClient.builder() .baseUrl("https://api.example.com") .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer token") .filter(logRequest()) .filter(logResponse()) .build(); }
private ExchangeFilterFunction logRequest() { return ExchangeFilterFunction.ofRequestProcessor(request -> { log.info("Request: {} {}", request.method(), request.url()); return Mono.just(request); }); }
private ExchangeFilterFunction logResponse() { return ExchangeFilterFunction.ofResponseProcessor(response -> { log.info("Response: {}", response.statusCode()); return Mono.just(response); }); } }
|
错误处理
1 2 3 4 5 6 7 8
| Mono<User> user = client.get() .uri("/users/{id}", 1) .retrieve() .onStatus(HttpStatusCode::is4xxClientError, response -> Mono.error(new UserNotFoundException("用户不存在"))) .onStatus(HttpStatusCode::is5xxServerError, response -> Mono.error(new ServiceException("服务器错误"))) .bodyToMono(User.class);
|
并行调用多个API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public Mono<UserProfile> getUserProfile(Long userId) { Mono<User> userMono = client.get() .uri("/users/{id}", userId) .retrieve() .bodyToMono(User.class);
Mono<List<Order>> ordersMono = client.get() .uri("/users/{id}/orders", userId) .retrieve() .bodyToFlux(Order.class) .collectList();
Mono<List<Address>> addressesMono = client.get() .uri("/users/{id}/addresses", userId) .retrieve() .bodyToFlux(Address.class) .collectList();
return Mono.zip(userMono, ordersMono, addressesMono) .map(tuple -> new UserProfile( tuple.getT1(), tuple.getT2(), tuple.getT3() )); }
|
背压(Backpressure)
背压是响应式流的核心特性,允许消费者控制数据流速。
背压策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux.range(1, 1000) .onBackpressureBuffer(100)
.onBackpressureDrop(dropped -> log.warn("丢弃: {}", dropped))
.onBackpressureLatest()
.onBackpressureError()
.subscribe();
|
控制请求速率
1 2 3 4 5
| Flux.range(1, 100) .limitRate(10) .subscribe(n -> { });
|
Server-Sent Events(SSE)
WebFlux天然支持SSE:
1 2 3 4 5 6 7 8 9
| @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> stream() { return Flux.interval(Duration.ofSeconds(1)) .map(seq -> ServerSentEvent.<String>builder() .id(String.valueOf(seq)) .event("message") .data("数据 " + seq) .build()); }
|
客户端:
1 2 3 4
| const eventSource = new EventSource('/stream'); eventSource.onmessage = (event) => { console.log(event.data); };
|
WebSocket支持
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Configuration public class WebSocketConfig {
@Bean public HandlerMapping webSocketHandlerMapping() { Map<String, WebSocketHandler> map = new HashMap<>(); map.put("/ws/chat", new ChatWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setUrlMap(map); mapping.setOrder(-1); return mapping; }
@Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }
public class ChatWebSocketHandler implements WebSocketHandler {
@Override public Mono<Void> handle(WebSocketSession session) { Flux<WebSocketMessage> output = session.receive() .map(WebSocketMessage::getPayloadAsText) .map(msg -> "Echo: " + msg) .map(session::textMessage);
return session.send(output); } }
|
测试
使用WebTestClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class UserControllerTest {
@Autowired private WebTestClient webTestClient;
@Test void shouldGetUser() { webTestClient.get() .uri("/users/1") .exchange() .expectStatus().isOk() .expectBody(User.class) .value(user -> { assertThat(user.getId()).isEqualTo(1L); }); }
@Test void shouldGetAllUsers() { webTestClient.get() .uri("/users") .exchange() .expectStatus().isOk() .expectBodyList(User.class) .hasSize(3); }
@Test void shouldCreateUser() { User newUser = new User("John", "john@example.com");
webTestClient.post() .uri("/users") .contentType(MediaType.APPLICATION_JSON) .bodyValue(newUser) .exchange() .expectStatus().isCreated() .expectBody(User.class) .value(user -> { assertThat(user.getId()).isNotNull(); assertThat(user.getName()).isEqualTo("John"); }); } }
|
使用StepVerifier测试Mono/Flux
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Test void testFlux() { Flux<Integer> flux = Flux.just(1, 2, 3) .map(n -> n * 2);
StepVerifier.create(flux) .expectNext(2) .expectNext(4) .expectNext(6) .verifyComplete(); }
@Test void testMono() { Mono<String> mono = Mono.just("hello") .map(String::toUpperCase);
StepVerifier.create(mono) .expectNext("HELLO") .verifyComplete(); }
@Test void testError() { Flux<Integer> flux = Flux.just(1, 2, 0) .map(n -> 10 / n);
StepVerifier.create(flux) .expectNext(10) .expectNext(5) .expectError(ArithmeticException.class) .verify(); }
|
适用场景
适合WebFlux
- 高并发IO密集型应用
- 流式数据处理
- 实时推送(SSE、WebSocket)
- 微服务网关
- 需要背压控制的场景
不适合WebFlux
- CPU密集型应用
- 使用阻塞库(JDBC、部分SDK)
- 团队不熟悉响应式编程
- 简单的CRUD应用
WebFlux vs 虚拟线程
| 特性 |
WebFlux |
虚拟线程(JDK 21) |
| 编程模型 |
响应式/函数式 |
命令式/同步 |
| 学习曲线 |
陡峭 |
平缓 |
| 代码复杂度 |
高 |
低 |
| 调试难度 |
难 |
易 |
| 生态成熟度 |
一般 |
复用传统生态 |
| 背压支持 |
原生支持 |
需要额外实现 |
| 适用场景 |
流式处理、背压 |
通用高并发IO |
选择建议:
- 新项目 + JDK 21+ → 优先考虑虚拟线程
- 需要流式处理/背压 → WebFlux
- 已有WebFlux项目 → 继续使用
常见陷阱
陷阱1:阻塞调用
1 2 3 4 5 6 7 8 9 10 11 12 13
| Mono.just(1) .map(n -> { Thread.sleep(1000); return n; });
Mono.just(1) .flatMap(n -> Mono.fromCallable(() -> { Thread.sleep(1000); return n; }).subscribeOn(Schedulers.boundedElastic()));
|
陷阱2:忘记订阅
1 2 3 4 5 6
| userRepository.save(user);
userRepository.save(user).subscribe();
|
陷阱3:多次订阅
1 2 3 4 5 6 7 8 9 10 11
| Mono<User> userMono = Mono.fromSupplier(() -> { log.info("查询数据库"); return userRepository.findById(1L).block(); });
userMono.subscribe(); userMono.subscribe();
Mono<User> cached = userMono.cache();
|
总结
WebFlux核心要点:
- Mono和Flux是核心类型,分别表示0-1和0-N个元素
- 操作符丰富,支持转换、过滤、组合、错误处理
- 非阻塞是关键,避免在响应式链中阻塞
- 背压控制数据流速,防止生产者过快
- 必须订阅才会执行,或由框架自动订阅
适用建议:
- 需要高并发IO → 考虑WebFlux或虚拟线程
- 需要流式处理 → WebFlux
- 需要背压机制 → WebFlux
- 简单CRUD → Spring MVC即可
- JDK 21+新项目 → 优先虚拟线程