Redis分布式锁实现,Zookeeper实现分布式锁

  • Post author:
  • Post category:其他


Redis分布式锁实现:随机值一般为UUID,删除key释放锁的时候通过UUID进行判断再删除,获得锁的主线程开启分线程去key是否存在,如果存在延长过期时间为原来的1/3,失效时间不好控制,一般为业务时间的2-3倍左右。


SET key value [EX seconds] [PX milliseconds] [NX|XX]

将字符串值

value

关联到

key

如果

key

已经持有其他值,

SET

就覆写旧值,无视类型。

对于某个原本带有生存时间(TTL)的键来说, 当

SET

命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。


可选参数

从 Redis 2.6.12 版本开始,

SET

命令的行为可以通过一系列参数来修改:


  • EX second

    :设置键的过期时间为

    second

    秒。

    SET key value EX second

    效果等同于

    SETEX key second value


  • PX millisecond

    :设置键的过期时间为

    millisecond

    毫秒。

    SET key value PX millisecond

    效果等同于

    PSETEX key millisecond value


  • NX

    :只在键不存在时,才对键进行设置操作。

    SET key value NX

    效果等同于

    SETNX key value


  • XX

    :只在键已经存在时,才对键进行设置操作。

因为

SET

命令可以通过参数来实现和

SETNX



SETEX



PSETEX

三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除

SETNX



SETEX



PSETEX

这三个命令。


可用版本:

>= 1.0.0


时间复杂度:

O(1)


返回值:

在 Redis 2.6.12 版本以前,

SET

命令总是返回

OK

从 Redis 2.6.12 版本开始,

SET

在设置操作成功完成时,才返回

OK

如果设置了

NX

或者

XX

,但因为条件没达到而造成设置操作未执行,那么命令返回空批量回复(NULL Bulk Reply)。

———————————————————————————————————————————————————–

Zookeeper实现分布式锁:

一:创建临时节点:


path改为protected

二:创建临时有序节点:



实现共享锁

1.IDEA创建maven项目,pom.xml添加如下依赖

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.8.0</version>

</dependency>

<dependency>

<groupId>curator</groupId>

<artifactId>curator</artifactId>

<version>0.0.7</version>

</dependency>

<dependency>

<groupId>cn.itlym.shoulder</groupId>

<artifactId>lombok</artifactId>

<version>0.1</version>

</dependency>

2.鼠标右击,重新构建项目

3.创建CuratorCRUD类,和ZKShareLock类

public class CuratorCRUD {

public   CuratorFramework curatorFramework;

public   String ip=”192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183″;

public CuratorCRUD() {


createZkCuratorConnection();

}

/**

* 创建客户端连接

*/

public  void  createZkCuratorConnection(){


curatorFramework=CuratorFrameworkFactory

.builder()

.connectString(ip)

.retryPolicy(new ExponentialBackoffRetry(1000,3))

.build();

curatorFramework.start();

}

/**

* 关闭客户端连接

*/

public  void deleteZkCuratorConnection(){


curatorFramework.close();

}

}

public class ZKShareLock extends Thread{

private Object o=new Object();

private CuratorFramework curatorFramework;

private String basePath=”/ShareLocks”;

private String userName=basePath+”/User-“;

private String cname;//客户端名字

public ZKShareLock(CuratorFramework curatorFramework,String cname) {


this.curatorFramework = curatorFramework;

this.cname=cname;

}

@Override

public void run() {


try {


//创建节点并获取节点名字

//得到完整目录/ShareLocks/User-0000000092

String nodeName = curatorFramework

.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(userName, cname.getBytes());

System.out.println(“创捷节点成功”);

System.out.println(nodeName);

System.out.println();

//获取目录下子节点

List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);

List<String> nodeNames =new ArrayList();

for (int i = 0; i < tempNodeNames.size(); i++) {


String name =tempNodeNames.get(i);

name=basePath+”/”+name;

nodeNames.add(name);

}

Collections.sort(nodeNames);

int index=nodeNames.indexOf(nodeName);

System.out.printf(“index =%d  \n”,index);

if(index==0){


doSomthings(nodeName);

unlock(nodeName);

}else {


addWatcherWithTreeCache(nodeNames.get(index-1));

synchronized (o){


o.wait();

}

doSomthings(nodeName);

unlock(nodeName);

}

} catch (Exception e) {


e.printStackTrace();

}

curatorFramework.close();

}

