Zookeeper Watcher 机制

Watcher(事件监听器),是Zookeeper中的一个很重要的特性。Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性。

客户端注册 Watcher

以 getData() 为例:

封装 Watcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);

// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// 封装 Watcher 成 WatchRegistration,此处是 DataWatchRegistration
wcb = new DataWatchRegistration(watcher, clientPath);
}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
// 仅仅标记是否需要注册 watcher
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// 请求
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
// 复制节点状态到传进入的 stat 中
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 封装 WatchRegistration 成 Packet
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}

queuePacket() 方法会将 Packet 添加到发送队列中,随后 Zookeeper 客户端就会发送这个请求并等待返回。

由客户端 SendThread 线程的 readResponse 方法接收响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
// 取出 Watcher 并注册到 ZKWatchManager 的 dataWatches
p.watchRegistration.register(p.replyHeader.getErr());
}

if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}

public void register(int rc) {
if (shouldAddWatch(rc)) {
// return watchManager.dataWatches;
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}

ZKWatchManager 的 dataWatches 是一个 Map<String, Set> 类型的数据结构,用于将数据节点和 Watcher 对象一一映射后管理起来。

传输对象

有一个问题:如果每次请求都带着 Watcher 对象传输,那么服务端肯定会出现内存紧张或者其他性能问题。Zookeeper 怎么做的呢?

上面提到把 WatchRegistration 封装到 Packet 对象中去,但是底层实际的网络传输序列化过程中,并没有将 WatchRegistration 对象完全的序列化到
底层字节数组中。为了证实这点,可以看下 Packet 内部的序列化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}

可以看到只会将 requestHeader 和 readOnly/request 两个属性进行序列化,而 WatchRegistration 并没有序列化到底层字节数组中。

总结

  1. 标记 request,封装 Watcher 成 WatchRegistration 对象。
  2. 封装 Packet 对象,Packet 可以看成最小的通信协议单元,任何需要传输的对象都需要封装成 Packet。
  3. 发送 request,但是并没有传输 Watcher。
  4. 接收响应,从 Packet 中取出 Watcher 并注册到 ZKWatchManager 的 Map<String, Set> 中。

服务端处理 Watcher

上面说到客户端并没有将 Watcher 传递到服务端,那么服务端怎么进行处理的呢?

ServerCnxn 存储

服务端接收到客户端的请求后,会在 FinalRequestProcessor 的 processRequest 方法中进行是否需要注册 Watcher 的判断,代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// 获取数据,根据 getDataRequest.getWatch() 来判断是否需要注册 Watcher
// 需要的话传入 ServerCnxn 对象
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}

ServerCnxn 是客户端和服务端之间的连接接口,代表客户端和服务端之间的连接。

ServerCnxn 实现了 Watcher 接口,因此可以看成是一个 Watcher 对象。

上面 ZKDatabase.getData() 会调用 DataTree.getData() 方法,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 对应数据变更
*/
private final WatchManager dataWatches = new WatchManager();
/**
* 对应子节点变更
*/
private final WatchManager childWatches = new WatchManager();

public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
// 根据路径获取节点
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// 保存 path 和 watcher 到 WatchManager
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}

WatchManager 是服务端 Watcher 的管理者,内部用 watchTable 和 watch2Paths
从两个维度来管理 Watcher,其相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();

private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();

public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);

HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}

同时 WatchManager 还负责 Watcher 事件的触发,并移除已经被触发的 Watcher,可见 Zookeeper 的事件监听是一次性的。

这里的 WatchManager 是一个统称,在服务端,DataTree 中会托管两个 WatchManager:dataWatches 和 childWatches,分别对应数据变更 Watcher 和子节点变更 Watcher。这里因为是 getData() 方法,所以会保存到 dataWatches 中。

Watcher 触发

NodeDataChanged 事件的触发条件是 “Watcher 监听的对应数据节点的数据内容发生变更”,也就是 DataTree#setData() 方法,代码如下:

1
2
3
4
5
6
7
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
// ...
// 触发相关事件
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}

在对指定的数据节点更新后,通过调用 WatchManager 的 triggerWatch 方法来触发相关的事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// 从 watchTable 中移除
watchers = watchTable.remove(path);
// 如果不存在 watcher,则直接返回
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
// 从 watch2Paths 中移除
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 触发 Watcher
w.process(e);
}
return watchers;
}

这里的 w 是之前存储的 ServerCnxn,其 process 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}

// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();

sendResponse(h, e, "notification");
}
  • 在请求头中标记 ”-1“,表明当前是一个通知。
  • 将 WatchedEvent 包装成 WatcherEvent,以便进行网络传输序列化。
  • 向客户端发送通知。

可见 process 本质上并不是处理客户端 Watcher 真正的业务逻辑,而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都是在客户端。

总结

