博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
zookeeper客户端Watcher管理
阅读量:4315 次
发布时间:2019-06-06

本文共 10458 字,大约阅读时间需要 34 分钟。

zookeeper客户端Watcher管理

在zookeeper的设计中,有分布式通知的功能点,方式则是通过Watcher机制。基本的模式和回调一致,但是其中有些设计巧妙的地方。回调的方式,大部分流程都是如下:

  • 客户端向服务端注册一个Watcher监听
  • 当服务端的一些执行事件发生后,触发这个Watcher执行。

OL4hj7R.jpg

整个过程包含了以下几个部分:

  • 客户端线程
  • 客户端对Watcher的管理,即图中的WatchManager
  • zookeeper服务端
  • zookeeper服务端对监听的客户端的管理

在具体的工作流程中:

  1. 客户端向zk服务器注册Watcher,同时将Watcher对象存储在客户端的Watcher管理器中
  2. 服务器收到客户端注册请求,将这个信息记录(A客户端需要知道某事件的发生)
  3. 当服务器上对应事件发生,获取到监听该事件的客户端集合,依次通知他们
  4. 客户端收到服务端通知后,做相应的处理

Watcher接口

在zk中,接口类Watcher是一个标准的事件处理器接口,定义了事件通知相关的逻辑。

public interface Watcher {      abstract public void process(WatchedEvent event);}

process方法是Watcher接口中的一个回调方法,其中WatchEvent对象则表示了对事件的封装,包含了通知状态、事件类型和节点信息

public class WatchedEvent {    final private KeeperState keeperState;    final private EventType eventType;    private String path;}

riT1323.jpg

第一部分:客户端向服务端注册Watcher过程

以getData方法为例,基本的流程如下:

