抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Reactor是响应式编程的核心库,Netty是高性能网络编程框架。Spring WebFlux底层正是基于这两者构建。本文深入介绍Reactor和Netty的核心原理。

第一部分:Reactor

Reactor是什么

Reactor是一个基于Reactive Streams规范的第四代响应式库,由Pivotal(Spring母公司)开发,是Spring WebFlux的基础。

1
2
3
4
5
6
7
8
9
10
11
Reactive Streams规范


┌───────────────────┐
│ Reactor │ ← 实现规范 + 丰富操作符
├───────────────────┤
│ Mono │ Flux │ ← 核心类型
└───────────────────┘


Spring WebFlux

调度器(Schedulers)

Reactor默认不切换线程,操作在订阅线程执行。使用调度器可以控制执行线程。

内置调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1. immediate:当前线程(默认)
Schedulers.immediate()

// 2. single:单个可复用线程
Schedulers.single()

// 3. parallel:固定大小线程池(CPU核心数)
Schedulers.parallel()

// 4. boundedElastic:弹性线程池,适合阻塞IO
Schedulers.boundedElastic()

// 5. fromExecutorService:自定义线程池
Schedulers.fromExecutorService(myExecutor)

subscribeOn vs publishOn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// subscribeOn:影响整个链的订阅过程(从源头开始)
Flux.just(1, 2, 3)
.map(n -> {
log.info("map1: {}", Thread.currentThread().getName());
return n * 2;
})
.subscribeOn(Schedulers.parallel()) // 整个链在parallel执行
.map(n -> {
log.info("map2: {}", Thread.currentThread().getName());
return n + 1;
})
.subscribe();
// map1和map2都在parallel线程

// publishOn:影响之后的操作(从此处开始切换)
Flux.just(1, 2, 3)
.map(n -> {
log.info("map1: {}", Thread.currentThread().getName());
return n * 2;
})
.publishOn(Schedulers.parallel()) // 之后切换到parallel
.map(n -> {
log.info("map2: {}", Thread.currentThread().getName());
return n + 1;
})
.subscribe();
// map1在订阅线程,map2在parallel线程
1
2
3
4
5
subscribeOn (影响整个链):
[parallel] ──▶ map1 ──▶ map2 ──▶ subscribe

publishOn (影响之后):
[main] ──▶ map1 ──▶ [切换] ──▶ [parallel] ──▶ map2 ──▶ subscribe

处理阻塞操作

1
2
3
4
5
6
7
// 将阻塞操作隔离到boundedElastic线程池
Mono.fromCallable(() -> {
// 阻塞的JDBC调用
return jdbcTemplate.queryForObject("SELECT ...", User.class);
})
.subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
.flatMap(user -> reactiveProcess(user)); // 后续响应式处理

冷发布者 vs 热发布者

冷发布者(Cold Publisher)

每个订阅者都从头开始接收数据:

1
2
3
4
5
6
7
8
9
10
Flux<Integer> cold = Flux.range(1, 3)
.doOnSubscribe(s -> log.info("订阅了"));

cold.subscribe(n -> log.info("订阅者A: {}", n));
// 订阅了
// 订阅者A: 1, 2, 3

cold.subscribe(n -> log.info("订阅者B: {}", n));
// 订阅了(重新开始)
// 订阅者B: 1, 2, 3

热发布者(Hot Publisher)

数据实时产生,订阅者只收到订阅后的数据:

1
2
3
4
5
6
7
8
9
10
// 使用share()创建热发布者
Flux<Long> hot = Flux.interval(Duration.ofSeconds(1))
.share(); // 多个订阅者共享

hot.subscribe(n -> log.info("订阅者A: {}", n));

Thread.sleep(3000);

hot.subscribe(n -> log.info("订阅者B: {}", n));
// 订阅者B从当前值开始,错过之前的

转换方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 冷 → 热
Flux<Integer> cold = Flux.range(1, 5);

// share():多播,订阅者共享
Flux<Integer> hot1 = cold.share();

// publish().autoConnect():达到指定订阅者数量后开始
Flux<Integer> hot2 = cold.publish().autoConnect(2);

// publish().refCount():有订阅者时活跃,无订阅者时停止
Flux<Integer> hot3 = cold.publish().refCount(1);

// cache():缓存所有数据,新订阅者可获取历史
Flux<Integer> cached = cold.cache();

Context(上下文传递)

响应式链中无法使用ThreadLocal,Reactor提供Context机制:

1
2
3
4
5
6
7
8
9
// 写入Context
Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx -> {
String userId = ctx.get("userId");
return Mono.just(s + " " + userId);
}))
.contextWrite(Context.of("userId", "12345"))
.subscribe(System.out::println);
// Hello 12345