无论是 dataWatchers 还是 clildWatchers,事件触发逻辑都是一样的,基本步骤如下:

  1. 封装 WatchedEvent

    首先将通知状态(KeeperState)、事件类型(EventType)以及节点路径(Path)封装成一个WatchedEvent 对象。

  2. 查询 Watcher

    根据节点路径从 watchTable 中取出 Watcher,并从 watchTable 和 watch2Paths 中移除该 Watcher,说明 Watcher 在服务端是一次性的,触发一次就失效了。

  3. 调用 process 方法来触发 Watcher

    process 本质上并不是处理客户端 Watcher 真正的业务逻辑,而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都是在客户端。

客户端回调 Watcher

SendThread 接收事件通知

对于一个来自服务端的响应,客户端都是由 SendThread.readResponse() 方法来统一进行处理的。如果响应头 replyHdr 中标识了 XID 为 -1,表明这是一个通知类型的响应。代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
// 反序列化
event.deserialize(bbia, "response");

// convert from a server path to a client path
// chrootPath 处理
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}

// 还原 WatchedEvent
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}

// 将 WatchedEvent 交给 EventThread 线程
eventThread.queueEvent( we );
return;
}

处理逻辑大致如下:

  1. 反序列化

    客户端接收到响应后,首先会将字节流转换成 WatcherEvent 对象。

  2. 处理 chrootPath

    如果客户端设置了 chrootPath 属性,那么对于服务端传过来的节点路径进行 chrootPath 处理,生成客户端的一个相对节点路径。例如 chrootPath 为 /app1,那么针对服务端传递的 /app1/locks,经过 chrootPath 处理,就会变成一个相对路径 :/locks。

  3. 还原 WatchedEvent

    将 WatcherEvent 对象还原成 WatchedEvent 对象。

  4. 回调 Watcher。

    最后将 WatchedEvent 对象交给一个 EventThread 线程,在下一个轮询周期中进行 Watcher 回调。

EventThread 处理事件通知

EventThread 线程是 Zookeeper 客户端中专门用来处理服务端通知事件的核心,上面说到 SendThread 接收到服务端的通知事件后,会通过 EventThread.queueEvent() 方法将事件传递给 EventThread 线程,其逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();

// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}

首先会根据该通知事件,从 ZKWatcherManager 中取出所有相关的 Watcher:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();

switch (type) {
// ...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
// ...
return result;
}
}

final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}

客户端根据 EventType 会从相应的 Watcher 存储(即 dataWatchers、existWatchers 或 childWatchers 中的一个或多个,本例中就是从 dataWatchers 和 existWatchers 两个存储中获取)中去除对应的 Watcher。同样表明 Watcher 是一次性的。

获取到相关的 Watcher 后,会将其放入到 waitingEvents 这个队列中去。waitingEvents 是一个待处理 Watcher 的队列,EventThread 的 run 方法会不断对该队列进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
// ...
}

private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
}
// ...
}

可以看到 EventThread 线程每次都会从 waitingEvents 队列中取出一个 Watcher,并进行串行同步处理。processEvent 方法中的这个的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。

事件监听流程总结

  1. 客户端封装 Watcher,封装传输对象 Packet 后发送请求。
  2. 客户端 SendThread 线程接受响应,由 ZKWatchManager 的 dataWatches 进行 Watcher 管理。
  3. 服务端对应的 Watcher 对象是 ServerCnxn,它代表客户端和服务端之间的连接。
  4. WatchManager 是服务端 Watcher 的管理者,内部用 watchTable 和 watch2Paths
    从两个维度来管理 Watcher。
  5. Watcher 触发时,服务端并不真正执行监听逻辑。而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都是在客户端。
  6. 客户端 SendThread 接收事件通知,在一些处理后将 WatchedEvent 对象交给一个 EventThread 线程。
  7. EventThread 线程是 Zookeeper 客户端中专门用来处理服务端通知事件的核心,首先根据该通知事件,从 ZKWatcherManager 中取出所有相关的 Watcher。
  8. 客户端根据 EventType 会从相应的 Watcher 存储(即 dataWatchers、existWatchers 或 childWatchers 中的一个或多个)中去除对应的 Watcher。
  9. 获取到相关的 Watcher 后,会将其放入到 waitingEvents 这个队列中去。waitingEvents 是一个待处理 Watcher 的队列。
  10. EventThread 线程每次都会从 waitingEvents 队列中取出一个 Watcher,并进行串行同步处理。processEvent 方法中的这个的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。

Watcher 特性总结

  • 一次性

    无论客户端还是服务端,一旦一个 Watcher 被触发,Zookeeper 都会从相应的存储中移除该 Watcher。这样的设计有效的减轻了服务端的压力。

  • 客户端串行执行

    客户端 Watcher 回调是一个串行同步的过程,这为我们保证了顺序。

  • 轻量

    WatchedEvent 是 Zookeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构只包含三个部分:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如 NodeDataChanged 事件只会通知客户端节点数据发生了变更,而对于原始数据和变更后的数据都无法从通知中获取,而是需要客户端主动重新去获取数据。

    另外客户端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中用 boolean 类型属性标记,同时客户端也仅仅保存了当前连接的 ServerCnxn 对象。

    如此轻量的 Watcher 机制设计,在网络开销和服务端内存开销上都是非常廉价的。