Zookeeper 客户端和服务端

前面两篇文章介绍了 Zookeeper 的 Watcher 机制以及 Session 机制,下面分别从客户端和服务端两个方面来看下 Zookeeper 是怎么处理的。

客户端

Zookeeper 客户端主要由以下几个核心组件组成:

  • Zookeeper 实例:客户端的入口。
  • ClientWatchManager:客户端 Watcher 管理器。
  • HostProvider:客户端地址列表管理器。
  • ClientCnxn:客户端核心线程,其内部又包含两个线程,SendThread 和 EventThread。前者是一个 I/O 线程,主要负责 Zookeeper 客户端与服务端之间的网络 I/O 通信;后者是一个事件线程,主要负责对服务端事件进行处理。

客户端整个初始化和启动过程大体分为以下三个步骤:

  1. 设置默认 Watcher
  2. 设置 Zookeeper 服务器地址列表
  3. 创建 ClientCnxn

一次会话的创建过程

初始化阶段

  1. 初始化 Zookeeper 对象。

    调用构造方法实例话一个 Zookeeper 对象,在初始化过程中,会创建一个客户端的 Watcher 管理器:ClientWatchManager(默认实现为 ZKWatchManager)。

  2. 设置会话默认 Watcher。

    如果构造方法中传入了一个 Watcher 对象,那么客户端会将这个对象作为默认 Watcher 保存在 ZKWatchManager 中。

  3. 构造 Zookeeper 服务器地址列表管理器:HostProvider。

    对于构造方法中传入的服务器地址,客户端会将其放在服务器地址列表管理器 HostProvider 中。

  4. 创建并初始化客户端网络连接器:ClientCnxn。

    客户端会首先创建一个网络连接器 ClientCnxn,用来管理客户端和服务端的网络交互。另外,客户端在创建 ClientCnxn 的同时还会初始化客户端的两个核心队列 outgoingQueue 和 pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。

    ClientCnxn 连接器的底层 I/O 处理器是 ClientCnxnSocket,因此在这一步中,客户端还会同时创建 ClientCnxnSocket 处理器。

  5. 初始化 SendThread 和 EventThread。

    客户端会创建两个核心网络线程 SendThread 和 EventThread,前者用于管理客户端和服务端之间的所有网络 I/O,后者则用于进行客户端的事件处理。同时客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。

会话创建阶段

  1. 启动 SendThread 和 EventThread。

    SendThread 首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送 “会话创建” 请求做准备。

  2. 获取一个服务器地址。

    在创建 TCP 连接前,SendThread 首先需要获取一个服务器的目标地址,这通常是从 HostProvider 中随机获取出一个地址,然后委托给 ClientCnxnSocket 去创建与服务器之间的 TCP 连接。

  3. 创建 TCP 连接。

    获取到一个服务器地址后,ClientCnxnSocket 负责和服务器创建一个 TCP 长连接。

  4. 构造 ConnectRequest 请求。

    上一步只是纯粹地从网络 TCP 层面完成了客户端和服务端之间的 Socket 连接,但远未完成 Zookeeper 客户端的会话创建。

    SendThread 会负责根据当前客户端的实际设置,构造一个 ConnectRequest 请求,该请求代表了客户端试图和服务器创建一个会话。同时,Zookeeper 客户端还会进一步将请求包装成网络 I/O 层的 Packet 对象,放入请求发送队列 outgoingQueue 中去。

  5. 发送请求。

    当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket 负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 后,向服务端进行发送。

