Zookeeper 版本 3.4.12
curator-framework、curator-recipes 版本 2.13.0
curator版本要与zk版本对应,不然启动会报错
使用示例
先看一个基于zk实现分布式锁的例子
/**
 * Demo: ten threads contend for a single ZooKeeper-backed distributed lock
 * ({@code /lock}) and each holds it for four seconds.
 *
 * <p>The lock is released and the latch is counted down in {@code finally}
 * blocks, so a failure inside one worker can neither leave the lock held nor
 * leave the main thread blocked forever in {@code await()}. The Curator client
 * is closed once all workers have finished.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting for the workers
 */
public static void main(String[] args) throws InterruptedException {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
            .retryPolicy(retryPolicy)
            .connectionTimeoutMs(4000).build();
    zkClient.start();
    try {
        InterProcessMutex lock = new InterProcessMutex(zkClient, "/lock");
        // Latch keeps the main thread alive until every worker has finished.
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    // Acquire the distributed lock (blocks until it is granted).
                    lock.acquire();
                    try {
                        System.out.println(Thread.currentThread().getName() + "===>获取锁");
                        // Simulate time spent in the critical section.
                        Thread.sleep(4000);
                    } finally {
                        // Always release, even if the critical section fails.
                        lock.release();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Always count down; otherwise one failed worker would hang main().
                    countDownLatch.countDown();
                }
            }, "Thread-" + i).start();
        }
        countDownLatch.await();
        System.out.println("执行结束~~~");
    } finally {
        // Release the ZooKeeper connection held by the client.
        zkClient.close();
    }
}
我们知道,zk有几种节点类型:持久化节点、临时节点、持久化有序节点、临时有序节点。
这里用到的是临时有序节点。
分布式锁流程分析
我们结合上面的代码,以 5 个线程抢占分布式锁为例(示例代码中启动了 10 个线程,这里为便于图示简化为 5 个),用图表的方式简要说明锁的获取与释放过程。
1、首先会以 `/lock` 作为父节点,在其下创建 5 个临时有序子节点
2、序号最小的子节点对应的线程先获取锁;为避免惊群效应,其余每个子节点只监听排在自己前一位的节点(序号最小的子节点无需监听任何节点),其对应的线程随后进入阻塞状态
图中 标红的子节点对应的线程 进行阻塞,标绿的子节点 对应的线程代表获取到了锁
3、当 `xxx--lock-0000000001` 节点对应的线程释放锁后,该节点会被删除,后一个节点对应的线程会因为 watch 机制被唤醒
如上图,`xxx--lock-0000000001` 节点会被删除,并唤醒 `xxx--lock-0000000002` 对应的线程。
4、以此类推 后面的线程大体按照这个流程去获取锁
源码分析
InterProcessMutex类
// Delegate used by this class to attempt and release the lock.
private final LockInternals internals;
// Validated base path of the lock; the per-thread lock nodes are presumably
// created as ephemeral sequential children under it (node creation is not shown here).
private final String basePath;
// Maps each thread to the lock data (lock path and re-entrancy count) it currently holds.
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
// Name prefix used for the lock nodes.
private static final String LOCK_NAME = "lock-";
/**
 * Creates a mutex on the given path using a {@code StandardLockInternalsDriver}.
 *
 * @param client the ZooKeeper (Curator) client
 * @param path   the lock path
 */
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
/**
 * Creates a mutex on the given path with a caller-supplied driver, using the
 * default lock-node name prefix and a single lease.
 *
 * @param client the ZooKeeper (Curator) client
 * @param path   the lock path
 * @param driver the lock driver to delegate to
 */
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
    // LOCK_NAME is the compile-time constant "lock-"; a single lease makes this an exclusive lock.
    this(client, path, LOCK_NAME, 1, driver);
}
/**
 * Package-private constructor that wires up all state.
 *
 * @param client    the ZooKeeper (Curator) client
 * @param path      the lock path; validated before being stored as the base path
 * @param lockName  name prefix for the lock nodes
 * @param maxLeases maximum number of concurrent holders (the public constructors pass 1)
 * @param driver    the lock driver handed to {@code LockInternals}
 */
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
    threadData = Maps.newConcurrentMap();
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
acquire加锁分析
上面给出了 acquire 的调用时序图,我们主要从 `internalLock` 方法开始看起。
/**
 * Acquires the lock without a timeout.
 *
 * @throws IOException if {@code internalLock} reports that the lock was not acquired
 * @throws Exception   any error propagated from the lock attempt
 */
