搜了一些Zookeeper的相关书籍和博客,但好多基本都是讲Zookeeper的架构、用途,尤其分布式中的应用讲的真的是天花乱坠,但看完还是不会写代码,搞得自己理论丰富的一批,实践完全懵逼。对于Zookeeper的Java客户端API使用,基本没有涉及或者讲清楚,要么就是讲的很模糊。果然还是得自己来,通过Zookeeper的API来学习一下Zookeeper的功能,然后依据这些功能去思考怎么用?为什么用?逐渐摸索Zookeeper的应用。
1. 环境准备
1. 安装jdk,这就不细说了。
2.安装Zookeeper集群,网上很多详细教程,但还是简单贴出来自己的配置文件。可以直接复制。只是其中一个Zookeeper节点的配置文件,其余两个仅仅改动一下端口、dataDir以及dataLogDir的路径即可。
# 服务器与客户端之间交互的基本时间单元(ms)
tickTime=2000
#zookeeper集群中的包含多台server, 其中一台为leader, 集群中其余的server为follower. initLimit参数配置初始化连接时, follower和leader之间的最长心跳时间. 此时该参数设置为10, #说明时间限制为10倍tickTime.
initLimit=10
# 该参数配置leader和follower之间发送消息, 请求和应答的最大时间长度. 此时该参数设置为5, 说明时间限制为5倍tickTime.
syncLimit=5
# zookeeper中使用的基本时间单位, 毫秒值.
tickTime=2000
#存储内存中数据库快照的位置
dataDir=/usr/local/zookeeper-cluster/zookeeper-2181/data
#存储日志的目录,如果没有设置该参数, 将使用和#dataDir相同的设置.
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-2181/logs
# 用于监听客户端连接的端口,或者说客户端请求连接的端口
clientPort=2181
maxClientCnxns=60
#这一段很重要,集群配置的关键,server.后面跟的数字是标识某台Zookeeper的关键,后续配置集群会使用
server.1=127.0.0.1:2222:2225
server.2=127.0.0.1:3333:3335
server.3=127.0.0.1:4444:4445
#autopurge.snapRetainCount=3
#autopurge.purgeInterval=1
3. 引入jar包,这里推荐使用Apache的java客户端。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
2.API使用
1. 首先,创建Zookeeper客户端对象。很简单,直接new一个就行,但主要是注意构造方法的参数。Zookeeper构造方法重载版本比较多,必须了解其中的每个参数代表的含义。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException
(1)String connectString
:Zookeeper集群的每个节点的IP地址和端口号,用逗号分隔。例如
String zkNodes = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
(2)int sessionTimeout
:会话超时时间,单位是毫秒,在sessionTimeout时间之内,客户端会和服务端直接通过server发送PING请求来保持会话的有效性,俗称“心跳检测”,同时server重新激活client对应的会话。
Session是指当Client创建一个同Server的连接时产生的会话。连接Connected之后Session状态就开启,Zookeeper服务器和Client采用长连接方式(Client会不停地向Server发送心跳)保证session在不出现网络问题、服务器宕机或Client宕机情况下可以一直存在。因此,在正常情况下,session会一直有效,并且ZK集群上所有机器都会保存这个Session信息。
如果超出sessionTimeout的时间服务端仍未接收到客户端的心跳检测请求,那么服务端就会将客户端看做下线状态,会将存储的session给删除。在ZK中,很多数据和状态都是和会话绑定的,一旦会话失效,那么ZK就开始清除和这个会话有关的信息,包括这个会话创建的临时节点和注册的所有Watcher。
但还有另一种情况就是,客户端正常,但是当前会话所连接的Zookeeper集群节点宕机或者其他原因心跳检测失败,也就是无法ping,ZK Client会马上捕获到这个异常,封装为一个ConnectionLoss的事件,然后启动自动重连机制在地址列表中选择新的地址进行重连。重连会有三种结果:
- 在session timeout时间内重连成功,client会重新收到一个syncconnected的event,并将连接重新持久化为connected状态
- 超过session timeout时间段后重连成功,client会收到一个expired的event,并将连接持久化为closed状态
- 一直重连不上,client将不会收到任何event
(3)Watcher watcher
:这是Zookeeper中非常重要的一个特征,由于客户端与Zookeeper之间的连接是采用长连接,所以,可以通过客户端在Zookeeper上注册一个事件监听器,也就是Watcher对象,当Zookeeper中发生某一个事件时就会回调该Watcher对象中的方法。这个Watcher对象的作用非常重要,这个事件监听器将会一直存在,直到Zookeeper客户端关闭连接。
(4)long sessionId
:每一个会话session的建立,Zookeeper都会为自动该会话分配一个全局唯一的会话id,所以一般该id不会由我们指定,一般都不会指定传递该参数。
(5)boolean canBeReadOnly
:是否提供只读服务(不提供写服务)。
(6)HostProvider aHostProvider
:随机提供host进行连接。没啥用,默认的即可。
(7)ZKClientConfig clientConfig
:连接参数配置,
(8)byte[] sessionPasswd
:提供连接zookeeper的sessionId和密码,通过这两个确定唯一一台客户端,目的是可以提供重复会话。
实际上,真正必须的参数也就是connectString、sessionTimeout和watcher,其余的默认即可。所以Zookeeper也提供了这三个参数的重载版本,也是最常用的构造方法。
注意:zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于”CONNECTING”时才算真正建立完毕)
创建Zookeeper客户端并建立连接的示例代码如下:
public static void createZookeeperClient() throws IOException, InterruptedException {
/*
由于Zookeeper客户端对象的建立和连接是异步执行,所以很有可能会因为Zookeeper对象尚未建立连接,
主线程就继续执行导致的执行报错,这一点可以通过一些线程同步类辅助,保证Zookeeper连接完全建立成功后在进行后续操作
*/
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(zkNodes, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//已建立连接
if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
countDownLatch.countDown();
}
}
});
countDownLatch.await();
//do something...
}
2. Watcher对象的使用
:Watcher可以实现对Zookeeper的监控,这个特性是来自于Zookeeper客户端和服务端采用TCP长连接的方式进行通信。当Zookeeper发生某个事件时,会通过Watcher对象进行回调通知。Watcher是一个接口,所以必须要实现一个具体实现类,仅有一个接口方法process(),该方法的参数WatchedEvent就是Zookeeper服务端发生的事件,其中包含了该事件的所有信息。客户端在向zk服务器注册Watcher的同时,会将Watcher对象存储在客户端的WatchManager中。当zk服务端触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象。来执行回调逻辑。
public class WatcherTest1 implements Watcher {
@Override
public void process(WatchedEvent watchedEvent) {
}
}
对于WatchedEvent,其源码比较简单,其中只包含了三个主要的域变量,分别是EventType eventType, KeeperState keeperState, String path。eventType表示事件类型,keeperState表示连接状态,path就表示事件的发生在哪个数据节点路径上发生的。
public class WatchedEvent {
private final KeeperState keeperState;
private final EventType eventType;
private String path;
····
}
EventType和KeeperState都是枚举类型
public static enum EventType {
None(-1),//无此节点
NodeCreated(1),//节点已创建成功
NodeDeleted(2),//节点已删除成功
NodeDataChanged(3),//节点数据改变
NodeChildrenChanged(4),//子节点被创建、被删除会发生事件触发
DataWatchRemoved(5),//数据监视已被移除
ChildWatchRemoved(6);//子节点监视已被移除
private final int intValue;
....
}
public static enum KeeperState {
Unknown(-1),//从3.1.0版本开始被废弃
Disconnected(0),//客户端和服务器处于断开连接状态
NoSyncConnected(1),//从3.1.0版本开始被废弃
SyncConnected(3),//客户端和服务器处于连接状态
AuthFailed(4),//权限验证失败状态,通常同时也会收到AuthFailedException
ConnectedReadOnly(5),//只读连接
SaslAuthenticated(6),//权限验证通过
Expired(-112),//此时客户端会话失效,通常同时也会收到SessionExpiredException
Closed(7);//连接资源关闭
....
}
Watcher特性
一次性
无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都将其从相应的存储中移除。因此,开发人员在 Watcher 的使用上要记住的一点是需要反复注册。例如,如果客户端执行
getData("/znode1",true)
,后面对
/znode1
的更改或删除,客户端都会获得
/znode1
的监控事件通知。如果
/znode1
再次更改,如果客户端没有执行新一次设置新监视点的读取,是不会发送监视事件通知的。但是,有一个监听器对象是例外,在创建Zookeeper对象时,调用的构造方法中必须传递的Watcher对象就是和Zookeeper对象具有相同的生命周期,直到Zookeeper客户端关闭连接,该Watcher才会关闭对服务端的监听。
客户端串行执行
客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。
轻量
WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。
另外,客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递给服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。如此轻量的Watcher机制设计,在网络开销和服务端内存开销上都是非常廉价的。
3. 创建节点:Zookeeper的Java客户端对于数据操作(增删改查)都有同步和异步两种实现方式。
同步操作一般会有返回值,并且会抛出相应的异常。异步操作没有返回值,也不会抛出异常。此外异步方法参数在同步方法参数的基础上,会增加Callback和context两个参数。
同步创建节点:全参数版本
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Stat stat, long ttl)
(1)path指的就是数据节点的路径。
(2)data就是要存储的字符串的字节数组形式。(不支持序列化方式,如果需要实现序列化,可使用java相关的序列化框架,如Hession)
(3)acl指节点权限,统一使用Ids.OPEN_ACL_UNSAFE权限即可(一般在权限没有太高要求的场景下,没必要关注)
(4)createMode节点类型,创建节点的类型:CreateMode.* 提供四种节点类型,分别是
PERSISTENT (持久节点)、PERSISTENT_SEQUENTIAL(持久顺序节点)、
EPHEMRAL(临时节点)、
EPHEMRAL_SEQUENTIAL(临时顺序节点)
(5)stat节点状态信息:在创建节点时可以手动指定节点的状态信息,但一般无需传入该参数。(非必须参数)
(6)ttl:过期时间,如果该节点在ttl时间之内未发生变动,就会被删除。(非必须参数)
所以常用的同步创建节点方法的重载版本如下
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
示例代码如下
public static void createNode() throws IOException, KeeperException, InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(zkNodes, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//已建立连接
if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
countDownLatch.countDown();
}
}
});
countDownLatch.await();
//节点权限设置
ACL acl = new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.ANYONE_ID_UNSAFE);
List<ACL> acls = new ArrayList<ACL>();
acls.add(acl);
//创建节点
zooKeeper.create("/demo", "helloworld".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("over");
}
异步创建节点:
public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
除了create同步方法中的四个必须参数以外,异步模式的create方法还增加了callback和context两个参数。这个callback中的processResult方法会在节点创建好之后被调用,它有四个参数。第一个是int类型的resultCode,为服务端响应码, 0表示调用成功,-4表示端口连接,-110表示指定节点存在,-112表示会话已经过期。第二个参数是创建节点的路径。第三个参数是context,当一个StringCallback类型对象作为多个create方法的参数时,这个参数就很有用。第四个参数是创建节点的名字,其实与path参数相同。异步的方法不会抛出异常,而是会在回调StringCallback中处理所有的事件。
public interface StringCallback extends AsyncCallback {
void processResult(int var1, String var2, Object var3, String var4);
}
示例代码
public static void createNode() throws IOException, KeeperException, InterruptedException {
//创建连接对象省略
final CountDownLatch countDownLatch = new CountDownLatch(1);
//节点权限设置
ACL acl = new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.ANYONE_ID_UNSAFE);
List<ACL> acls = new ArrayList<ACL>();
acls.add(acl);
zk.create("/demo", "helloworld".getBytes(), acls, CreateMode.PERSISTENT,new IStringCallBack(), countDownLatch);
countDownLatch.await();
System.out.println("over");
}
static class IStringCallBack implements AsyncCallback.StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
switch (KeeperException.Code.get(rc)) {
case CONNECTIONLOSS:
System.out.println("CONNECTIONLOSS");
break;
case OK:
System.out.println("OK - {" + path + ", " + name + ", " + ctx + "}");
CountDownLatch countDownLatch = (CountDownLatch) ctx;
countDownLatch.countDown();
break;
case NODEEXISTS:
System.out.println(path + "exists");
break;
default:
System.out.println("DEFAULT");
break;
}
}
}
4. 删除节点
:和创建节点类似,分同步和异步。
(1)同步
public void delete(String path, int version)
参数path,表示节点路径
参数version,表示版本号,即表示本次删除操作是针对该数据的某个版本进行操作。只有当version参数的值与节点状态信息中的dataVersion值相等时,数据修改才能成功,否则会抛出BadVersion异常。这是为了防止丢失数据的更新,在ZooKeeper提供的API中,所有的对已有节点的写数据操作都有version参数。
(2)异步
public void delete(String path, int version, VoidCallback cb, Object ctx)
5. 修改节点数据
:
(1)同步
public Stat setData(String path, byte[] data, int version)
(2)异步
public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx)
参数path:表示数据节点路径
参数data:表示要设置的数据
参数version:表示修改数据版本
6. 获取节点的数据
:
(1)同步:
public byte[] getData(String path, boolean watch, Stat stat)
zooKeeper.getData方法的返回值就是节点中存储的数据值,它有三个参数,第一个参数是节点的路径,用于表示要获取哪个节点中的数据。第三个参数stat用于存储节点的状态信息,在调用getData方法前,会先构造一个空的Stat类型对象作为参数传给getData方法,当getData方法调用返回后,节点的状态信息会被填充到stat对象中。
private void getDataSync() throws KeeperException, InterruptedException {
Stat stat = new Stat();
// getData的返回值是该节点的数据值,节点的状态信息会赋值给stat对象
byte[] data = zooKeeper.getData("/node_1",true, stat);
System.out.println(new String(data));
System.out.println(stat);
}
第二个参数是一个bool类型的watch,这个参数比较重要。当watch为true时,表示我们想要监控这个节点的数据变化,而使用的监听器对象就是我们在创建Zookeeper客户端时指定的那个监听器对象,这个boolean参数时,false表示不对该节点监控。当节点的数据发生变化时,我们就可以拿到zk服务器推送给我们的通知。
第二个参数,我们还可以使用自定义的Watcher对象,但这种监控只能生效一次,这是getData方法的另一个重载版本
public byte[] getData(String path, Watcher watcher, Stat stat)
(2)异步
public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
private void getDataAsync() {
zooKeeper.getData("/node", true, new AsyncCallback.DataCallback() {
public void processResult(int resultCode, String path, Object ctx, byte[] data, Stat stat) {
System.out.println(resultCode);
System.out.println(path);
System.out.println(ctx);
System.out.println(new String(data));//data就是获取到的数据
System.out.println(stat);
}
}, "异步获取节点的数据");
}