响应处理阶段

  1. 接收服务端响应。

    ClientCnxnSocket 接收到服务端的响应后,会首先判断当前客户端状态是否是 “已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理该响应。

  2. 处理 Response。

    ClientCnxnSocket 会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并从中获取到 Zookeeper 服务端分配的会话 sessionId。

  3. 连接成功。

    连接成功后,一方面需要通知 SendThread 线程,进一步对客户端进行会话参数设置,包括 readTimeout 和 connectTimeout 等,并更新客户端状态。另一方面,需要通知地址管理器 HostProvider 当前成功连接的服务器地址。

  4. 生成事件 SyncConnected-None。

    为了能够让上层应用感知到会话的成功创建,SendThread 会生成一个 SyncConnected-None 事件,代表客户端和服务器会话创建成功,并将该事件传递给 EventThread 线程。

  5. 查询 Watcher。

    EventThread 线程收到事件后,会从 ZKWatchManager 管理器中查询出对应的 Watcher,针对 SyncConnected-None 事件,那么就直接找出步骤 2 中存储的默认 Watcher,然后将其放到 EventThread 的 waitingEvents 队列中去。

  6. 处理事件。

    EventThread 不断地从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 process 接口方法,以达到触发 Watcher 的目的。

ClientCnxn: 网络 I/O

ClientCnxn 是 Zookeeper 客户端的核心工作类,负责维护客户端和服务端的网络连接并进行一系列网络通信。

Packet

Packet 是 ClientCnxn 内部定义的一个对协议层的封装,作为 Zookeeper 中请求和响应的载体,其中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)和注册的 Watcher(watchRegistration)等信息。

针对 Packet 中这么多属性,是否都会在客户端和服务端之间进行网络传输呢?答案是否定的。Packet 的 createBB( ) 方法负责对 Packet 对象进行序列化,最终生成可用于底层网络传输的 ByteBuffer 对象。在这个过程中,只会将 requestHeader、request 和 readOnly 三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。

outgoingQueue 和 pendingQueue

ClientCnxn 中有两个比较核心的 LinkedList 队列:outgoingQueue 和 pendingQueue。

  • outgoingQueue 是一个请求发送队列,用于存储那些需要发送到服务端的 Packet 集合。

  • pendingQueue 用于存储已经从客户端发送到服务端,但是需要等待服务端响应的 Packet 集合。

ClientCnxnSocket:底层 Socket 通信层

ClientCnxnSocket 定义了底层 Socket 通信的接口。在 Zookeeper 中,其默认实现是 ClientCnxnSocketNIO。该实现类使用 Java 原生的 NIO 接口,其核心是 doIO 逻辑,主要负责对请求的发送和响应接受过程。

请求发送

在正常情况下(即 TCP 连接正常且会话有效),会从 outgoingQueue 队列中取出一个可发送的 Packet 对象,同时生成一个客户端请求序号 XID 并将其设置到 Packet 请求头中,然后将其序列化后发送。那什么样的 Packet 是 可发送 的呢?在 outgoingQueue 中的 Packet 整体上是按照先进先出的顺序被处理的,但是如果检测到客户端和服务端之间正在处理 SASL 权限的话,那么那些不含请求头(requestHeader)的 Packet(例如会话创建请求)是可以被发送的,其余的都无法被发送。

请求发送完毕后,会立即将该 Packet 保存到 pendingQueue 队列中,以便等待服务端响应返回后进行相应的处理。

响应接受

客户端获取到来自服务端的响应数据后,根据不同的客户端请求类型,会进行不同的处理。

  • 如果检测到当前客户端还尚未进行初始化,那么说明当前客户端和服务端之间正在进行会话创建,那么直接将接收到的 ByteBuffer(incomingBuffer)反序列化成 ConnectResponse 对象。
  • 如果当前客户端已经处于正常的会话周期,并且接收到的服务端响应是一个事件,那么 Zookeeper 客户端会将接收到的 ByteBuffer(incomingBuffer)反序列化成 WatcherEvent 对象,并将该事件放入待处理队列中。
  • 如果是一个常规的请求响应(指的是 Create、GetData 和 Exist 等操作请求),那么会从 pendingQueue 队列中取出一个 Packet 来进行相应的处理。Zookeeper 客户端首先会校验服务端响应中包含的 XID 值来确保请求处理的顺序性,然后将接收到的 ByteBuffer(incomingBuffer)反序列化成 Response 对象。

最后,会在 finishPacket 方法中处理 Watcher 注册等逻辑。

SendThread 和 EventThread

SendThread

