基于zk的分布式锁使用及原理分析

  • Post author:
  • Post category:其他


Zookeeper 版本 3.4.12

curator-framework、curator-recipes 版本 2.13.0


curator版本要与zk版本对应,不然启动会报错



使用示例

先看一个基于zk实现分布式锁的例子

public static void main(String[] args) throws InterruptedException {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(4000).build();

        zkClient.start();

        InterProcessMutex lock = new InterProcessMutex(zkClient, "/lock");
				//倒计时 子线程执行完毕后 主线程才结束
        CountDownLatch countDownLatch=new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                  //获取锁
                    lock.acquire();
                    System.out.println(Thread.currentThread().getName()+"===>获取锁");
                  //模拟业务执行耗时
                    Thread.sleep(4000);
                  //释放锁
                    lock.release();
                    countDownLatch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            },"Thread-"+i).start();
        }

        countDownLatch.await();
        System.out.println("执行结束~~~");
    }

我们知道,zk有几种节点类型:持久化节点、临时节点、持久化有序节点、临时有序节点。

这里用到的是临时有序节点。



分布式锁流程分析

我们结合上面的代码以

5

个线程抢占分布式锁为例图表方式 简要说明下锁的获取与释放过程

1、首先会将**/lock**作为父节点 创建5个临时有序子节点

在这里插入图片描述

2、序号最小的子节点对应的线程先获取锁,为避免惊群效应其余的子节点 监听前一个节点(序号最小的子节点不监听节点了),然后对应的线程进行阻塞

在这里插入图片描述

图中 标红的子节点对应的线程 进行阻塞,标绿的子节点 对应的线程代表获取到了锁

3、当

xxx–lock-0000000001

节点对应线程释放锁后,当前节点会被删除,后一个节点对应的线程因为watch机制 会被唤醒

在这里插入图片描述

如上xxx–lock-0000000001节点会被删除,并唤醒xxx–lock-0000000002对应的线程。

4、以此类推 后面的线程大体按照这个流程去获取锁



源码分析



InterProcessMutex类
    
		private final LockInternals internals;
    //基准路径 会根据这个路径创建临时有序节点
		private final String basePath;
    //存储线程与锁信息的对应关系
    private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
    //锁的名称
    private static final String LOCK_NAME = "lock-";

		//client zk客户端对象
		public InterProcessMutex(CuratorFramework client, String path) {
        this(client, path, new StandardLockInternalsDriver());
    }

    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
        this(client, path, "lock-", 1, driver);
    }
		//maxLeases=1 这个在后面会起到作用
		InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, 			LockInternalsDriver driver) {
        this.threadData = Maps.newConcurrentMap();
        this.basePath = PathUtils.validatePath(path);
        this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }


acquire加锁分析

在这里插入图片描述

上面给出了acquire的调用时序图 我们主要从

internalLock

方法开始看起

public void acquire() throws Exception {
  			//这里直接调用internalLock方法,传入时间-1 表示不设置超时时间
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }
private boolean internalLock(long time, TimeUnit unit) throws Exception {
  			//获取当前线程
        Thread currentThread = Thread.currentThread();
  			//从map中查找当前线程的锁信息
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
  			//如果查找的到 则将加锁次数+1 从这里可以看出zk的锁也是可重入的
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
          //尝试获取锁 并返回锁路径
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
          	//如果路径不为空 说明当前线程抢占到锁 因此在map中维护当前线程与锁信息的对应关系
            if (lockPath != null) {
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }


LockInternals

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
  			//获取当前时间 
        long startMillis = System.currentTimeMillis();
  			//如果设置了超时时间 这里会对超时时间进行单位换算,由于我们未设置 这里返回null
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
  			//我们也没有为临时节点设置值 因此lockNodeBytes为null,localLockNodeBytes也为空
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
  			//是否持有了锁
        boolean hasTheLock = false;
  			//是否操作已完成
        boolean isDone = false;

        while(!isDone) {
            isDone = true;

            try {
              	//这里每个线程会基于path生成一个属于当前线程的临时有序节点,并返回节点路径 如/lock/xxx--lock-0000000002
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
              	//抢锁及监听操作 在这里
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
						//如果当前客户端是启动状态 并且当前线程未获取锁
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
              	//这里会将basePath下的所有节点取到 并按照自然顺序进行排序,集合中字符串格式形如/xxx--lock-0000000002
                List<String> children = this.getSortedChildren();
                //这里将ourPath中前缀去掉,如/lock/xxx--lock-0000000002==>xxx--lock-0000000002
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                //如果当前节点前没有字节点了 那么会获取锁 这里将haveTheLock标记为true
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                   //如果未获取锁 拼接待监听的节点路径
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                           //当前子节点监听previousSequencePath这个路径
                          ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                          	//未设置超时时间 直接阻塞线程  
                          if (millisToWait == null) {
                              
                                this.wait();
                            } else {
                            //设置了超时时间
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                            		//如果未超时 那么线程阻塞指定时间,否则超时后,会将当前子节点进行删除,可参见finally语句块逻辑
                                if (millisToWait > 0L) {
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }
				//返回是否获取锁的标记
        return haveTheLock;
    }


StandardLockInternalsDriver

 public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
   			//children子节点列表是有序的,这里是查找当前子节点序号在children列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
   			//如果未找到sequenceNodeName 则outIndex=-1这里主要对这种情况进行校验
        validateOurIndex(sequenceNodeName, ourIndex);
   			//这里就是获取锁的关键 前面提到过maxLeases=1,如果getsTheLock=true,就只有ourIndex=0,也从侧面印证了 每次只能序号最小的节点关联的线程才能获取锁
        boolean getsTheLock = ourIndex < maxLeases;
   			//如果未获取到锁 那就找一下当前子节点的前一个节点的路径 作为被监听的对象
        String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }

**acquire(long time, TimeUnit unit)**带超时时间的获取锁流程 与之有些不同:当超时时间内未获取到锁,则将当前子节点删除,下一个子节点继续尝试获取锁



release 释放锁

zk分布式锁是可重入的,因此在释放锁时 只有当重入次数减为0 才真正地去释放锁

public void release() throws Exception {
  			//获取当前线程
        Thread currentThread = Thread.currentThread();
  			//获取当前线程的锁信息,通过acquire代码可知 只有当前线程获取了锁,才能有锁信息
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
        } else {
          	//重入次数-1
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount <= 0) {
								//异常情况判断
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
                } else {
                    try {
                      	//当重入次数=0 时 释放锁
                        this.internals.releaseLock(lockData.lockPath);
                    } finally {
                      	//最后 清除该线程的锁信息
                        this.threadData.remove(currentThread);
                    }

                }
            }
        }
    }


LockInternals

void releaseLock(String lockPath) throws Exception {
        this.revocable.set((Object)null);
  			//这里实际上是删除当前被加锁的子节点
        this.deleteOurPath(lockPath);
    }

由上面可知,因为后一个节点会监听当前节点,因此当持有锁的节点被删除后会被监听到,触发监听逻辑

private synchronized void notifyFromWatcher() {
  			//唤醒线程
        this.notifyAll();
    }

触发监听逻辑后,当前节点的后一个节点所关联的线程将被唤醒,然后继续执行LockInternals#internalLockLoop方法获取锁。

以上 就是zk实现分布式锁的主要逻辑了。带超时时间的锁实现逻辑没有详细地分析,其实现逻辑大体类似 读者可自行阅读分析。



版权声明:本文为zyxwvuuvwxyz原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。