zookeeper之watcher

  • Post author:
  • Post category:其他

客户端可以在节点上注册监听器,当特定的事件发生后,zk会通知感兴趣的客户端。

我们在zkCli.sh客户端输入help命令,可以看到只有stat、ls、ls2、get四个命令可以增加watch事件。

我们想某个路径加监听器,ls path [watch],watch代表是否添加watch,1代表是的。

下面我们来看一下java源码。

zookeeper版本:

<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.4.10</version>
</dependency>

那我们一般是怎么调用zookeeper的呢?首先定义一个Zookeeper对象

ZooKeeper zk = new ZooKeeper("199.198.956.901:2181", 5000, new ZKDemo());

 多个IP用;隔开。其中 5000是session超时时间,new ZKDemo()是我们自定义的一个Watcher对象

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
        this.watchManager = new ZooKeeper.ZKWatchManager();
        LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        this.watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
        this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, this.watchManager, getClientCnxnSocket(), canBeReadOnly);
        this.cnxn.start();
}

在初始化zookeeper的时候,会new一个ZooKeeper.ZKWatchManager(),ZooKeeper.ZKWatchManager()是客户端watcher管理器,并把我们刚才传进来的new ZKDemo()赋值给this.watchManager.defaultWatcher,作为客户端watcherManager的默认watcher,这个默认watcher主要用来监听keeperState里面定义的各种链接状态转换事件,

/** @deprecated */
@Deprecated
Unknown(-1),
Disconnected(0),
/** @deprecated */
@Deprecated
NoSyncConnected(1),
SyncConnected(3),
AuthFailed(4),
ConnectedReadOnly(5),
SaslAuthenticated(6),
Expired(-112);

由客户端持有,伴随整个zookeeper生命周期,而不是一次触发即消亡。我们运行一下代码观察一下(IP和端口改成自己的):

public class ZKDemo implements Watcher {
	private static final CountDownLatch cdl = new CountDownLatch(1);

	public static void main(String[] args) throws IOException {
		ZooKeeper zk = new ZooKeeper("199.198.956.901:2181", 5000, new ZKDemo());
		System.out.println(zk.getState());

		try {
			cdl.await();
		} catch (Exception e) {
			System.out.println("ZK Session established.");
		}
	}

	@Override
	public void process(WatchedEvent event) {
		System.out.println("Receive watched event:" + event);
		if (KeeperState.SyncConnected == event.getState()) {
			cdl.countDown();
		}
	}
}

运行结果如下:

同步连接成功,其中state来源于Watcher接口的内部静态枚举类keeperState,type来源于EventType静态枚举类,path是指监控路径,此watcher监控的是链接状态,所以path为null。如果此时我们执行zkServer.sh stop zoo1.cfg:

会即刻被该defaultWatcher感知到:

Receive watched event:WatchedEvent state:Disconnected type:None path:null

 下面我们来模拟一个创建节点的场景

//给路径/zookeeper下的所有子节点设置一个watch
List<String> list = zk.getChildren("/zookeeper", ZKDemo.createWatcher);

 其中true-表示选择defaultWatcher,跟踪进去我们发现源码:

public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
    return this.getData(path, watch ? this.watchManager.defaultWatcher : null, stat);
}

此处我们不选择 defaultWatcher,自定义一个watcher,

public class CreateWatcher implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("watcher被触发:" + watchedEvent);
    }
}

我们在客户端1给路径 /zookeeper加watcher

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
	ZooKeeper zk = ZKDemo.getZk();
	System.out.println(zk.getState());
    //给路径/zookeeper设置一个watch
    List<String> list = zk.getChildren("/zookeeper", ZKDemo.createWatcher);
	try {
		cdl.await();
	} catch (Exception e) {
		System.out.println("ZK Session established.");
	}
}

我们在客户端2给路径/zookeeper下创建一个子节点/t1

        ZooKeeper zk = ZKDemo.getZk();
        try {
            Stat stat = zk.exists("/zookeeper/t1",null);
            if (stat != null) {
                zk.delete("/zookeeper/t1",stat.getVersion());//版本如果-1,删除所有版本
            }
            String re = zk.create("/zookeeper/t1","t1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(re);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

先启动客户端1

CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null

 再启动客户端2

Receive watched event:WatchedEvent state:SyncConnected type:None path:null
/zookeeper/t1

此时我们观察客户端1,我们发现

watcher被触发:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/zookeeper

 由此,我们可以看出客户端1监听到路径/zookeeper下面创建了新的节点。

那么客户端是如何发送watcher,服务端又是如何检测到这个节点添加了watcher,并在执行相关操作的时候触发watcher的呢?

看Zookeeper类我们发现,getChildren,exists,getData三个方法才能对节点加watcher,我们选择getChildren方法跟踪进去看一下,

    public List<String> getChildren(String path, Watcher watcher) throws KeeperException,         InterruptedException {
        PathUtils.validatePath(path);//验证路径,里面定义了路径规则,必须以/开头,长度,不能有特殊字符等。
        ZooKeeper.WatchRegistration wcb = null;
        if (watcher != null) {//如果客户端设置了watcher
            wcb = new ZooKeeper.ChildWatchRegistration(watcher, path);watcher
        }

        String serverPath = this.prependChroot(path);//对path进行加工传给server
        RequestHeader h = new RequestHeader();
        h.setType(8);
        GetChildrenRequest request = new GetChildrenRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);//告诉服务端的本次请求是否带有watcher
        GetChildrenResponse response = new GetChildrenResponse();
        ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);//提交请求到outgoingQueue,outgoingQueue是一个链表,先进先出,保证请求提交顺序
        if (r.getErr() != 0) {
            throw KeeperException.create(Code.get(r.getErr()), path);//返回错误码
        } else {
            return response.getChildren();//返回该路径下所有子节点集合
        }
    }