SendThread 是 ClientCnxn 内部一个核心的 I/O 调度线程,用于管理客户端和服务端之间的所有网络 I/O 操作。

在客户端实际运行过程中,一方面,SendThread 维护了客户端和服务端之间的会话生命周期,其通过在一定的周期频率内向服务端发送一个 PING 包来实现心跳检测。同时,在会话周期内,如果客户端和服务端之间出现 TCP 连接断开的情况,那么就会自动且透明地完成重连操作。

另一方面,SendThread 管理了客户端所有的请求发送和响应接收操作,其将上层客户端 API 操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。同时,SendThread 还负责将来自服务端的事件传递给 EventThread 去处理。

EventThread

EventThread 是 ClientCnxn 内部另一个核心线程,负责客户端的事件处理。并触发客户端注册的 Watcher 监听。其中的 waitingEvents 队列,用于临时存放那些需要被触发的 Object,包括那些客户端注册的 Watcher 和异步接口中注册的回调器 AsyncCallback。

同时,EventThread 会不断地从 waitingEvents 中取出 Object,识别出具体类型(Watcher 或者 AsyncCallback),并分别调用 process 和 processResult 接口方法来实现对事件的触发和回调。

服务器启动

单机版服务器启动

Zookeeper 服务器的启动,大体可以分为以下五个主要步骤:配置文件解析、初始化数据管理器、初始化网络 I/O 管理器、数据恢复和对外服务。启动流程如下:

预启动

  1. 统一由 QuorumPeerMain 作为启动类。

    无论是单机模式还是集群模式,在 zkServer.cmdzkserver.sh 两个脚本中,都配置了使用 QuorumPeerMain 作为启动入口类。

  2. 解析配置文件 zoo.cfg。

    Zookeeper 首先会进行配置文件的解析,zoo.cfg 配置了 Zookeeper 运行时的基本参数,包括 tickTime、dataDir、clientPort 等参数。

  3. 创建并启动历史文件清理器 DatadirCleanupManager。

    自动清理历史数据文件的机制,包括对事务日志和快照数据文件进行定时清理。

  4. 判断当前是集群模式还是单机模式的启动。

    如果是单机模式,则委托给 ZooKeeperServerMain 进行启动处理。

  5. 再次进行配置文件的解析。

  6. 创建服务器实例 ZookeeperServer。

    ZookeeperServer 是单机版 Zookeeper 服务端最为核心的实体类。Zookeeper 首先会进行服务器实例的创建,接下去的步骤则是对该服务器实例的初始化工作,包括连接器、内存数据库和请求处理器等组件的初始化。