public void acquire() throws Exception {
    // A time of -1 with a null unit means "no timeout".
    final boolean acquired = this.internalLock(-1L, (TimeUnit) null);
    if (acquired) {
        return;
    }
    throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
/**
 * Acquires the lock for the calling thread, re-entrantly.
 *
 * @param time maximum time to wait, passed through to the lock attempt
 * @param unit unit of {@code time}; {@code null} means no timeout
 * @return {@code true} if the calling thread now holds the lock, {@code false} otherwise
 * @throws Exception any error propagated from the lock attempt
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception {
    final Thread owner = Thread.currentThread();

    // Re-entrant path: this thread already holds the lock, so only bump its hold count.
    final InterProcessMutex.LockData held = this.threadData.get(owner);
    if (held != null) {
        held.lockCount.incrementAndGet();
        return true;
    }

    // First acquisition for this thread: a null path means the lock was not obtained.
    final String acquiredPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
    if (acquiredPath == null) {
        return false;
    }

    // Remember which lock node belongs to this thread so release() can find it.
    this.threadData.put(owner, new InterProcessMutex.LockData(owner, acquiredPath));
    return true;
}
LockInternals
/**
 * Creates this caller's lock node and then runs the acquire loop, retrying the
 * whole sequence when a {@code NoNodeException} occurs and the client's retry
 * policy allows another attempt.
 *
 * @param time          maximum time to wait
 * @param unit          unit of {@code time}; {@code null} means wait without a timeout
 * @param lockNodeBytes payload for the lock node; replaced by an empty array when a
 *                      revocable is set
 * @return the path of the created lock node if the lock was acquired, otherwise {@code null}
 * @throws Exception the {@code NoNodeException} once retries are exhausted, or any other
 *                   error from node creation or the acquire loop
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long startMillis = System.currentTimeMillis();
    // null means "no timeout"; otherwise the timeout converted to milliseconds.
    final Long millisToWait = (unit == null) ? null : unit.toMillis(time);
    final byte[] localLockNodeBytes = (this.revocable.get() == null) ? lockNodeBytes : new byte[0];
    int retryCount = 0;

    while (true) {
        try {
            // Create this caller's lock node; the driver returns the node's path.
            final String ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
            // Contend for the lock, watching and waiting as needed.
            final boolean hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            return hasTheLock ? ourPath : null;
        } catch (NoNodeException e) {
            final long elapsedMillis = System.currentTimeMillis() - startMillis;
            final boolean retryAllowed = this.client.getZookeeperClient().getRetryPolicy()
                    .allowRetry(retryCount++, elapsedMillis, RetryLoop.getDefaultRetrySleeper());
            if (!retryAllowed) {
                throw e;
            }
            // Retry permitted: loop around and create the node again.
        }
    }
}
/**
 * Loops until this caller's node is granted the lock, the wait times out,
 * or the client is no longer in the STARTED state.
 *
 * On each pass it asks the driver whether our node gets the lock; if not, it
 * sets a watcher on the node the driver says to watch and waits on this
 * object's monitor.
 *
 * @param startMillis  time the attempt started, used to compute the remaining wait
 * @param millisToWait remaining wait in milliseconds, or null to wait without a timeout
 * @param ourPath      full path of this caller's lock node
 * @return true if the lock was obtained, false otherwise
 * @throws Exception any failure; our node is deleted before the exception is rethrown
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
// Keep trying while the client is started and we do not yet hold the lock.
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
// Fetch the child nodes; presumably sorted by sequence number (getSortedChildren is not shown here).
List<String> children = this.getSortedChildren();
// Strip the base-path prefix, e.g. /lock/xxx--lock-0000000002 ==> xxx--lock-0000000002
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
// The driver decides whether our node is entitled to the lock.
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
// Not granted: build the full path of the node the driver told us to watch.
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
// Set a watcher on that node.
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
// No timeout: block on this object's monitor until notified.
if (millisToWait == null) {
this.wait();
} else {
// Timeout configured: subtract the elapsed time from the remaining wait.
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// Time left: wait at most that long. Otherwise mark our node for deletion (see finally) and give up.
if (millisToWait > 0L) {
this.wait(millisToWait);
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
// Deliberately ignored: the watched node no longer exists, so fall through and re-evaluate on the next pass.
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
// On any failure, remove our node before propagating the exception.
doDelete = true;
throw var21;
} finally {
if (doDelete) {
this.deleteOurPath(ourPath);
}
}
// Report whether the lock was obtained.
return haveTheLock;
}
StandardLockInternalsDriver
/**
 * Decides whether the given node holds the lock, based on its position in the
 * children list.
 *
 * @param client           the Curator client (not used by this implementation)
 * @param children         child node names; callers are expected to pass them in sorted order
 * @param sequenceNodeName name of the caller's node, without the parent path
 * @param maxLeases        number of leading positions that hold the lock
 * @return a result carrying the lock decision and, when the lock is not held,
 *         the name of the node to watch
 * @throws Exception if index validation fails
 */
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    final int position = children.indexOf(sequenceNodeName);
    // indexOf yields -1 when the node is absent; validateOurIndex presumably rejects that case.
    validateOurIndex(sequenceNodeName, position);

    // With maxLeases == 1 only position 0 (the first node in the list) holds the lock.
    final boolean withinLeases = position < maxLeases;
    final String pathToWatch;
    if (withinLeases) {
        pathToWatch = null;
    } else {
        // Watch the node maxLeases positions ahead: the immediate predecessor when maxLeases == 1.
        pathToWatch = children.get(position - maxLeases);
    }
    return new PredicateResults(pathToWatch, withinLeases);
}
`acquire(long time, TimeUnit unit)` 是带超时时间的获取锁方法,其流程与 `acquire()` 略有不同:若在超时时间内未获取到锁,则删除当前线程创建的子节点,由后续子节点对应的线程继续尝试获取锁。
release 释放锁
zk分布式锁是可重入的,因此在释放锁时 只有当重入次数减为0 才真正地去释放锁
/**
 * Releases one hold on the lock for the calling thread. The underlying lock is
 * released only when the hold count reaches zero.
 *
 * @throws IllegalMonitorStateException if the calling thread does not hold the lock,
 *                                      or if the hold count has gone negative
 * @throws Exception                    any error propagated from releasing the lock
 */
