1. 简介
在
zookeeper-体验原生api
这篇文章中,我们体验了一下官方的原生API,有没有觉得API很难用呢。
zookeeper原生客户端的不足之处:
(1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
(2)Session超时之后没有实现重连机制;
(3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
(4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
(5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
(6)删除节点无法实现级联删除;
为此,开源界提供了两个好用的库,一个是zkclient,另一个是Curator。
1.1 zkclient
ZkClient 在 Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了Session超时重连,Watcher反复注册等功能。
虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:
- 几乎没有参考文档;
- 异常处理简化(抛出RuntimeException);
- 重试机制比较难用;
- 没有提供各种使用场景的实现;
1.2 Curator
Curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,解决了非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等。目前已经成为Apache的顶级项目。另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
除此之外,Curator中还提供了Zookeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计算器等)的抽象封装。
本文,将会对zkclient的使用进行说明,Curator的使用将会在下篇 文章介绍。
2. zkclient的使用
创建会话
@Test
public void testCreateSession() throws InterruptedException {
ZkClient zkClient = new ZkClient(ZookeeperHelper.zkAddress,
ZookeeperHelper.sessionTimeout,
ZookeeperHelper.connectionTimeout);
System.out.println("-------------zookeeper session established...");
Thread.sleep(1000);
zkClient.close();
}
创建节点Node
zkClient对于创建节点,提供了丰富的接口。例如 createEphemeral 和 createPersistent,语义非常清晰;除此之外,它在创建持久化节点时,可以递归创建节点,方法如下:createPersistent(String path, boolean createParents),当createParents为true时,将会递归创建节点,它的实现是在zkclient的内部,从根节点开始一个个查询节点是否存在,如果不存在就创建该节点,一直追溯。
@Test
public void testCreateNode() {
String ephemeral_path = rootPath + "/create_ephemeral"; // 临时节点
String persistent_path = rootPath + "/create_persistent"; // 持久节点
String data = "";
zkClient.createEphemeral(ephemeral_path, data);
System.out.println("---------success to create node : " + ephemeral_path);
String ephemeralSequentialPath = zkClient.createEphemeralSequential(ephemeral_path, data);
System.out.println("---------success to create node : " + ephemeralSequentialPath);
zkClient.createPersistent(persistent_path, data); //创建持久节点
System.out.println("---------success to create node : " + persistent_path);
String persistentSequentialPath = zkClient.createPersistentSequential(persistent_path, data);
System.out.println("---------success to create node : " + persistentSequentialPath);
//清除
zkClient.delete(ephemeral_path);
zkClient.delete(ephemeralSequentialPath);
zkClient.delete(persistent_path);
zkClient.delete(persistentSequentialPath);
}
##删除节点 在原生API中,删除节点需要自行递归删除,否则抛异常,现在zkclient提供了递归删除节点的功能。
@Test
public void testDeleteNode() {
String path = rootPath + "/delete/node"; // 多级路径
zkClient.createPersistent(path, true); // 创建父节点
System.out.println("---------success to create node : " + path);
//递归删除
zkClient.deleteRecursive(rootPath);
}
监听子节点的变化
原生API中,监听节点的数据或者子节点的变化都是很麻烦的,因为监听器只生效一次。zkclient提供了一组永久监听节点的API。
subscribeChildChanges //订阅子节点变化的事件
unsubscribeChildChanges //取消订阅子节点变化的事件
subscribeDataChanges //订阅节点数据变化的事件
unsubscribeDataChanges //取消订阅节点数据变化的事件
subscribeStateChanges //订阅节点状态变化的事件
unsubscribeStateChanges //取消订阅节点状态变化的事件
unsubscribeAll //取消订阅所有事件
@Test
public void testNodeChangedListener() throws InterruptedException {
String parentPath = rootPath + "/node_changed_parent";
zkClient.subscribeDataChanges();
//监听器不再是一次性,此时parentPath还不存在,可以对还没存在的节点进行监听
zkClient.subscribeChildChanges(parentPath, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + "的子节点发生改变----" + "它的子节点是:" + currentChilds);
}
});
zkClient.createPersistent(parentPath, true);
Thread.sleep(1000);
//创建子节点
String childrenPath = parentPath + "/children";
zkClient.createPersistent(childrenPath, true);
Thread.sleep(1000);
//删除子节点
zkClient.deleteRecursive(childrenPath);
Thread.sleep(1000);
//删除父节点
zkClient.deleteRecursive(parentPath);
Thread.sleep(1000);
zkClient.unsubscribeAll();
}
获取节点的子节点
@Test
public void testGetChildren() throws InterruptedException, IOException, KeeperException {
String nodePath1 = rootPath + "/node1";
String nodePath2 = rootPath + "/node2";
zkClient.createPersistent(nodePath1, true);
zkClient.createPersistent(nodePath2, true);
List<String> children = zkClient.getChildren(rootPath);
for (String child : children) {
/**
* 输出:节点:node2 节点:node1
*/
System.out.println("节点:" + child);
}
zkClient.deleteRecursive(rootPath);
}
获取节点数据
@Test
public void testReadData() throws InterruptedException, IOException, KeeperException {
String test_read_data_path = rootPath + "/test_read_data";
zkClient.createPersistent(test_read_data_path, "123"); //创建持久节点
System.out.println("---------success to create node : " + test_read_data_path);
zkClient.subscribeDataChanges(test_read_data_path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("path : " + dataPath + " changed" + " ,new data : " + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("path : " + dataPath + " deleted.");
}
});
zkClient.writeData(test_read_data_path, "456");
String readData = zkClient.readData(test_read_data_path);
System.out.println("读取数据:" + readData);
Thread.sleep(1000);
zkClient.delete(test_read_data_path);
Thread.sleep(1000);
}
更新数据
@Test
public void testWriteData() {
String test_write_data_path = rootPath + "/test_write_data";
String data = "123";
zkClient.createPersistent(test_write_data_path, data); //创建持久节点
System.out.println("---------success to create node : " + test_write_data_path);
Stat stat = new Stat();
String data1 = zkClient.readData(test_write_data_path, stat);
System.out.println("节点当前数据:" + data1);
//使用正确的版本号更新数据
Stat stat1 = zkClient.writeDataReturnStat(test_write_data_path, "456", stat.getVersion());
String data2 = zkClient.readData(test_write_data_path);
System.out.println("第一次更新后,节点数据:" + data2);
try {
//使用错误的版本号更新数据
zkClient.writeDataReturnStat(test_write_data_path, "456",
stat1.getVersion() + 1);
} catch (Exception e) {
// e.printStackTrace();
System.out.println("第二次更新失败");
} finally {
String data3 = zkClient.readData(test_write_data_path);
System.out.println("第二次更新后,节点数据:" + data3);
}
//清除
zkClient.delete(test_write_data_path);
}
节点是否存在判断
@Test
public void testExists() {
boolean exists = zkClient.exists(rootPath);
System.out.println(rootPath + " exists?" + exists);
}
转载于:https://my.oschina.net/thinwonton/blog/994924