初始化

  1. 创建服务器统计器 ServerStats。

    ServerStats 是服务器运行时的统计器,包含了最基本的运行时信息。如packetsSent(服务端向客户端发送的响应包次数)、packetsReceived(服务端接收到来自客户端的请求包次数)等。

  2. 创建 Zookeeper 数据管理器 FileTxnSnapLog。

    FileTxnSnapLog 是 Zookeeper 上层服务器和底层数据存储之间的对接层,提供了一系列操作数据文件的接口,包括事务日志文件和快照数据文件。Zookeeper 根据 zoo.cfg 文件中解析出来的快照数据目录 dataDir 和 事务日志目录 dataLogDir 来创建 FileTxnSnapLog。

  3. 设置服务器 tickTime 和会话超时时间限制。

  4. 创建 ServerCnxnFactory。

    可以通过配置系统属性 zookeeper.serverCnxnFactory 来指定使用 Zookeeper 自己实现的 NIO 还是使用 Netty 框架来作为 Zookeeper 服务端网络连接工厂。

  5. 初始化 ServerCnxnFactory。

    Zookeeper 首先会初始化一个 Thread,作为整个 ServerCnxnFactory 的主线程,然后再初始化 NIO 服务器。

  6. 启动 ServerCnxnFactory 主线程。

    步骤 5 中已经初始化的主线程 ServerCnxnFactory 的主逻辑(run 方法)。需要注意的是,虽然这里 Zookeeper 的 NIO 服务器已经对外开放端口,客户端能够访问到 Zookeeper 的客户端服务端口 2181,但是此时 Zookeeper 服务器是无法正常处理客户端请求的。

  7. 恢复本地数据。

    每次在 Zookeeper 启动的时候,都需要从本地快照数据文件和事务日志文件中进行数据恢复。

  8. 创建并启动会话管理器 SessionTracker。

    在 Zookeeper 启动阶段,会创建一个会话管理器 SessionTracker。SessionTracker 初始化完毕后,Zookeeper 就会立即开始会话管理器的会话超时检查。

  9. 初始化 Zookeeper 的请求处理链。

    Zookeeper 的请求处理方式是典型的责任链模式的实现,在 Zookeeper 服务器上,会有多个请求处理器依次来处理一个客户端请求。在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessor、SyncRequestProcessor 和 FinalRequestProcessor 三个请求处理器。

  10. 注册 JMX 服务。

    Zookeeper 会将服务器运行时的一些信息以 JMX 的方式暴露给外部。

  11. 注册 Zookeeper 服务器实例。

    在步骤 6 中,Zookeeper 已经将 ServerCnxnFactory 主线程启动,但是同时我们提到此时 Zookeeper 依旧无法处理客户端请求,原因就是此时网络层尚不能够访问 Zookeeper 服务器实例。在经过后续步骤的初始化后,Zookeeper 服务器实例已经初始化完毕,只需要注册给 ServerCnxnFactory 即可,之后,Zookeeper 就可以对外提供正常的服务了。

集群版服务器启动

预启动

  1. 统一由 QuorumPeerMain 作为启动类。

  2. 解析配置文件 zoo.cfg。

  3. 创建并启动历史文件清理器 DatadirCleanupManager。

  4. 判断当前是集群模式还是单机模式的启动。

    在集群模式中,由于已经在 zoo.cfg 中配置了多个服务器地址,因此此处选择集群模式启动。

初始化

  1. 创建 ServerCnxnFactory。

  2. 初始化 ServerCnxnFactory。

  3. 创建 Zookeeper 数据管理器 FileTxnSnapLog。

  4. 创建 QuorumPeer 实例。

    QuorumPeer 是集群模式下特有的对象,是 Zookeeper 服务器实例(ZookeeperServer)的托管者,从集群层面看,QuorumPeer 代表了 Zookeeper 集群中的一台机器。在运行期间,QuorumPeer 会不断检测当前服务器实例的运行状态,同时根据情况发起 Leader 选举。

  5. 创建内存数据库 ZKDatabase。

    ZKDatabase 是 Zookeeper 的内存数据库,负责管理 Zookeeper 的所有会话记录以及 DataTree 和 事务日志的存储。

  6. 初始化 QuorumPeer。

    将一些核心的组件注册到 QuorumPeer 中去,包括 FileTxnSnapLog、ServerCnxnFactory 和 ZKDatabase。同时 Zookeeper 还会对 QuorumPeer 配置一些参数,包括服务器地址列表、Leader 选举算法和会话超时时间限制等。

  7. 恢复本地数据。

  8. 启动 ServerCnxnFactory。

Leader 选举

  1. 初始化 Leader 选举。

    Zookeeper 首先会根据自身的 SID(服务器 ID)、lastLoggerdZxid(最新的 ZXID)和当前的服务器epoch(currentEpoch)来生成一个初始化的投票——简单地讲,在初始化过程中,每个服务器都会给自己投票

    在初始化阶段,Zookeeper 会首先创建 Leader 选举所需的网络 I/O 层 QuorumCnxManager,同时启动对 Leader 选举端口的监听,等待集群中其他服务器创建连接。

  2. 注册 JMX 服务。

  3. 检测当前服务器状态。

    上文说到 QuorumPeer 是 Zookeeper 服务器实例的托管者,在运行期间,QuorumPeer 的核心工作就是不断检查当前服务器的状态,并作出相应的处理。在正常情况下,Zookeeper 服务器的状态在 LOOKING、LEADING 和 FOLLOWING/OBSERVING 之间进行切换。而在启动阶段,QuorumPeer 的初始化状态是 LOOKING,因此开始进行 Leader 选举。

  4. Leader 选举。

    简单的讲,Leader 选举过程就是一个集群中所有机器相互之间进行一系列投票,选举产生最合适的机器称为 Leader,同时其他机器称为 Follower 或是 Observer 的集群机器角色初始化过程。

    关于 Leader 选举算法,简而言之,就是集群中哪个机器处理的数据越新(通常我们根据每个服务器处理过的最大 ZXID 来比较确定其数据是否更新),其越有可能成为 Leader。当然,如果集群中所有机器处理的 ZXID 一致的话,那么 SID 最大的服务器成为 Leader

