前面通过源码分析了服务端启动时做了以下这些事:
- 初始化 channel 注册到 eventLoop 的 selector 上
- 触发 handlerAdded、ChannelRegistered 事件
- 进行端口绑定
- 触发 ChannelActive 事件,修改 selectionKey 的 interestOps 为 OP_ACCEPT
客户端启动时做了以下这些事:
- 初始化 channel 注册到 eventLoop 的 selector 上
- 触发 handlerAdded、ChannelRegistered 事件
- 发送连接请求,修改 selectionKey 的 interestOps 为 OP_CONNECT
- 连接建立后,触发 ChannelActive 事件,修改 selectionKey 的 interestOps 为 OP_READ
但是并没有去研究 Netty 的核心 Reactor 线程模型是什么样的,以及服务端如何接收一个客户端新连接的,现在我们通过跟踪源码来了解一下。
Reactor 线程启动及执行
Netty 中的 Reactor 模型可以自由配置成 单线程模型
、多线程模型
和 主从多线程模型
,这里不做过多解释;
服务端通常启用两个线程组 NioEventLoopGroup,一个用来接收客户端的连接请求,也就是本文的重点,我们称之为 bossGroup;另一个用来处理连接建立后的IO读写操作,我们称之为 workerGroup;其中每个 group 又包括一组 NioEventLoop;
一开始 NioEventLoop 线程并没有启动,直到第一次往 NioEventLoop 上注册 channel 的时候,会把注册任务封装成一个 task 丢到 NioEventLoop 的 taskQueue 中等待 NioEventLoop 线程启动后执行,然后将 NioEventLoop 中的 run() 方法封装成 task 交给 execute 去执行,execute 会单独起个线程去执行 NioEventLoop 的 run() 方法,这个核心线程才是我们前面说的 NioEventLoop 线程。
NioEventLoop 的 execute() 方法继承自 SingleThreadEventExecutor:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66// SingleThreadEventExecutor.execute(task)
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 这里就是看当前线程是不是 NioEventLoop 线程,第一次的时候肯定返回 false
boolean inEventLoop = inEventLoop();
// 添加 task 到 taskQueue 中去
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
// SingleThreadEventExecutor.startThread()
private void startThread() {
// 当前未启动才启动
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
// CAS 修改状态成功才启动
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
// SingleThreadEventExecutor.doStartThread()
private void doStartThread() {
assert thread == null;
// 这里的 executor 一般是 ThreadPerTaskExecutor,其 execute 方法就是简单的 newThread(task).start();
executor.execute(new Runnable() {
public void run() {
// 修改这个属性 thread 为当前线程,也就是 NioEventLoop 线程
// 上面 inEventLoop() 方法就是看执行 `inEventLoop()` 代码的线程是不是 NioEventLoop 线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 核心方法,在 NioEventLoop 有实现
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
上面代码已经启动了 NioEventLoop 线程,并开始执行 NioEventLoop 的 run() 方法:
1 |
|
所以 NioEventLoop 线程主要做了以下几件事:
在 selector 上轮询事件
处理轮询到的事件
在超时时间内执行 taskQueue 里的 task
select 操作
1 | // oldWakenUp 表示是否唤醒正在阻塞的 select 操作 |
select 操作可以总结如下:
检查是否有马上就要执行(delay<=0.5ms)的定时任务,有的话跳出循环;跳出前检查是否是第一次 select,是的话进行一次 selectNow() ,并置 selectCnt 为 1;
如果任务队列中有任务,且 wakenUp.compareAndSet(false, true) 成功的话,就 selectNow() 一次,并置 selectCnt 为 1,然后跳出循环;
调用 selector.select(timeoutMillis) 进行阻塞式 select;在此期间,外部线程添加任务可以唤醒阻塞;
如果存在以下情况,则直接跳出循环
轮询到了某些事件
被用户线程唤醒
任务队列里存在待处理的任务
存在已经准备好的待执行的定时任务
- 判断 select 操作是否至少持续了 timeoutMillis,持续了 timeoutMillis 的话则视为有效轮询,否则视为空轮询;当空轮询次数超过阈值(512)后,进行 selector 重建,避免 jdk 空轮询导致 cpu 100% 的 bug;
重建 selector
1 | // NioEventLoop.rebuildSelector() |
总结 Netty 解决 JDK 空轮询 BUG 的方式如下:
新建一个 selector
取消 channel 在旧 selector 上的注册
将 channel 注册到新的 selector 上
维护 channel 和新 selectionKey 的关系
处理轮询到的 SelectedKeys
1 | // NioEventLoop.processSelectedKeysOptimized() |
执行任务 runAllTasks
在进行 select 时间并处理后,需要进行 taskQueue 里的任务处理,NioEventLoop 的 runAllTasks(timeoutNanos) 继承自 SingleThreadEventExecutor
1 | // SingleThreadEventExecutor.runAllTasks(timeoutNanos) |
服务端接收新连接
前面说到 NioEventLoop 线程轮询到事件后调用 NioEventLoop.processSelectedKey(k, ch) 进行处理,其中就包括接收客户端新连接的处理。
轮询到 OP_ACCEPT 事件
NioEventLoop.processSelectedKey(k, ch) 中轮询到 OP_ACCEPT 事件的相关代码如下:
1 | int readyOps = k.readyOps(); |
这里服务端轮询到 OP_ACCEPT 事件后,交给 unsafe 来处理。前面介绍过,unsafe 依附于 channel,channel 的许多操作最终均交付于 unsafe 来处理。
将客户端新连接注册到 workerGroup
NioMessageUnsafe 的 read() 方法相关代码如下:
1 | private final List<Object> readBuf = new ArrayList<Object>(); |
在 accept 到客户端连接,并封装成 NioSocketChannel 对象后,首先需要在 pipeline 上从 head 节点开始传播 ChannelRead 事件
1 | // DefaultChannelPipeline.fireChannelRead(msg) |
然后 ChannelRead 事件从 head 节点经过业务 handler 会传递到 ServerBootstrapAcceptor 上。对服务端启动过程还有印象的话,ServerBootstrapAcceptor 添加时机相关流程如下:
初始化 NioServerSocketChannel 时在 pipeline 中添加 ChannelInitializer
在将 NioServerSocketChannel 注册到 selector 上后,触发 handlerAdded 事件
ChannelInitializer 的 handlerAdded() 方法调用重写的 initChannel(ch) 方法
initChannel(ch) 方法会在 pipeline 上添加自定义的业务 handler,然后给 eventLoop 添加一个异步任务,任务内容为将 ServerBootstrapAcceptor 添加到 pipeline 中
1 | // ServerBootstrapAcceptor.channelRead(ctx, msg) |
NioSocketChannel 对象在 childGroup 上的注册过程和之前的注册分析类似,最终调用到 AbstractUnsafe 的 register0(promise) 方法
1 | // AbstractUnsafe.register0(promise) |
注册 read 事件
上面我们看到个条件 isActive(),它对于服务端和客户端的含义是不一样的:
服务端 NioServerSocketChannel 注册到 bossGroup 的 selector 的时候,它代表是否绑定好了端口
客户端 NioSocketChannel 注册到 group 的 selector 的时候,它代表是否已经建立了连接(此时尚未与服务端建立连接)
服务端 accept 客户端连接后,将 NioSocketChannel 注册到 workerGroup 的 selector 的时候,它代表是否已经建立了连接(此时已经建立了连接)
又由于 doRegister() 在将 SocketChannel 注册到 selector 上的时候 ops 为 0,所以我们需要在 read 之前注册一下 read 事件;
如果是第一次注册的话,在 pipeline 上传播 ChannelActive 事件时,head 节点会通过 readIfIsAutoRead() 修改 ops 注册读事件;
否则的话需要调用 beginRead() 方法去修改 ops 注册 read 事件;
1 | // AbstractNioChannel.beginRead() |
总结
Reactor 线程启动执行总结
在第一次调用 eventLoop.execute(task) 方法的时候,将 NioEventLoop 的 run() 方法封装成一个 task,然后交给 executor 去执行,ThreadPerTaskExecutor 只是简单的 new Thread(task).start() 去启动一个线程,即我们全篇所说的 NioEventLoop 线程。
NioEventLoop 的 run() 方法主要做了以下几件事:
- 轮询 selector 上事件,通过重建 selector 规避了 jdk 的空轮询导致的 cpu 100% 的 bug
- 处理轮询到的事件,包括 connect、accept、read、write 等事件
- 执行任务队列的任务,会根据上面处理事件的 ioTime 和设置的 ioRatio 计算一个超时时间
服务端接收新连接过程总结
- 轮询到 OP_ACCEPT 事件
- 根据 accept 到的 SocketChannel 构造一个 NioSocketChannel
- 将 NioSocketChannel 注册到 workerGroup 中一个 NioEventLoop 的 selector 上
- 进行 handlerAdded、ChannelRegistered 等一些事件的传播
- 在 read 之前需要通过修改 ops 去注册 read 事件