zookeeper客户端Watcher管理
在zookeeper的设计中,有分布式通知的功能点,方式则是通过Watcher机制。基本的模式和回调一致,但是其中有些设计巧妙的地方。回调的方式,大部分流程都是如下:
- 客户端向服务端注册一个Watcher监听
- 当服务端的一些执行事件发生后,触发这个Watcher执行。
整个过程包含了以下几个部分:
- 客户端线程
- 客户端对Watcher的管理,即图中的WatchManager
- zookeeper服务端
- zookeeper服务端对监听的客户端的管理
在具体的工作流程中:
- 客户端向zk服务器注册Watcher,同时将Watcher对象存储在客户端的Watcher管理器中
- 服务器收到客户端注册请求,将这个信息记录(A客户端需要知道某事件的发生)
- 当服务器上对应事件发生,获取到监听该事件的客户端集合,依次通知他们
- 客户端收到服务端通知后,做相应的处理
Watcher接口
在zk中,接口类Watcher是一个标准的事件处理器接口,定义了事件通知相关的逻辑。
public interface Watcher { abstract public void process(WatchedEvent event);}
process方法是Watcher接口中的一个回调方法,其中WatchEvent对象则表示了对事件的封装,包含了通知状态、事件类型和节点信息
public class WatchedEvent { final private KeeperState keeperState; final private EventType eventType; private String path;}
第一部分:客户端向服务端注册Watcher过程
以getData方法为例,基本的流程如下:
zk初始化过程中zkWatchManager对象
在初始化ZooKeeper中,传递的Watcher会作为整个ZooKeeper会话期间的默认Watcher,会一直保存在ZKWatchManager的defaultWatcher中。同时,zk客户端也可以通过getData、getChildren、exist接口来向zk服务器来注册新的Watcher,同时ZKWatchManager则有三个集合,用来保存已经注册成功的Watcher,都是Set<String,Map>的结构,key是节点路径
封装WatchRegistration和构造Request对象
当客户端通过getData、getChildren、exist接口来注册新的Watcher的时候,会首先执行封装WatchRegistration和构造Request请求,以getData为例:
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } return response.getData(); }
- WatchRegistration相关于Watcher的注册信息,基本信息是需要监听的节点clientPath,执行的watcher对象
- 封装Watcher到Request对象中,在发送的时候通知服务端,注意:Request对象中只记录了是否需要注册watcher信息,具体是哪个Watcher并不需要,这也是zk设计巧妙的地方
public class GetDataRequest implements Record { private String path; private boolean watch; public GetDataRequest() { } }
交给ClientCnxn去构造Packet对象
Packet对象是zk Client和Server之间通信的最小单元,用于进行客户端和服务端之间的网络传输,任何需要传输的对象都需要被包装成Packet对象。
Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration, boolean readOnly) { this.requestHeader = requestHeader; this.replyHeader = replyHeader; this.request = request; this.response = response; this.readOnly = readOnly; this.watchRegistration = watchRegistration; }
将构造好的Packet对象放入到Packet对象中,这些Watcher就可以随着请求发送到服务端,让后返回给客户端进行回调。但是,如果采用上面的做法,是有问题的,试想,如果所有的Watcher对象都被传递给Server,那么Server肯定会内存爆炸。zk在设计的时候,虽然将WatchRegistration封装到Packet对象中,但是并没有传递给Server,具体的做法在SendThread发送的时候体现。
在SendThread中会阻塞在doTransport中,在上面将packet加入到outgoingQueue时,也调用了select.wakeup来唤醒阻塞线程,在doTransport中调用ClientCnxn的doIO方法,以原生的NIO示例,即ClinetCnxnSocketNIO实现为例:
void doIO(List
pendingQueue, LinkedList outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { //.... if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } //... } 可以看到,上面的逻辑主要是找到Packe对象,然后构造发送的ByteBuffer,即Packet的bb对象,重点则在构造的过程creatBB
public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); // append "am-I-allowed-to-be-readonly" flag boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray()); this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); } }
上面的代码中,bb并没有将Packet的所有属性序列化,仅仅序列化了requestHeader和request,在上面构造Request对象过程中也说明了,reqeust并不持有Watcher对象,仅仅有一个标志位
private boolean watch;
向Server表示是否需要watch。分析到此,就会发现,在通信过程中,并没有把WatchRegistration对象序列化,不会增大网络传输消耗。那么WatchRegistration有什么用?试想下,如果服务端注册成功Watcher,回复客户端后,客户端怎么知道是哪一个Watcher对象监听了这个事件?
这个地方还需要关注下,在处理完成后Packet对象从outgoingQueue队列中去掉,加入了pendingQueue中
客户端处理Response
通过SendThread的doIO方法,从SocketChannel中读取到服务端的响应,读取到incomingBuffer中,交给readResponse方法去处理,并且从finishPacket中将Watcher注册到zkWatcherManager中
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p); } } public void register(int rc) { if (shouldAddWatch(rc)) { Map
> watches = getWatches(rc); synchronized(watches) { Set watchers = watches.get(clientPath); if (watchers == null) { watchers = new HashSet (); watches.put(clientPath, watchers); } watchers.add(watcher); } } }
第二部分:事件通知,Watcher的触发
针对第一部分,客户端已经建立了对应的watcher,而Server的处理本篇不介绍了(写zk启动的时候分析过),接下来看一下如何触发watcher,以服务端收到setData()请求为例,当setData请求发生时,最终会调用WatchManager的triggerWatch方法,然后调用process方法触发Watcher,这部分不做具体说明,在服务端watcher管理中会写(还没整理完),关键的代码是,发送时设置了WatcherEvent的ReplyHeader标记为-1,表示这是一个通知
@Overridesynchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification");}
重点关注的是Client的处理,对于服务器的报文,依然是由SendThread来处理,整体逻辑如下:
前面的几步相对简单,代码如下:
void readResponse(ByteBuffer incomingBuffer) throws IOException { if (replyHdr.getXid() == -1) { // -1 means notification WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); //... WatchedEvent we = new WatchedEvent(event); //... eventThread.queueEvent( we ); return; }
EventThread是zk客户端专门用来处理服务端通知事件的线程
public void queueEvent(WatchedEvent event) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
在入队列的时候,由watchManager依据事件的类型,状态和节点信息获取到监听该事件的Set
@Override public Setmaterialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath){ //... case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; //... }
然后包装成了WatchSetEventPair对象,即WatchedEvent和Set的关系元组,直接remove掉了,这个就是后面说的一次性的体现
EventThread线程从队列中取出事件后,交给对应的Watcher去回调
private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } }
至此,服务器事件变更通知结束。
注意:
在整个Watcher的注册过程中,zk在设计的时候,都是一次性的:一旦Watcher被触发,zk都会将其从对应的储存中移除。这就要开发人员再使用Watcher的时候反复注册。这样做的好处是有效减轻服务器压力。如果Watcher一直有效,那么每次服务端都会想大量的客户端发送事件通知,这个对性能消耗是比较严重的。
同时,watcher的设计也保持了轻量的特性,在与Server通信过程中,整个通知结构只包含三部分:通知状态、事件类型和节点信息。
参考:
从PAXOS到ZOOKEEPER分布式一致性原理与实践