Redis分布式锁实现:随机值一般为UUID,删除key释放锁的时候通过UUID进行判断再删除,获得锁的主线程开启分线程去key是否存在,如果存在延长过期时间为原来的1/3,失效时间不好控制,一般为业务时间的2-3倍左右。
SET key value [EX seconds] [PX milliseconds] [NX|XX]
将字符串值
value
关联到
key
。
如果
key
已经持有其他值,
SET
就覆写旧值,无视类型。
对于某个原本带有生存时间(TTL)的键来说, 当
SET
命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
可选参数
从 Redis 2.6.12 版本开始,
SET
命令的行为可以通过一系列参数来修改:
-
EX second
:设置键的过期时间为
second
秒。
SET key value EX second
效果等同于
SETEX key second value
。 -
PX millisecond
:设置键的过期时间为
millisecond
毫秒。
SET key value PX millisecond
效果等同于
PSETEX key millisecond value
。 -
NX
:只在键不存在时,才对键进行设置操作。
SET key value NX
效果等同于
SETNX key value
。 -
XX
:只在键已经存在时,才对键进行设置操作。
因为
SET
命令可以通过参数来实现和
SETNX
、
SETEX
和
PSETEX
三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除
SETNX
、
SETEX
和
PSETEX
这三个命令。
可用版本:
>= 1.0.0
时间复杂度:
O(1)
返回值:
在 Redis 2.6.12 版本以前,
SET
命令总是返回
OK
。
从 Redis 2.6.12 版本开始,
SET
在设置操作成功完成时,才返回
OK
。
如果设置了
NX
或者
XX
,但因为条件没达到而造成设置操作未执行,那么命令返回空批量回复(NULL Bulk Reply)。
———————————————————————————————————————————————————–
Zookeeper实现分布式锁:
一:创建临时节点:
path改为protected
二:创建临时有序节点:
实现共享锁
1.IDEA创建maven项目,pom.xml添加如下依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>curator</groupId>
<artifactId>curator</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>cn.itlym.shoulder</groupId>
<artifactId>lombok</artifactId>
<version>0.1</version>
</dependency>
2.鼠标右击,重新构建项目
3.创建CuratorCRUD类,和ZKShareLock类
public class CuratorCRUD {
public CuratorFramework curatorFramework;
public String ip=”192.168.159.151:2181,192.168.159.151:2182,192.168.159.151:2183″;
public CuratorCRUD() {
createZkCuratorConnection();
}
/**
* 创建客户端连接
*/
public void createZkCuratorConnection(){
curatorFramework=CuratorFrameworkFactory
.builder()
.connectString(ip)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
curatorFramework.start();
}
/**
* 关闭客户端连接
*/
public void deleteZkCuratorConnection(){
curatorFramework.close();
}
}
public class ZKShareLock extends Thread{
private Object o=new Object();
private CuratorFramework curatorFramework;
private String basePath=”/ShareLocks”;
private String userName=basePath+”/User-“;
private String cname;//客户端名字
public ZKShareLock(CuratorFramework curatorFramework,String cname) {
this.curatorFramework = curatorFramework;
this.cname=cname;
}
@Override
public void run() {
try {
//创建节点并获取节点名字
//得到完整目录/ShareLocks/User-0000000092
String nodeName = curatorFramework
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(userName, cname.getBytes());
System.out.println(“创捷节点成功”);
System.out.println(nodeName);
System.out.println();
//获取目录下子节点
List<String> tempNodeNames = curatorFramework.getChildren().forPath(basePath);
List<String> nodeNames =new ArrayList();
for (int i = 0; i < tempNodeNames.size(); i++) {
String name =tempNodeNames.get(i);
name=basePath+”/”+name;
nodeNames.add(name);
}
Collections.sort(nodeNames);
int index=nodeNames.indexOf(nodeName);
System.out.printf(“index =%d \n”,index);
if(index==0){
doSomthings(nodeName);
unlock(nodeName);
}else {
addWatcherWithTreeCache(nodeNames.get(index-1));
synchronized (o){
o.wait();
}
doSomthings(nodeName);
unlock(nodeName);
}
} catch (Exception e) {
e.printStackTrace();
}
curatorFramework.close();
}
//添加监听器
public void addWatcherWithTreeCache(String path) throws Exception {
TreeCache treeCache=new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListene=new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
if(treeCacheEvent.getType()==TreeCacheEvent.Type.NODE_REMOVED){
System.out.printf(“%s delete\n”,path);
synchronized (o){
o.notify();
}
System.out.printf(“%s notify\n”,path);
}
}
};
treeCache.getListenable().addListener(treeCacheListene);
treeCache.start();
}
public void unlock(String nodeName){
System.out.printf(“%s unlock”,nodeName);
}
public void doSomthings(String nodeName){
System.out.printf(“%s doSomthings”,nodeName);
}
public static void main(String[] args) {
Date date=new Date();
for (int i = 0; i < 100; i++) {
new ZKShareLock(new CuratorCRUD().curatorFramework,”用户 “+i).start();
}
Date date1=new Date();
System.out.println(date1.getTime()-date.getTime());
}
}
运行之前,需要在zookeeper客户端创建/ShareLocks节点,且上面的ip修改为自己的集群ip
部分运行结果:
实现排他锁
Curator中封装了一种分布式可重入排他锁:InterProcessMutex
创建CuratorMutex类,并在zookeeper中创建/Mutex节点
public class CuratorMutex implements Runnable{
public CuratorFramework curatorFramework;
public String basePath;
public InterProcessLock processLock;
public int idx;
public CuratorMutex(CuratorFramework curatorFramework, String basePath, int idx) {
this.curatorFramework = curatorFramework;
this.basePath = basePath;
this.processLock=new InterProcessMutex(curatorFramework,basePath);
this.idx=idx;
}
@Override
public void run() {
Logger logger= Logger.getLogger(“”);
try {
//线程加锁
processLock.acquire(1000, TimeUnit.SECONDS);
logger.info(String.format(“线程%d获取锁”,idx));
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}finally {
//线程解锁
try {
processLock.release();
} catch (Exception e) {
e.printStackTrace();
}
logger.info(String.format(“线程%d释放锁”,idx));
}
}
public static void main(String[] args) {
CuratorFramework framework = new CuratorCRUD().curatorFramework;
for (int i = 0; i < 20; i++) {
new Thread(new CuratorMutex(framework,”/Mutex”,i)).start();
}
while (true){
}
}
}
部分运行结果:
下面将从源码里带大家讲解如何实现可重入排他锁
acquire方法内部实际实际上调用了internalLock方法
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn’t necessary
*/
//获取当前线程,并获取LockData锁信息
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
//lockCount自增,锁重入
lockData.lockCount.incrementAndGet();
return true;
}
//获取锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//创建锁,并将锁信息存放到threadData这个Map中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn’t necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException(“You do not own the lock: ” + basePath);
}
//减少重入次数
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException(“Lock count has gone negative for lock: ” + basePath);
}
try
{ //释放锁
internals.releaseLock(lockData.lockPath);
}
finally
{ //从Map中移除该线程
threadData.remove(currentThread);
}
}