实际应用:传递请求信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 在WebFilter中写入Context
@Component
public class UserContextFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
return chain.filter(exchange)
.contextWrite(Context.of("userId", userId));
}
}

// 在服务中读取
@Service
public class OrderService {
public Mono<Order> createOrder(Order order) {
return Mono.deferContextual(ctx -> {
String userId = ctx.getOrDefault("userId", "anonymous");
order.setCreatedBy(userId);
return orderRepository.save(order);
});
}
}

Sinks(手动发射)

Sinks是Reactor 3.4+推荐的手动发射数据方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 单播(一个订阅者)
Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();

sink.tryEmitValue("Hello");
mono.subscribe(System.out::println); // Hello

// 多播(多个订阅者)
Sinks.Many<String> manySink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> flux = manySink.asFlux();

flux.subscribe(s -> System.out.println("A: " + s));
flux.subscribe(s -> System.out.println("B: " + s));

manySink.tryEmitNext("Hello");
// A: Hello
// B: Hello

Sinks类型

1
2
3
4
5
6
7
8
9
10
11
12
// Sinks.one() - 单值
Sinks.One<T>

// Sinks.many().unicast() - 单订阅者
Sinks.Many<T> unicast = Sinks.many().unicast().onBackpressureBuffer();

// Sinks.many().multicast() - 多订阅者,热发布
Sinks.Many<T> multicast = Sinks.many().multicast().onBackpressureBuffer();

// Sinks.many().replay() - 多订阅者,可重放历史
Sinks.Many<T> replay = Sinks.many().replay().all(); // 重放所有
Sinks.Many<T> replayN = Sinks.many().replay().limit(10); // 重放最近10个

高级操作符

transform vs transformDeferred

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// transform:编译时应用变换
Function<Flux<Integer>, Flux<Integer>> addLogging = flux ->
flux.doOnNext(n -> log.info("值: {}", n));

Flux.range(1, 3)
.transform(addLogging)
.subscribe();

// transformDeferred:每次订阅时应用(可以根据Context动态变换)
Flux.range(1, 3)
.transformDeferred(flux ->
Mono.deferContextual(ctx -> {
if (ctx.hasKey("debug")) {
return Mono.just(flux.doOnNext(n -> log.debug("值: {}", n)));
}
return Mono.just(flux);
}).flatMapMany(Function.identity())
)
.contextWrite(Context.of("debug", true))
.subscribe();

expand(递归展开)

1
2
3
4
5
6
7
8
9
// 适合树形结构遍历
Flux<TreeNode> allNodes = Flux.just(rootNode)
.expand(node -> Flux.fromIterable(node.getChildren()));

// 广度优先
.expand(...)

// 深度优先
.expandDeep(...)

windowUntil / bufferUntil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 按条件分组到窗口
Flux.range(1, 10)
.windowUntil(n -> n % 3 == 0)
.flatMap(window -> window.collectList())
.subscribe(System.out::println);
// [1, 2, 3]
// [4, 5, 6]
// [7, 8, 9]
// [10]

// 按时间窗口
Flux.interval(Duration.ofMillis(100))
.window(Duration.ofSeconds(1))
.flatMap(Flux::collectList)
.subscribe(list -> log.info("窗口: {} 个元素", list.size()));

第二部分:Netty

Netty是什么

Netty是一个异步事件驱动的网络应用框架,用于快速开发高性能的网络服务器和客户端。

1
2
3
4
5
6
7
8
9
10
11
12
┌─────────────────────────────────────────────────┐
│ 应用层 │
├─────────────────────────────────────────────────┤
│ 编解码器 / Handler / 业务逻辑 │
├─────────────────────────────────────────────────┤
│ Netty核心 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Channel │ │ EventLoop│ │ ByteBuf │ │
│ └─────────┘ └─────────┘ └─────────┘ │
├─────────────────────────────────────────────────┤
│ Java NIO / Epoll │
└─────────────────────────────────────────────────┘

Java NIO基础

传统IO(BIO)vs NIO:

1
2
3
4
5
6
7
8
9
10
11
BIO(阻塞IO):
客户端1 ──▶ 线程1 ──▶ [阻塞读取...]
客户端2 ──▶ 线程2 ──▶ [阻塞读取...]
客户端3 ──▶ 线程3 ──▶ [阻塞读取...]
每个连接一个线程,线程阻塞等待数据

NIO(非阻塞IO):
客户端1 ──┐
客户端2 ──┼──▶ Selector ──▶ 线程(轮询就绪的Channel)
客户端3 ──┘
单线程管理多个连接,只处理就绪的Channel

NIO核心组件:

  • Channel:双向通道(可读可写)
  • Buffer:数据缓冲区
  • Selector:多路复用器,监听多个Channel