Leader 和 Follower 启动期交互过程

  1. 创建 Leader 服务器和 Follower 服务器。

    完成 Leader 选举后,每个服务器都会根据自己的服务器角色创建相应的服务器实例,并开始进行各自角色的主流程。

  2. Leader 服务器启动 Follower 接收器 LearnerCnxAcceptor。

    在集群运行期间,Leader 服务器需要和所有其余的服务器(以下部分,我们使用 “Learner” 来指代这类机器)保持连接以确定集群的机器存活情况。LearnerCnxAcceptor 接收器用于负责接收所有非 Leader 服务器的连接请求。

  3. Learner 服务器开始和 Leader 建立连接。

    所有的 Learner 服务器在启动完毕后,会从 Leader 选举的投票结果中找到当前集群中的 Leader 服务器,然后与其建立连接。

  4. Leader 服务器创建 LearnerHandler。

    Leader 接收到来自其他机器的连接创建请求后,会创建一个 LearnerHandler 实例。每个 LearnerHandler 实例对应一个 Leader 与 Learner 服务器之间的连接,其负责 Leader 和 Learner 服务器之间几乎所有的消息通信和数据同步。

  5. 向 Leader 注册。

    当和 Leader 建立起连接后,Learner 就会开始向 Leader 进行注册——所谓的注册,其实就是将 Learner 服务器自己的基本信息发送给 Leader 服务器,我们称之为 LearnerInfo,包括当前服务器的 SID 和服务器处理的最新的 ZXID。

  6. Leader 解析 Learner 信息,计算新的 epoch。

    Leader 服务器在接收到 Learner 的基本信息后,会解析出该 Learner 的 SID 和 ZXID,然后根据该 Learner 的 ZXID 解析出其对应的 epoch_of_learner,和当前 Leader 服务器的 epoch_of_leader 进行比较,如果该 learner 的 epoch 更大的话,那么更新 Leader 的 epoch: epoch_of_leader = epoch_of_learner + 1 ,然后,LearnerHandler 会进行等待,直到过半的 Learner 已经向 Leader 进行了注册,同时更新了 epoch_of_leader 之后,Leader 就可以确定当前集群的 epoch 了。

  7. 发送 Leader 状态。

    计算出新的 epoch 之后,Leader 会将该信息以一个 LEADERINFO 消息的形式发送给 Learner,同时等待该 Learner 的响应。

  8. Learner 发送 ACK 消息。

    Follower 在接收到来自 Leader 的 LEADERINFO 消息后,会解析出 epoch 和 ZXID,然后向 Leader 反馈一个 ACKEPOCH 响应。

  9. 数据同步。

    Leader 服务器接收到 Learner 的这个 ACK 消息后,就可以与其进行数据同步了。

  10. 启动 Leader 和 Learner 服务器。

    当有过半的 Learner 已经完成了数据同步,那么 Leader 和 Learner 服务器实例就可以开始启动了。

Leader 和 Follower 启动

  1. 创建并启动会话管理器。

  2. 初始化 Zookeeper 的请求处理链。

    和单机版服务器一样,集群模式下,每个服务器都会在启动阶段串联请求处理链,只是根据服务器角色不同,会有不同的请求处理链路。

  3. 注册 JMX 服务。