//添加监听器

public   void addWatcherWithTreeCache(String path) throws Exception {


TreeCache treeCache=new TreeCache(curatorFramework,path);

TreeCacheListener treeCacheListene=new TreeCacheListener() {


@Override

public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {


if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){


System.out.printf(“%s delete\n”,path);

synchronized (o){


o.notify();

}

System.out.printf(“%s notify\n”,path);

}

}

};

treeCache.getListenable().addListener(treeCacheListene);

treeCache.start();

}

public void unlock(String nodeName){


System.out.printf(“%s unlock”,nodeName);

}

public void doSomthings(String nodeName){


System.out.printf(“%s doSomthings”,nodeName);

}

public static void main(String[] args) {


Date date=new Date();

for (int i = 0; i < 100; i++) {


new ZKShareLock(new CuratorCRUD().curatorFramework,”用户 “+i).start();

}

Date date1=new Date();

System.out.println(date1.getTime()-date.getTime());

}

}

运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip

部分运行结果:

实现排他锁

Curator中封装了一种分布式可重入排他锁:InterProcessMutex

创建CuratorMutex类,并在zookeeper中创建/Mutex节点

public class CuratorMutex implements Runnable{

public  CuratorFramework curatorFramework;

public  String basePath;

public InterProcessLock processLock;

public int idx;

public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {


this.curatorFramework = curatorFramework;

this.basePath = basePath;

this.processLock=new InterProcessMutex(curatorFramework,basePath);

this.idx=idx;

}

@Override

public void run() {


Logger logger= Logger.getLogger(“”);

try {


//线程加锁

processLock.acquire(1000, TimeUnit.SECONDS);

logger.info(String.format(“线程%d获取锁”,idx));

Thread.sleep(10);

} catch (Exception e) {


e.printStackTrace();

}finally {


//线程解锁

try {


processLock.release();

} catch (Exception e) {


e.printStackTrace();

}

logger.info(String.format(“线程%d释放锁”,idx));

}

}

public static void main(String[] args) {


CuratorFramework framework = new CuratorCRUD().curatorFramework;

for (int i = 0; i < 20; i++) {


new Thread(new CuratorMutex(framework,”/Mutex”,i)).start();

}

while (true){

}

}

}

部分运行结果:

下面将从源码里带大家讲解如何实现可重入排他锁

acquire方法内部实际实际上调用了internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception

{


/*

Note on concurrency: a given lockData instance

can be only acted on by a single thread so locking isn’t necessary

*/

//获取当前线程,并获取LockData锁信息

Thread currentThread = Thread.currentThread();

LockData lockData = threadData.get(currentThread);

if ( lockData != null )

{


// re-entering

//lockCount自增,锁重入

lockData.lockCount.incrementAndGet();

return true;

}

//获取锁

String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

if ( lockPath != null )

{


//创建锁,并将锁信息存放到threadData这个Map中

LockData newLockData = new LockData(currentThread, lockPath);

threadData.put(currentThread, newLockData);

return true;

}

return false;

}

public void release() throws Exception

{


/*

Note on concurrency: a given lockData instance

can be only acted on by a single thread so locking isn’t necessary

*/

Thread currentThread = Thread.currentThread();

LockData lockData = threadData.get(currentThread);

if ( lockData == null )

{


throw new IllegalMonitorStateException(“You do not own the lock: ” + basePath);

}

//减少重入次数

int newLockCount = lockData.lockCount.decrementAndGet();

if ( newLockCount > 0 )

{


return;

}

if ( newLockCount < 0 )

{


throw new IllegalMonitorStateException(“Lock count has gone negative for lock: ” + basePath);

}

try

{    //释放锁

internals.releaseLock(lockData.lockPath);

}

finally

{    //从Map中移除该线程

threadData.remove(currentThread);

}

}



版权声明:本文为qq_36864672原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。