EventLoop模型

Netty的核心是EventLoop,负责处理Channel的所有IO事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌───────────────────────────────────────────────────────┐
│ EventLoopGroup │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ EventLoop 1 │ │ EventLoop 2 │ │ EventLoop 3 │ │
│ │ (线程) │ │ (线程) │ │ (线程) │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │Channel A│ │ │ │Channel C│ │ │ │Channel E│ │ │
│ │ │Channel B│ │ │ │Channel D│ │ │ │Channel F│ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└───────────────────────────────────────────────────────┘

每个Channel绑定到一个EventLoop
一个EventLoop可以处理多个Channel
所有IO操作都在EventLoop线程中执行(无锁)

Channel和ChannelPipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
                        ChannelPipeline
┌──────────────────────────────────────────────────────────┐
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Handler1 │──▶│ Handler2 │──▶│ Handler3 │ │
│ │ (Inbound) │ │ (Inbound) │ │ (Inbound) │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ ▲ │ │
│ │ 入站事件 ▼ │
│ ┌────┴────┐ ┌────┴────┐ │
│ │ Channel │ │ 业务 │ │
│ │ (网络) │ │ 处理 │ │
│ └────┬────┘ └────┬────┘ │
│ │ 出站事件 │ │
│ ▼ │ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Handler6 │◀──│ Handler5 │◀──│ Handler4 │ │
│ │ (Outbound) │ │ (Outbound) │ │ (Outbound) │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
└──────────────────────────────────────────────────────────┘

ChannelHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 入站处理器
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 处理接收的数据
ByteBuf buf = (ByteBuf) msg;
try {
// 处理...
ctx.fireChannelRead(msg); // 传递给下一个Handler
} finally {
buf.release();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

// 出站处理器
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 处理发送的数据
ctx.write(msg, promise); // 传递给下一个Handler
}
}

ByteBuf

Netty的ByteBuf比Java NIO的ByteBuffer更强大:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建ByteBuf
ByteBuf buf = Unpooled.buffer(256); // 堆内存
ByteBuf direct = Unpooled.directBuffer(); // 直接内存(零拷贝)

// 写入数据
buf.writeInt(42);
buf.writeBytes("Hello".getBytes());

// 读取数据
int num = buf.readInt();
byte[] bytes = new byte[5];
buf.readBytes(bytes);

// ByteBuf有两个指针:readerIndex和writerIndex
// ┌───────────────────────────────────────────┐
// │ discardable │ readable │ writable │
// │ bytes │ bytes │ bytes │
// └───────────────────────────────────────────┘
// 0 readerIndex writerIndex capacity

// 引用计数(避免内存泄漏)
buf.retain(); // 引用+1
buf.release(); // 引用-1,为0时释放

编解码器

常用解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. 固定长度解码器
pipeline.addLast(new FixedLengthFrameDecoder(20));

// 2. 行分隔符解码器
pipeline.addLast(new LineBasedFrameDecoder(1024));

// 3. 自定义分隔符解码器
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

// 4. 长度字段解码器(最常用)
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535, // maxFrameLength
0, // lengthFieldOffset
4, // lengthFieldLength
0, // lengthAdjustment
4 // initialBytesToStrip
));

自定义编解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 消息对象
public class MyMessage {
private int id;
private String content;
}

// 编码器
public class MyMessageEncoder extends MessageToByteEncoder<MyMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) {
out.writeInt(msg.getId());
byte[] bytes = msg.getContent().getBytes(StandardCharsets.UTF_8);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}

// 解码器
public class MyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 8) {
return; // 数据不够,等待更多
}

in.markReaderIndex();
int id = in.readInt();
int length = in.readInt();

if (in.readableBytes() < length) {
in.resetReaderIndex(); // 数据不够,重置
return;
}

byte[] bytes = new byte[length];
in.readBytes(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);

out.add(new MyMessage(id, content));
}
}

完整示例:Echo服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public void start() throws Exception {
// Boss线程组:接受连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// Worker线程组:处理IO
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

// 绑定端口并启动
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("服务器启动,端口: " + port);

// 等待服务器关闭
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new EchoServer(8080).start();
}
}

class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("收到: " + msg);
ctx.writeAndFlush("Echo: " + msg); // 回显
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接: " + ctx.channel().remoteAddress());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

Echo客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class EchoClient {
private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new EchoClientHandler());
}
});

ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new EchoClient("localhost", 8080).start();
}
}

class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush("Hello, Netty!");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("服务器响应: " + msg);
}
}