我们重点看一下

wcb = new ZooKeeper.ChildWatchRegistration(watcher, path);

 这段代码将watcher和path封装成ChildWatchRegistration对象,我们去看ChildWatchRegistration的父类WatchRegistration就会发现里面有一个register方法

public void register(int rc){
   if(shouldAddWatch(rc)){
       Map watches = getWatches(rc);
       synchronized(watches){
           Set watchers = (Set)watches.get(clientPath);
           if(watchers == null){
               watchers = new HashSet();
               watches.put(clientPath, watchers);
           }
           watchers.add(watcher);
        }
    }
 }

每当我们注册watcher的时候,先通过getWatches(rc)方法

protected Map getWatches(int rc){
    return watchManager.childWatches;
}

从当前zookeeper实例的watcherManager(ZKWatchManager客户端watcher管理器)属性中获取当前事件类型的全部watcher事件,然后添加一个新的watcher进去。childWatches是一个Map结构

private final Map dataWatches;
private final Map existWatches;
private final Map childWatches;

key是path,value是当前path下的所有Watcher(HashSet结构,因此,同一路径下同一类型watcher只能生效一次)。

由此可见,客户端在提交请求的时候,把watcher对象封装成ZooKeeper.ChildWatchRegistration对象并且告诉了服务器是否含有watcher。追踪submitRequest方法,到ClientCnxn.class的queuePacket方法,发现zookeeper把每个请求封装之后丢进了队列中

outgoingQueue.add(packet);

然后唤醒了客户端socket线程,sendThread.getClientCnxnSocket().wakeupCnxn(),ClientCnxnSocketNIO.class中的

doTransport方法,从这个方法中我们可以看出zookeeper底层用了java NIO 的selector模型,

synchronized(this)
{
    selected = selector.selectedKeys();
}

当发现有已经准备好的通道时就调用doIO方法处理请求。zookeeper基于TCP/IP协议将请求传输到服务端,那么服务端是如何解析请求的,又是如何注册watcher的呢?

服务端也有一个类似于ZooKeeper.ZKWatchManager的WatcherManager,在server包下,这个类是服务端用来管理watcher的类,客户端注册的watcher事件都会保存在这个类里面,这个类有两个重要的字段

private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap();
private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap();

watchTable定义了一个路径下的多个不同的watcher,下面我们来看一下这种数据结构如何在实际应用体现

List<String> list1 = zk.getChildren("/zookeeper", ZKDemo.createWatcher);
List<String> list2 = zk.getChildren("/zookeeper", ZKDemo.createWatcher);

这种情况下,我们运行后发现,watcher只被触发了一次

CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
watcher被触发:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/zookeeper

如果我们向下面这样定义

List<String> list1 = zk.getChildren("/zookeeper", ZKDemo.createWatcher);
List<String> list2 = zk.getChildren("/zookeeper", new CreateWatcher());

 watcher被触发了2次

CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
watcher被触发:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/zookeeper
watcher被触发:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/zookeeper

所以同一种watcher其实指的是是否是同一个Watcher对象,通过以上例子我们也知道,如果想给一个路径下放多个watcher,多new几个Watcher对象就可以了。

watch2Paths定义了一个Watcher可以同时监听多个path,我们再来创建一个新的节点/se,我这里直接打开zkCli.sh来创建

zk.getChildren("/zookeeper", ZKDemo.createWatcher);//给路径/zookeeper加Watcher
zk.getChildren("/se", ZKDemo.createWatcher);//给路径/se加Watcher

触发 /zookeeper路径的Watcher

ZooKeeper zk = ZKDemo.getZk();
Stat stat = zk.exists("/zookeeper/t1",null);
if (stat != null) {
   zk.delete("/zookeeper/t1",stat.getVersion());
}
String re = zk.create("/zookeeper/t1","t1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(re);

 触发 /se路径的Watcher

ZooKeeper zk = ZKDemo.getZk();
Stat stat = zk.exists("/se/t1",null);
if (stat != null) {
   zk.delete("/se/t1",stat.getVersion());
}
String re = zk.create("/se/t1","t1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(re);

都运行之后,我们可以观察到

CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
watcher被触发:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/zookeeper
watcher被触发:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/se

 由此可见,同一个Watcher对象是可以监听多个path的。

现在我们已经知道server是如何管理我们的各种Watcher事件的,下面我们来看一下是如何触发Watcher事件的。WatcherManager中有个方法是用来触发Watcher的

Set triggerWatch(String path, org.apache.zookeeper.Watcher.Event.EventType type, Set supress)

该方法中先从watchTable中获取该路径下的所有watcher对象watchers,然后再循环将watch2Paths中该watcher下的该path监听删除,并逐步调用process方法

do{
    if(!i$.hasNext())
        break;
    Watcher w = (Watcher)i$.next();
    if(supress == null || !supress.contains(w))
        w.process(e);
} while(true);

 这个process方法是通知响应客户端,watcher已经被触发了,客户端收到通知,将watcher移除,并将这些Watches进行封装放入一个队列中,客户端有一个专用的线程去轮询这个队列,依次执行用户自定义的process方法。

参考文献:https://blog.csdn.net/dd864140130/article/details/50299687


版权声明:本文为qq_35689573原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。