public void release() throws Exception {
    final Thread owner = Thread.currentThread();
    // Lock data exists only for threads that acquired the lock.
    final InterProcessMutex.LockData held = this.threadData.get(owner);
    if (held == null) {
        throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
    }

    final int remaining = held.lockCount.decrementAndGet();
    if (remaining > 0) {
        // Still held re-entrantly; nothing more to do.
        return;
    }
    if (remaining < 0) {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
    }

    try {
        // Hold count reached zero: actually release the lock.
        this.internals.releaseLock(held.lockPath);
    } finally {
        // Drop this thread's bookkeeping even if the release failed.
        this.threadData.remove(owner);
    }
}
LockInternals
/**
 * Releases the lock by clearing the revocable reference and deleting the lock node.
 *
 * @param lockPath full path of the lock node to delete
 * @throws Exception any error raised while deleting the node
 */
void releaseLock(String lockPath) throws Exception {
    revocable.set(null);
    // Deleting the node is what actually gives up the lock.
    deleteOurPath(lockPath);
}
由上面可知,因为后一个节点会监听当前节点,因此当持有锁的节点被删除后会被监听到,触发监听逻辑
/**
 * Wakes every thread currently waiting on this object's monitor.
 */
private void notifyFromWatcher() {
    synchronized (this) {
        notifyAll();
    }
}
触发监听逻辑后,当前节点的后一个节点所关联的线程将被唤醒,然后继续执行LockInternals#internalLockLoop方法获取锁。
以上 就是zk实现分布式锁的主要逻辑了。带超时时间的锁实现逻辑没有详细地分析,其实现逻辑大体类似 读者可自行阅读分析。