HTTP服务器示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class HttpServer {
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// HTTP编解码器
pipeline.addLast(new HttpServerCodec());
// 聚合HTTP消息
pipeline.addLast(new HttpObjectAggregator(65536));
// 业务处理器
pipeline.addLast(new HttpRequestHandler());
}
});

ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
String uri = request.uri();
String content = "Hello! You requested: " + uri;

FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
Unpooled.copiedBuffer(content, StandardCharsets.UTF_8)
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

ctx.writeAndFlush(response);
}
}

第三部分:Reactor Netty

Reactor Netty是什么

Reactor Netty是Reactor和Netty的桥梁,为Netty提供响应式API:

1
2
3
4
5
6
7
8
┌─────────────────────────────────────────────┐
│ Spring WebFlux │
├─────────────────────────────────────────────┤
│ Reactor Netty │
├──────────────────────┬──────────────────────┤
│ Reactor │ Netty │
│ (响应式编程模型) │ (网络框架) │
└──────────────────────┴──────────────────────┘

使用Reactor Netty创建HTTP服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReactorNettyServer {
public static void main(String[] args) {
HttpServer.create()
.port(8080)
.route(routes -> routes
.get("/hello", (request, response) ->
response.sendString(Mono.just("Hello World!")))
.get("/users/{id}", (request, response) -> {
String id = request.param("id");
return response.sendString(Mono.just("User: " + id));
})
.post("/users", (request, response) ->
request.receive()
.aggregate()
.asString()
.flatMap(body -> response.sendString(Mono.just("Created: " + body))))
)
.bindNow()
.onDispose()
.block();
}
}

使用Reactor Netty创建HTTP客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReactorNettyClient {
public static void main(String[] args) {
HttpClient client = HttpClient.create()
.baseUrl("http://localhost:8080");

// GET请求
String response = client.get()
.uri("/hello")
.responseContent()
.aggregate()
.asString()
.block();
System.out.println(response);

// POST请求
String postResponse = client.post()
.uri("/users")
.send(ByteBufMono.fromString(Mono.just("{\"name\": \"John\"}")))
.responseContent()
.aggregate()
.asString()
.block();
System.out.println(postResponse);
}
}

TCP服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TcpServer {
public static void main(String[] args) {
DisposableServer server = TcpServer.create()
.port(9000)
.handle((inbound, outbound) -> {
return inbound.receive()
.asString()
.flatMap(msg -> {
System.out.println("收到: " + msg);
return outbound.sendString(Mono.just("Echo: " + msg));
})
.then();
})
.bindNow();

server.onDispose().block();
}
}

WebSocket服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class WebSocketServer {
public static void main(String[] args) {
HttpServer.create()
.port(8080)
.route(routes -> routes
.ws("/ws", (wsInbound, wsOutbound) -> {
return wsOutbound.send(
wsInbound.receive()
.asString()
.map(msg -> "Echo: " + msg)
.map(wsOutbound::alloc()::buffer)
.map(buf -> {
buf.writeCharSequence(msg, StandardCharsets.UTF_8);
return buf;
})
);
})
)
.bindNow()
.onDispose()
.block();
}
}

性能优化技巧

1. 线程模型优化

1
2
3
4
5
6
7
// Netty:调整EventLoop数量
EventLoopGroup workerGroup = new NioEventLoopGroup(
Runtime.getRuntime().availableProcessors() * 2
);

// Reactor:使用合适的调度器
Schedulers.newParallel("custom", 16);

2. 内存优化

1
2
3
4
5
6
7
8
9
// 使用直接内存
ByteBuf buf = Unpooled.directBuffer();

// 池化ByteBuf(减少GC)
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf pooledBuf = allocator.buffer();

// Netty配置池化
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

3. 背压控制

1
2
3
4
5
6
7
8
// Reactor:控制请求速率
Flux.range(1, 1000)
.limitRate(100) // 每次只请求100个
.subscribe();

// Netty:使用ChannelOption控制缓冲区
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(32 * 1024, 64 * 1024));

总结

Reactor核心要点

概念 说明
Mono/Flux 0-1和0-N元素的响应式类型
Schedulers 调度器,控制执行线程
subscribeOn 从源头切换线程
publishOn 从此处切换线程
冷/热发布者 冷:每次订阅重新开始;热:实时数据流
Context 响应式链的上下文传递
Sinks 手动发射数据

Netty核心要点

概念 说明
EventLoop 事件循环,处理Channel的IO
Channel 网络连接的抽象
ChannelPipeline Handler链,处理入站/出站事件
ByteBuf 高性能字节缓冲区
编解码器 二进制与对象的转换

选择建议

  • Spring应用 → 直接使用WebFlux
  • 需要底层控制 → 使用Reactor Netty
  • 高性能网络服务 → 使用Netty
  • 响应式数据处理 → 使用Reactor