【Java】Reactor与Netty核心原理详解
Reactor是响应式编程的核心库,Netty是高性能网络编程框架。Spring WebFlux底层正是基于这两者构建。本文深入介绍Reactor和Netty的核心原理。
第一部分:Reactor
Reactor是什么
Reactor是一个基于Reactive Streams规范的第四代响应式库,由Pivotal(Spring母公司)开发,是Spring WebFlux的基础。
1 2 3 4 5 6 7 8 9 10 11
| Reactive Streams规范 │ ▼ ┌───────────────────┐ │ Reactor │ ← 实现规范 + 丰富操作符 ├───────────────────┤ │ Mono │ Flux │ ← 核心类型 └───────────────────┘ │ ▼ Spring WebFlux
|
调度器(Schedulers)
Reactor默认不切换线程,操作在订阅线程执行。使用调度器可以控制执行线程。
内置调度器
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Schedulers.immediate()
Schedulers.single()
Schedulers.parallel()
Schedulers.boundedElastic()
Schedulers.fromExecutorService(myExecutor)
|
subscribeOn vs publishOn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| Flux.just(1, 2, 3) .map(n -> { log.info("map1: {}", Thread.currentThread().getName()); return n * 2; }) .subscribeOn(Schedulers.parallel()) .map(n -> { log.info("map2: {}", Thread.currentThread().getName()); return n + 1; }) .subscribe();
Flux.just(1, 2, 3) .map(n -> { log.info("map1: {}", Thread.currentThread().getName()); return n * 2; }) .publishOn(Schedulers.parallel()) .map(n -> { log.info("map2: {}", Thread.currentThread().getName()); return n + 1; }) .subscribe();
|
1 2 3 4 5
| subscribeOn (影响整个链): [parallel] ──▶ map1 ──▶ map2 ──▶ subscribe
publishOn (影响之后): [main] ──▶ map1 ──▶ [切换] ──▶ [parallel] ──▶ map2 ──▶ subscribe
|
处理阻塞操作
1 2 3 4 5 6 7
| Mono.fromCallable(() -> { return jdbcTemplate.queryForObject("SELECT ...", User.class); }) .subscribeOn(Schedulers.boundedElastic()) .flatMap(user -> reactiveProcess(user));
|
冷发布者 vs 热发布者
冷发布者(Cold Publisher)
每个订阅者都从头开始接收数据:
1 2 3 4 5 6 7 8 9 10
| Flux<Integer> cold = Flux.range(1, 3) .doOnSubscribe(s -> log.info("订阅了"));
cold.subscribe(n -> log.info("订阅者A: {}", n));
cold.subscribe(n -> log.info("订阅者B: {}", n));
|
热发布者(Hot Publisher)
数据实时产生,订阅者只收到订阅后的数据:
1 2 3 4 5 6 7 8 9 10
| Flux<Long> hot = Flux.interval(Duration.ofSeconds(1)) .share();
hot.subscribe(n -> log.info("订阅者A: {}", n));
Thread.sleep(3000);
hot.subscribe(n -> log.info("订阅者B: {}", n));
|
转换方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Flux<Integer> cold = Flux.range(1, 5);
Flux<Integer> hot1 = cold.share();
Flux<Integer> hot2 = cold.publish().autoConnect(2);
Flux<Integer> hot3 = cold.publish().refCount(1);
Flux<Integer> cached = cold.cache();
|
Context(上下文传递)
响应式链中无法使用ThreadLocal,Reactor提供Context机制:
1 2 3 4 5 6 7 8 9
| Mono.just("Hello") .flatMap(s -> Mono.deferContextual(ctx -> { String userId = ctx.get("userId"); return Mono.just(s + " " + userId); })) .contextWrite(Context.of("userId", "12345")) .subscribe(System.out::println);
|
实际应用:传递请求信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class UserContextFilter implements WebFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id"); return chain.filter(exchange) .contextWrite(Context.of("userId", userId)); } }
@Service public class OrderService { public Mono<Order> createOrder(Order order) { return Mono.deferContextual(ctx -> { String userId = ctx.getOrDefault("userId", "anonymous"); order.setCreatedBy(userId); return orderRepository.save(order); }); } }
|
Sinks(手动发射)
Sinks是Reactor 3.4+推荐的手动发射数据方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Sinks.One<String> sink = Sinks.one(); Mono<String> mono = sink.asMono();
sink.tryEmitValue("Hello"); mono.subscribe(System.out::println);
Sinks.Many<String> manySink = Sinks.many().multicast().onBackpressureBuffer(); Flux<String> flux = manySink.asFlux();
flux.subscribe(s -> System.out.println("A: " + s)); flux.subscribe(s -> System.out.println("B: " + s));
manySink.tryEmitNext("Hello");
|
Sinks类型
1 2 3 4 5 6 7 8 9 10 11 12
| Sinks.One<T>
Sinks.Many<T> unicast = Sinks.many().unicast().onBackpressureBuffer();
Sinks.Many<T> multicast = Sinks.many().multicast().onBackpressureBuffer();
Sinks.Many<T> replay = Sinks.many().replay().all(); Sinks.Many<T> replayN = Sinks.many().replay().limit(10);
|
高级操作符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| Function<Flux<Integer>, Flux<Integer>> addLogging = flux -> flux.doOnNext(n -> log.info("值: {}", n));
Flux.range(1, 3) .transform(addLogging) .subscribe();
Flux.range(1, 3) .transformDeferred(flux -> Mono.deferContextual(ctx -> { if (ctx.hasKey("debug")) { return Mono.just(flux.doOnNext(n -> log.debug("值: {}", n))); } return Mono.just(flux); }).flatMapMany(Function.identity()) ) .contextWrite(Context.of("debug", true)) .subscribe();
|
expand(递归展开)
1 2 3 4 5 6 7 8 9
| Flux<TreeNode> allNodes = Flux.just(rootNode) .expand(node -> Flux.fromIterable(node.getChildren()));
.expand(...)
.expandDeep(...)
|
windowUntil / bufferUntil
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Flux.range(1, 10) .windowUntil(n -> n % 3 == 0) .flatMap(window -> window.collectList()) .subscribe(System.out::println);
Flux.interval(Duration.ofMillis(100)) .window(Duration.ofSeconds(1)) .flatMap(Flux::collectList) .subscribe(list -> log.info("窗口: {} 个元素", list.size()));
|
第二部分:Netty
Netty是什么
Netty是一个异步事件驱动的网络应用框架,用于快速开发高性能的网络服务器和客户端。
1 2 3 4 5 6 7 8 9 10 11 12
| ┌─────────────────────────────────────────────────┐ │ 应用层 │ ├─────────────────────────────────────────────────┤ │ 编解码器 / Handler / 业务逻辑 │ ├─────────────────────────────────────────────────┤ │ Netty核心 │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Channel │ │ EventLoop│ │ ByteBuf │ │ │ └─────────┘ └─────────┘ └─────────┘ │ ├─────────────────────────────────────────────────┤ │ Java NIO / Epoll │ └─────────────────────────────────────────────────┘
|
Java NIO基础
传统IO(BIO)vs NIO:
1 2 3 4 5 6 7 8 9 10 11
| BIO(阻塞IO): 客户端1 ──▶ 线程1 ──▶ [阻塞读取...] 客户端2 ──▶ 线程2 ──▶ [阻塞读取...] 客户端3 ──▶ 线程3 ──▶ [阻塞读取...] 每个连接一个线程,线程阻塞等待数据
NIO(非阻塞IO): 客户端1 ──┐ 客户端2 ──┼──▶ Selector ──▶ 线程(轮询就绪的Channel) 客户端3 ──┘ 单线程管理多个连接,只处理就绪的Channel
|
NIO核心组件:
- Channel:双向通道(可读可写)
- Buffer:数据缓冲区
- Selector:多路复用器,监听多个Channel
EventLoop模型
Netty的核心是EventLoop,负责处理Channel的所有IO事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ┌───────────────────────────────────────────────────────┐ │ EventLoopGroup │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ EventLoop 1 │ │ EventLoop 2 │ │ EventLoop 3 │ │ │ │ (线程) │ │ (线程) │ │ (线程) │ │ │ │ │ │ │ │ │ │ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ │ │ │Channel A│ │ │ │Channel C│ │ │ │Channel E│ │ │ │ │ │Channel B│ │ │ │Channel D│ │ │ │Channel F│ │ │ │ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └───────────────────────────────────────────────────────┘
每个Channel绑定到一个EventLoop 一个EventLoop可以处理多个Channel 所有IO操作都在EventLoop线程中执行(无锁)
|
Channel和ChannelPipeline
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| ChannelPipeline ┌──────────────────────────────────────────────────────────┐ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ Handler1 │──▶│ Handler2 │──▶│ Handler3 │ │ │ │ (Inbound) │ │ (Inbound) │ │ (Inbound) │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ ▲ │ │ │ │ 入站事件 ▼ │ │ ┌────┴────┐ ┌────┴────┐ │ │ │ Channel │ │ 业务 │ │ │ │ (网络) │ │ 处理 │ │ │ └────┬────┘ └────┬────┘ │ │ │ 出站事件 │ │ │ ▼ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ Handler6 │◀──│ Handler5 │◀──│ Handler4 │ │ │ │ (Outbound) │ │ (Outbound) │ │ (Outbound) │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────────────┘
|
ChannelHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; try { ctx.fireChannelRead(msg); } finally { buf.release(); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ctx.write(msg, promise); } }
|
ByteBuf
Netty的ByteBuf比Java NIO的ByteBuffer更强大:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ByteBuf buf = Unpooled.buffer(256); ByteBuf direct = Unpooled.directBuffer();
buf.writeInt(42); buf.writeBytes("Hello".getBytes());
int num = buf.readInt(); byte[] bytes = new byte[5]; buf.readBytes(bytes);
buf.retain(); buf.release();
|
编解码器
常用解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| pipeline.addLast(new FixedLengthFrameDecoder(20));
pipeline.addLast(new LineBasedFrameDecoder(1024));
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
pipeline.addLast(new LengthFieldBasedFrameDecoder( 65535, 0, 4, 0, 4 ));
|
自定义编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class MyMessage { private int id; private String content; }
public class MyMessageEncoder extends MessageToByteEncoder<MyMessage> { @Override protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) { out.writeInt(msg.getId()); byte[] bytes = msg.getContent().getBytes(StandardCharsets.UTF_8); out.writeInt(bytes.length); out.writeBytes(bytes); } }
public class MyMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 8) { return; }
in.markReaderIndex(); int id = in.readInt(); int length = in.readInt();
if (in.readableBytes() < length) { in.resetReaderIndex(); return; }
byte[] bytes = new byte[length]; in.readBytes(bytes); String content = new String(bytes, StandardCharsets.UTF_8);
out.add(new MyMessage(id, content)); } }
|
完整示例:Echo服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class EchoServer { private final int port;
public EchoServer(int port) { this.port = port; }
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new EchoServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("服务器启动,端口: " + port);
future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
public static void main(String[] args) throws Exception { new EchoServer(8080).start(); } }
class EchoServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println("收到: " + msg); ctx.writeAndFlush("Echo: " + msg); }
@Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("客户端连接: " + ctx.channel().remoteAddress()); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
|
Echo客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class EchoClient { private final String host; private final int port;
public EchoClient(String host, int port) { this.host = host; this.port = port; }
public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new EchoClientHandler()); } });
ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } }
public static void main(String[] args) throws Exception { new EchoClient("localhost", 8080).start(); } }
class EchoClientHandler extends SimpleChannelInboundHandler<String> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush("Hello, Netty!"); }
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println("服务器响应: " + msg); } }
|
HTTP服务器示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class HttpServer { public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new HttpRequestHandler()); } });
ChannelFuture future = bootstrap.bind(8080).sync(); future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { String uri = request.uri(); String content = "Hello! You requested: " + uri;
FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(content, StandardCharsets.UTF_8) ); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response); } }
|
第三部分:Reactor Netty
Reactor Netty是什么
Reactor Netty是Reactor和Netty的桥梁,为Netty提供响应式API:
1 2 3 4 5 6 7 8
| ┌─────────────────────────────────────────────┐ │ Spring WebFlux │ ├─────────────────────────────────────────────┤ │ Reactor Netty │ ├──────────────────────┬──────────────────────┤ │ Reactor │ Netty │ │ (响应式编程模型) │ (网络框架) │ └──────────────────────┴──────────────────────┘
|
使用Reactor Netty创建HTTP服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class ReactorNettyServer { public static void main(String[] args) { HttpServer.create() .port(8080) .route(routes -> routes .get("/hello", (request, response) -> response.sendString(Mono.just("Hello World!"))) .get("/users/{id}", (request, response) -> { String id = request.param("id"); return response.sendString(Mono.just("User: " + id)); }) .post("/users", (request, response) -> request.receive() .aggregate() .asString() .flatMap(body -> response.sendString(Mono.just("Created: " + body)))) ) .bindNow() .onDispose() .block(); } }
|
使用Reactor Netty创建HTTP客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class ReactorNettyClient { public static void main(String[] args) { HttpClient client = HttpClient.create() .baseUrl("http://localhost:8080");
String response = client.get() .uri("/hello") .responseContent() .aggregate() .asString() .block(); System.out.println(response);
String postResponse = client.post() .uri("/users") .send(ByteBufMono.fromString(Mono.just("{\"name\": \"John\"}"))) .responseContent() .aggregate() .asString() .block(); System.out.println(postResponse); } }
|
TCP服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class TcpServer { public static void main(String[] args) { DisposableServer server = TcpServer.create() .port(9000) .handle((inbound, outbound) -> { return inbound.receive() .asString() .flatMap(msg -> { System.out.println("收到: " + msg); return outbound.sendString(Mono.just("Echo: " + msg)); }) .then(); }) .bindNow();
server.onDispose().block(); } }
|
WebSocket服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class WebSocketServer { public static void main(String[] args) { HttpServer.create() .port(8080) .route(routes -> routes .ws("/ws", (wsInbound, wsOutbound) -> { return wsOutbound.send( wsInbound.receive() .asString() .map(msg -> "Echo: " + msg) .map(wsOutbound::alloc()::buffer) .map(buf -> { buf.writeCharSequence(msg, StandardCharsets.UTF_8); return buf; }) ); }) ) .bindNow() .onDispose() .block(); } }
|
性能优化技巧
1. 线程模型优化
1 2 3 4 5 6 7
| EventLoopGroup workerGroup = new NioEventLoopGroup( Runtime.getRuntime().availableProcessors() * 2 );
Schedulers.newParallel("custom", 16);
|
2. 内存优化
1 2 3 4 5 6 7 8 9
| ByteBuf buf = Unpooled.directBuffer();
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT; ByteBuf pooledBuf = allocator.buffer();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
|
3. 背压控制
1 2 3 4 5 6 7 8
| Flux.range(1, 1000) .limitRate(100) .subscribe();
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024));
|
总结
Reactor核心要点
| 概念 |
说明 |
| Mono/Flux |
0-1和0-N元素的响应式类型 |
| Schedulers |
调度器,控制执行线程 |
| subscribeOn |
从源头切换线程 |
| publishOn |
从此处切换线程 |
| 冷/热发布者 |
冷:每次订阅重新开始;热:实时数据流 |
| Context |
响应式链的上下文传递 |
| Sinks |
手动发射数据 |
Netty核心要点
| 概念 |
说明 |
| EventLoop |
事件循环,处理Channel的IO |
| Channel |
网络连接的抽象 |
| ChannelPipeline |
Handler链,处理入站/出站事件 |
| ByteBuf |
高性能字节缓冲区 |
| 编解码器 |
二进制与对象的转换 |
选择建议
- Spring应用 → 直接使用WebFlux
- 需要底层控制 → 使用Reactor Netty
- 高性能网络服务 → 使用Netty
- 响应式数据处理 → 使用Reactor