zookeeper – zkclient的使用

  • Post author:
  • Post category:其他


1. 简介



zookeeper-体验原生api

这篇文章中,我们体验了一下官方的原生API,有没有觉得API很难用呢。

zookeeper原生客户端的不足之处:

(1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;

(2)Session超时之后没有实现重连机制;

(3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;

(4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;

(5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;

(6)删除节点无法实现级联删除;

为此,开源界提供了两个好用的库,一个是zkclient,另一个是Curator。

1.1 zkclient

ZkClient 在 Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了Session超时重连,Watcher反复注册等功能。

虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:

  • 几乎没有参考文档;
  • 异常处理简化(抛出RuntimeException);
  • 重试机制比较难用;
  • 没有提供各种使用场景的实现;

1.2 Curator

Curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,解决了非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等。目前已经成为Apache的顶级项目。另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

除此之外,Curator中还提供了Zookeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计算器等)的抽象封装。

本文,将会对zkclient的使用进行说明,Curator的使用将会在下篇 文章介绍。

2. zkclient的使用

创建会话

@Test
    public void testCreateSession() throws InterruptedException {
        ZkClient zkClient = new ZkClient(ZookeeperHelper.zkAddress,
                ZookeeperHelper.sessionTimeout,
                ZookeeperHelper.connectionTimeout);
        System.out.println("-------------zookeeper session established...");

        Thread.sleep(1000);

        zkClient.close();
    }

创建节点Node

zkClient对于创建节点,提供了丰富的接口。例如 createEphemeral 和 createPersistent,语义非常清晰;除此之外,它在创建持久化节点时,可以递归创建节点,方法如下:createPersistent(String path, boolean createParents),当createParents为true时,将会递归创建节点,它的实现是在zkclient的内部,从根节点开始一个个查询节点是否存在,如果不存在就创建该节点,一直追溯。

@Test
    public void testCreateNode() {
        String ephemeral_path = rootPath + "/create_ephemeral";  // 临时节点
        String persistent_path = rootPath + "/create_persistent";  // 持久节点

        String data = "";

        zkClient.createEphemeral(ephemeral_path, data);
        System.out.println("---------success to create node : " + ephemeral_path);

        String ephemeralSequentialPath = zkClient.createEphemeralSequential(ephemeral_path, data);
        System.out.println("---------success to create node : " + ephemeralSequentialPath);

        zkClient.createPersistent(persistent_path, data); //创建持久节点
        System.out.println("---------success to create node : " + persistent_path);

        String persistentSequentialPath = zkClient.createPersistentSequential(persistent_path, data);
        System.out.println("---------success to create node : " + persistentSequentialPath);

        //清除
        zkClient.delete(ephemeral_path);
        zkClient.delete(ephemeralSequentialPath);
        zkClient.delete(persistent_path);
        zkClient.delete(persistentSequentialPath);
    }

##删除节点 在原生API中,删除节点需要自行递归删除,否则抛异常,现在zkclient提供了递归删除节点的功能。

    @Test
    public void testDeleteNode() {
        String path = rootPath + "/delete/node";  // 多级路径

        zkClient.createPersistent(path, true); // 创建父节点
        System.out.println("---------success to create node : " + path);

        //递归删除
        zkClient.deleteRecursive(rootPath);
    }

监听子节点的变化

原生API中,监听节点的数据或者子节点的变化都是很麻烦的,因为监听器只生效一次。zkclient提供了一组永久监听节点的API。

subscribeChildChanges  //订阅子节点变化的事件
unsubscribeChildChanges //取消订阅子节点变化的事件
subscribeDataChanges //订阅节点数据变化的事件
unsubscribeDataChanges //取消订阅节点数据变化的事件
subscribeStateChanges //订阅节点状态变化的事件
unsubscribeStateChanges //取消订阅节点状态变化的事件
unsubscribeAll //取消订阅所有事件
@Test
    public void testNodeChangedListener() throws InterruptedException {
        String parentPath = rootPath + "/node_changed_parent";
        zkClient.subscribeDataChanges();
        //监听器不再是一次性,此时parentPath还不存在,可以对还没存在的节点进行监听
        zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + "的子节点发生改变----" + "它的子节点是:" + currentChilds);
            }
        });

        zkClient.createPersistent(parentPath, true);
        Thread.sleep(1000);

        //创建子节点
        String childrenPath = parentPath + "/children";
        zkClient.createPersistent(childrenPath, true);
        Thread.sleep(1000);

        //删除子节点
        zkClient.deleteRecursive(childrenPath);
        Thread.sleep(1000);

        //删除父节点
        zkClient.deleteRecursive(parentPath);
        Thread.sleep(1000);
    
        zkClient.unsubscribeAll();
    }

获取节点的子节点

    @Test
    public void testGetChildren() throws InterruptedException, IOException, KeeperException {
        String nodePath1 = rootPath + "/node1";
        String nodePath2 = rootPath + "/node2";

        zkClient.createPersistent(nodePath1, true);
        zkClient.createPersistent(nodePath2, true);

        List<String> children = zkClient.getChildren(rootPath);
        for (String child : children) {
            /**
             * 输出:节点:node2   节点:node1
             */
            System.out.println("节点:" + child);
        }

        zkClient.deleteRecursive(rootPath);
    }

获取节点数据

@Test
    public void testReadData() throws InterruptedException, IOException, KeeperException {
        String test_read_data_path = rootPath + "/test_read_data";

        zkClient.createPersistent(test_read_data_path, "123"); //创建持久节点
        System.out.println("---------success to create node : " + test_read_data_path);

        zkClient.subscribeDataChanges(test_read_data_path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("path : " + dataPath + " changed" + " ,new data : " + data);
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("path : " + dataPath + " deleted.");
            }
        });

        zkClient.writeData(test_read_data_path, "456");

        String readData = zkClient.readData(test_read_data_path);
        System.out.println("读取数据:" + readData);

        Thread.sleep(1000);

        zkClient.delete(test_read_data_path);

        Thread.sleep(1000);
    }

更新数据

    @Test
    public void testWriteData() {
        String test_write_data_path = rootPath + "/test_write_data";

        String data = "123";

        zkClient.createPersistent(test_write_data_path, data); //创建持久节点
        System.out.println("---------success to create node : " + test_write_data_path);

        Stat stat = new Stat();
        String data1 = zkClient.readData(test_write_data_path, stat);
        System.out.println("节点当前数据:" + data1);

        //使用正确的版本号更新数据
        Stat stat1 = zkClient.writeDataReturnStat(test_write_data_path, "456", stat.getVersion());
        String data2 = zkClient.readData(test_write_data_path);
        System.out.println("第一次更新后,节点数据:" + data2);

        try {
            //使用错误的版本号更新数据
            zkClient.writeDataReturnStat(test_write_data_path, "456",
                    stat1.getVersion() + 1);
        } catch (Exception e) {
            // e.printStackTrace();
            System.out.println("第二次更新失败");
        } finally {
            String data3 = zkClient.readData(test_write_data_path);
            System.out.println("第二次更新后,节点数据:" + data3);
        }

        //清除
        zkClient.delete(test_write_data_path);
    }

节点是否存在判断

    @Test
    public void testExists() {
        boolean exists = zkClient.exists(rootPath);
        System.out.println(rootPath + " exists?" + exists);
    }

转载于:https://my.oschina.net/thinwonton/blog/994924