Netty 客户端启动代码基本如下所示,客户端启动时 Netty 内部为我们做了什么?
- 客户端的线程模型是如何设置的?
- 客户端的 channel 是如何创建并初始化的?
- 具体又是怎么去 connect 服务端的?
1 | EventLoopGroup group = new NioEventLoopGroup(); |
核心概念
Bootstrap
作为抽象辅助类 AbstractBootstrap 的子类,Bootstrap 主要提供给客户端使用;
基于此进行一些 group、channel、option 和 handler 的配置
NioSocketChannel
封装了 jdk 的 SocketChannel,且存在一个属性 readInterestOp 记录对什么事件感兴趣,初始化时为
SelectionKey.OP_READ
;
以及继承自父类 AbstractChannel 的三大属性 id(全局唯一标识)、unsafe(依附于 channel,负责处理操作) 和 pipeline(责任链模式处理请求);Unsafe
Unsafe 附属于 Channel,大部分情况下仅在内部使用;
ChannelPipeline
初始化 DefaultChannelPipeline 时,设置 HeadContext、TailContext 两个节点组成双向链表结构,两个节点均继承自 AbstractChannelHandlerContext;
换句话说 ChannelPipeline 就是一个双向链表 (head ⇄ handlerA ⇄ handlerB ⇄ tail),链表的每个节点是一个 AbstractChannelHandlerContext;
NioEventLoopGroup
有个属性 EventExecutor[],而下面要说的 NioEventLoop 就是 EventExecutor;
客户端只存在一个 NioEventLoopGroup;
NioEventLoop
核心线程(其实看源码的话,好像理解成一个 task 更合理),含有重要属性 executor、taskQueue、selector 等;
创建 NioEventLoopGroup
创建 NioEventLoopGroup 的时候最终调用其父类 MultithreadEventExecutorGroup 的构造方法,主要代码如下:
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, |
创建 NioEventLoopGroup 主要做了以下几件事:
- 没有 executor 则创建一个 executor
- 初始化一个 EventExecutor[],为创建 NioEventLoop 做准备
- 循环调用 newChild 方法(传入 executor )去创建 NioEventLoop 对象,置于 EventExecutor[] 数组当中
- 创建一个 chooser,用来从 EventExecutor[] 选取处一个 NioEventLoop
创建 NioEventLoop
创建 NioEventLoop 的时候会调用其父类 SingleThreadEventExecutor 的构造方法,主要代码如下:
1 | protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, |
创建 NioEventLoop 主要做了以下几件事:
- 创建任务队列,以后调用 NioEventLoop.execute(Runnable task) 的时候均是把 task 放入该任务队列
- 创建一个 selector,之后要把 channel 注册到该 selector 上
设置 Bootstrap
这一步对 Bootstrap 进行配置,无论是 group、channel 还是 option、handler,均是继承的 AbstractBootstrap 的属性;
对其的配置目前也仅仅是在 AbstractBootstrap 中配置属性,尚未做过多的事情;
设置 NIO 线程组
1 | // AbstractBootstrap.group(group) |
设置 IO 模型
1 | // AbstractBootstrap.channel(channelClass) |
设置 TCP 参数
1 | // AbstractBootstrap.option(option, value) |
设置业务处理 handler
1 | // AbstractBootstrap.handler(handler) |
发起连接请求
封装好 SocketAddress 后调用 doResolveAndConnect 方法,代码如下:
1 | // Bootstrap.doResolveAndConnect(remoteAddress, localAddress) |
初始化 channel(NioSocketChannel)
1 | // AbstractBootstrap.initAndRegister() |
注册 channel 到 selector
回到 initAndRegister() 方法中,在创建 channel 并调用 init(channel) 方法后,开始把 channel 注册到 selector 上,通过 EventLoopGroup 来注册;
NioEventLoopGroup 的 register(channel) 方法继承自父类 MultithreadEventLoopGroup
1 | // NioEventLoopGroup.register(channel) |
NioEventLoop 的 register(channel) 方法继承自父类 SingleThreadEventLoop
1 | // NioEventLoop.register(channel) |
AbstractUnsafe 的 register(eventLoop, promise) 相关方法主要代码如下:
1 | // AbstractUnsafe.register(eventLoop, promise) |
触发 handlerAdded、ChannelRegistered 事件
在将 channel 注册到 selector 上后,进行 handlerAdded 事件的传播,如果客户端启动代码配置了 .handler(ChannelInitializer)的话,ChannelInitializer 的 handlerAdded 方法会调用重写 initChannel(socketChannel) 方法,该方法又会将我们实际的客户端处理 handler 按序添加到 pipeline 中。然后再进行 ChannelRegistered 在 pipeline 中的传播。
再次声明下,pipeline 是双向链表,节点是 ChannelHandlerContext,持有 handler 属性及 inbound、outbound 标识;
head 节点既是 inbound 节点也是 outbound 节点,tail 节点只是 inbound 节点;
像 ChannelRegistered 这种 inbound 事件,会从 pipeline 的 head 节点开始处理,然后按照链表顺序查找下一个 inbound 节点,依次处理直到 tail 节点
发起连接请求
在将 channel 注册到 selector 上后,对 remoteAddress 进行解析,解析完成后开始发起连接请求;
发起请求这个操作,也是封装成 task 丢给 eventLoop 线程去执行
1 | // Bootstrap.doResolveAndConnect0(channel, remoteAddress, localAddress, promise) |
跟踪代码到 NioSocketChannel.doConnect(remoteAddress, localAddress) 方法为止,请求算是发出去了,但是连接算建立成功了么?当然没有,一个连接的建立当然还需要服务端的接收。
当服务端接收请求即建立连接后,客户端如何知晓呢?上面我们在 selector 上注册了 SelectionKey.OP_CONNECT 事件,selector 轮询到连接事件后,
最终会调用到 AbstractNioUnsafe 的 finishConnect() 方法。
1 | // NioSocketChannel.finishConnect() |
ChannelActive 事件在 pipeline 上的传播
1 | // DefaultChannelPipeline.fireChannelActive() |
客户端启动总结
创建 NioEventLoopGroup,然后创建 NioEventLoop 来填充 NioEventLoopGroup 的 EventExecutor[] 数组
创建辅助类 Bootstrap,并设置 NIO 线程组、IO 模型、TCP 参数以及业务处理 handler 等
调用 Bootstrap 的 connect 方法进行连接
反射创建一个 NioSocketChannel 对象,并进行初始化(封装 handler 添加到 pipeline 的 tail 节点前)
从 NioEventLoopGroup 选出一个 NioEventLoop,将 channel 注册到它的 selector 上,最终调用的是 jdk 的 ServerSocketChannel 的注册方法,但是 ops 为 0
注册成功后,进行 handlerAdded、ChannelRegistered 等事件的触发
然后开始真正的请求连接,最终还是调用 jdk 的 SocketChannel 的 connect 方法,然后设置 ops 为 SelectionKey.OP_CONNECT
在连接建立后,进行 ChannelActive 事件的传播,其中会修改 channel 注册到 selector 上返回的 selectionKey 的 ops 为 SelectionKey.OP_READ
- 一句话总结:初始化 channel 后注册到 selector 上,触发 handlerAdded、ChannelRegistered 事件后发送连接请求,在连接建立(服务端 accept)以后,接着触发 ChannelActive 事件,其中又会修改 selectionKey 的 ops 以便 selector 轮询到 服务端发来的消息