ywdcJ3l.jpg

  • zk初始化过程中zkWatchManager对象

    s4mTXnM.jpg

    在初始化ZooKeeper中,传递的Watcher会作为整个ZooKeeper会话期间的默认Watcher,会一直保存在ZKWatchManager的defaultWatcher中。同时,zk客户端也可以通过getData、getChildren、exist接口来向zk服务器来注册新的Watcher,同时ZKWatchManager则有三个集合,用来保存已经注册成功的Watcher,都是Set<String,Map>的结构,key是节点路径

  • 封装WatchRegistration和构造Request对象

    当客户端通过getData、getChildren、exist接口来注册新的Watcher的时候,会首先执行封装WatchRegistration和构造Request请求,以getData为例:

    public byte[] getData(final String path, Watcher watcher, Stat stat)  throws KeeperException, InterruptedException  {      final String clientPath = path;      PathUtils.validatePath(clientPath);      // the watch contains the un-chroot path      WatchRegistration wcb = null;      if (watcher != null) {          wcb = new DataWatchRegistration(watcher, clientPath);      }      final String serverPath = prependChroot(clientPath);      RequestHeader h = new RequestHeader();      h.setType(ZooDefs.OpCode.getData);      GetDataRequest request = new GetDataRequest();      request.setPath(serverPath);      request.setWatch(watcher != null);      GetDataResponse response = new GetDataResponse();      ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);      if (r.getErr() != 0) {          throw KeeperException.create(KeeperException.Code.get(r.getErr()),                  clientPath);      }      if (stat != null) {          DataTree.copyStat(response.getStat(), stat);      }      return response.getData();  }
    1. WatchRegistration相关于Watcher的注册信息,基本信息是需要监听的节点clientPath,执行的watcher对象
    2. 封装Watcher到Request对象中,在发送的时候通知服务端,注意:Request对象中只记录了是否需要注册watcher信息,具体是哪个Watcher并不需要,这也是zk设计巧妙的地方

    public class GetDataRequest implements Record {
    private String path; private boolean watch; public GetDataRequest() {
    } }
  • 交给ClientCnxn去构造Packet对象

    Packet对象是zk Client和Server之间通信的最小单元,用于进行客户端和服务端之间的网络传输,任何需要传输的对象都需要被包装成Packet对象。

    Packet(RequestHeader requestHeader, ReplyHeader replyHeader,         Record request, Record response,         WatchRegistration watchRegistration, boolean readOnly) {      this.requestHeader = requestHeader;      this.replyHeader = replyHeader;      this.request = request;      this.response = response;      this.readOnly = readOnly;      this.watchRegistration = watchRegistration;  }

    将构造好的Packet对象放入到Packet对象中,这些Watcher就可以随着请求发送到服务端,让后返回给客户端进行回调。但是,如果采用上面的做法,是有问题的,试想,如果所有的Watcher对象都被传递给Server,那么Server肯定会内存爆炸。zk在设计的时候,虽然将WatchRegistration封装到Packet对象中,但是并没有传递给Server,具体的做法在SendThread发送的时候体现。

    在SendThread中会阻塞在doTransport中,在上面将packet加入到outgoingQueue时,也调用了select.wakeup来唤醒阻塞线程,在doTransport中调用ClientCnxn的doIO方法,以原生的NIO示例,即ClinetCnxnSocketNIO实现为例:

    void doIO(List
    pendingQueue, LinkedList
    outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { //.... if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } //... }

    可以看到,上面的逻辑主要是找到Packe对象,然后构造发送的ByteBuffer,即Packet的bb对象,重点则在构造的过程creatBB

    public void createBB() {      try {          ByteArrayOutputStream baos = new ByteArrayOutputStream();          BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);          boa.writeInt(-1, "len"); // We'll fill this in later          if (requestHeader != null) {              requestHeader.serialize(boa, "header");          }          if (request instanceof ConnectRequest) {              request.serialize(boa, "connect");              // append "am-I-allowed-to-be-readonly" flag              boa.writeBool(readOnly, "readOnly");          } else if (request != null) {              request.serialize(boa, "request");          }          baos.close();          this.bb = ByteBuffer.wrap(baos.toByteArray());          this.bb.putInt(this.bb.capacity() - 4);          this.bb.rewind();      } catch (IOException e) {          LOG.warn("Ignoring unexpected exception", e);      }  }

    上面的代码中,bb并没有将Packet的所有属性序列化,仅仅序列化了requestHeader和request,在上面构造Request对象过程中也说明了,reqeust并不持有Watcher对象,仅仅有一个标志位 private boolean watch;向Server表示是否需要watch。

    分析到此,就会发现,在通信过程中,并没有把WatchRegistration对象序列化,不会增大网络传输消耗。那么WatchRegistration有什么用?试想下,如果服务端注册成功Watcher,回复客户端后,客户端怎么知道是哪一个Watcher对象监听了这个事件?

    这个地方还需要关注下,在处理完成后Packet对象从outgoingQueue队列中去掉,加入了pendingQueue中

  • 客户端处理Response

    通过SendThread的doIO方法,从SocketChannel中读取到服务端的响应,读取到incomingBuffer中,交给readResponse方法去处理,并且从finishPacket中将Watcher注册到zkWatcherManager中

    private void finishPacket(Packet p) {      if (p.watchRegistration != null) {          p.watchRegistration.register(p.replyHeader.getErr());      }      if (p.cb == null) {          synchronized (p) {              p.finished = true;              p.notifyAll();          }      } else {          p.finished = true;          eventThread.queuePacket(p);      }  }  public void register(int rc) {      if (shouldAddWatch(rc)) {          Map
    > watches = getWatches(rc); synchronized(watches) { Set
    watchers = watches.get(clientPath); if (watchers == null) { watchers = new HashSet
    (); watches.put(clientPath, watchers); } watchers.add(watcher); } } }

    sho0aiB.jpg

第二部分:事件通知,Watcher的触发

针对第一部分,客户端已经建立了对应的watcher,而Server的处理本篇不介绍了(写zk启动的时候分析过),接下来看一下如何触发watcher,以服务端收到setData()请求为例,当setData请求发生时,最终会调用WatchManager的triggerWatch方法,然后调用process方法触发Watcher,这部分不做具体说明,在服务端watcher管理中会写(还没整理完),关键的代码是,发送时设置了WatcherEvent的ReplyHeader标记为-1,表示这是一个通知

@Overridesynchronized public void process(WatchedEvent event) {    ReplyHeader h = new ReplyHeader(-1, -1L, 0);    if (LOG.isTraceEnabled()) {        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,                                 "Deliver event " + event + " to 0x"                                 + Long.toHexString(this.sessionId)                                 + " through " + this);    }    // Convert WatchedEvent to a type that can be sent over the wire    WatcherEvent e = event.getWrapper();    sendResponse(h, e, "notification");}

重点关注的是Client的处理,对于服务器的报文,依然是由SendThread来处理,整体逻辑如下:

j8TL5m1.jpg

前面的几步相对简单,代码如下:

void readResponse(ByteBuffer incomingBuffer) throws IOException {       if (replyHdr.getXid() == -1) {            // -1 means notification            WatcherEvent event = new WatcherEvent();            event.deserialize(bbia, "response");            //...            WatchedEvent we = new WatchedEvent(event);            //...            eventThread.queueEvent( we );            return;        }

EventThread是zk客户端专门用来处理服务端通知事件的线程

public void queueEvent(WatchedEvent event) {        if (event.getType() == EventType.None                && sessionState == event.getState()) {            return;        }        sessionState = event.getState();        // materialize the watchers based on the event        WatcherSetEventPair pair = new WatcherSetEventPair(                watcher.materialize(event.getState(), event.getType(),                        event.getPath()),                        event);        // queue the pair (watch set & event) for later processing        waitingEvents.add(pair);    }

在入队列的时候,由watchManager依据事件的类型,状态和节点信息获取到监听该事件的Set

@Override    public Set
materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath){ //... case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; //... }

然后包装成了WatchSetEventPair对象,即WatchedEvent和Set的关系元组,直接remove掉了,这个就是后面说的一次性的体现

EventThread线程从队列中取出事件后,交给对应的Watcher去回调

private void processEvent(Object event) {      try {          if (event instanceof WatcherSetEventPair) {              // each watcher will process the event              WatcherSetEventPair pair = (WatcherSetEventPair) event;              for (Watcher watcher : pair.watchers) {                  try {                      watcher.process(pair.event);                  } catch (Throwable t) {                      LOG.error("Error while calling watcher ", t);                  }              }          }

至此,服务器事件变更通知结束。

注意:

在整个Watcher的注册过程中,zk在设计的时候,都是一次性的:一旦Watcher被触发,zk都会将其从对应的储存中移除。这就要开发人员再使用Watcher的时候反复注册。这样做的好处是有效减轻服务器压力。如果Watcher一直有效,那么每次服务端都会想大量的客户端发送事件通知,这个对性能消耗是比较严重的。

同时,watcher的设计也保持了轻量的特性,在与Server通信过程中,整个通知结构只包含三部分:通知状态、事件类型和节点信息。

参考:

从PAXOS到ZOOKEEPER分布式一致性原理与实践

转载于:https://www.cnblogs.com/kakaxisir/p/6805150.html

你可能感兴趣的文章
微信分享
查看>>
《数据结构》第1章:绪论
查看>>
基于域名的虚拟主机(最常用)
查看>>
第八讲 shiro 整合 ssm
查看>>
Lucene
查看>>
[LeetCode] 83. Remove Duplicates from Sorted List 移除有序链表中的重复项
查看>>
CNN反卷积理解
查看>>
chrome 中firstChild老是出错
查看>>
Java 7 新的 try-with-resources 语句,自动资源释放
查看>>
封装一个函数, 查看数字在数组中是否出现过, 如果出现过就返回数字在数组中的位置,没有出现过返回-1;...
查看>>
查看用户登录情况
查看>>
双栈排序-NOIP2008T4
查看>>
C#多线程及GDI(Day 23)
查看>>
static关键字
查看>>
Oracle the network adapter could not establish the connection
查看>>
powerdesigner 不显示表字段只显示表名
查看>>
《分布式任务调度平台XXL-JOB》
查看>>
SQL分布式查询、跨数据库查询
查看>>
python国内豆瓣源
查看>>
redux、immutablejs和mobx性能对比(三)
查看>>