Redis有一系列以NX结尾的命令,NX是Not eXists的缩写,如SETNX命令就应该理解为:SET if not exists。
1. 用SETNX实现分布式锁
利用SETNX非常简单的实现分布式锁。例如:某客户端要获得一个名字foo的锁,客户端使用下面的命令进行获取:
SETNX lock.foo <current Unix time + lock timeout + 1>
1)
如返回1,则该进程获得锁
,把键lock.foo的值设置为锁的超时时间(当前时间 + 锁的有效时间),表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。
2)
如返回0,表明该锁已被其他进程取的
,这时我们可以先返回或进行重试等对方完成或等待锁超时。
2. 解决死锁
考虑一种情况,如果进程获得锁后,断开了与 Redis 的连接(可能是进程挂掉,或者网络中断),如果没有有效的释放锁的机制,那么其他进程都会处于一直等待的状态,即出现“死锁”。
上面在使用 SETNX 获得锁时,我们将键 lock.foo 的值设置为锁的有效时间,进程获得锁后,其他进程还会不断的检测锁是否已超时,如果超时,那么等待的进程也将有机会获得锁。
然而,锁超时时,我们不能简单地使用 DEL 命令删除键 lock.foo 以释放锁。考虑以下情况,进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。进程P2,P3正在不断地检测锁是否已释放或者已超时,执行流程如下:
- P2和P3进程读取键 lock.foo 的值,检测锁是否已超时(通过比较当前时间和键 lock.foo 的值来判断是否超时)
- P2和P3进程发现锁 lock.foo 已超时
- P2执行 DEL lock.foo命令
- P2执行 SETNX lock.foo命令,并返回1,即P2获得锁
- P3执行 DEL lock.foo命令将P2刚刚设置的键 lock.foo 删除(这步是由于P3刚才已检测到锁已超时)
- P3执行 SETNX lock.foo命令,并返回1,即P3获得锁
- P2和P3同时获得了锁
从上面的情况可以得知,在检测到锁超时后,进程不能直接简单地执行 DEL 删除键的操作以获得锁。
为了解决上述
算法
可能出现的多个进程同时获得锁的问题,我们再来看以下的算法。
我们同样假设进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。接下来的情况:
- 进程P4执行 SETNX lock.foo 以尝试获取锁
- 由于进程P1已获得了锁,所以P4执行 SETNX lock.foo 返回0,即获取锁失败
- P4执行 GET lock.foo 来检测锁是否已超时,如果没超时,则等待一段时间,再次检测
-
如果P4检测到锁已超时,即当前的时间大于键 lock.foo 的值,P4会执行以下操作
GETSET lock.foo <current Unix timestamp + lock timeout + 1>
- 由于 GETSET 操作在设置键的值的同时,还会返回键的旧值,通过比较键 lock.foo 的旧值是否小于当前时间,可以判断进程是否已获得锁
- 假如另一个进程P5也检测到锁已超时,并在P4之前执行了 GETSET 操作,那么P4的 GETSET 操作返回的是一个大于当前时间的时间戳,这样P4就不会获得锁而继续等待。注意到,即使P4接下来将键 lock.foo 的值设置了比P5设置的更大的值也没影响。
另外,值得注意的是,在进程释放锁,即执行 DEL lock.foo 操作前,需要先判断锁是否已超时。如果锁已超时,那么锁可能已由其他进程获得,这时直接执行 DEL lock.foo 操作会导致把其他进程已获得的锁释放掉
3. 伪代码如下:
1)
/**
* 缓存操作接口
*/
public interface Cache extends JedisCommands {
}
2)
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
}
3)
/**
* 锁的骨架实现, 真正的获取锁的步骤由子类去实现.
*/
public abstract class AbstractLock implements Lock {
protected volatile boolean locked;
private Thread exclusiveOwnerThread;
@Override
public void lock() {
try {
//如果没获取到,一直获取
while (!lock(0, null, false)) {
Thread.sleep(500L);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lock(0, null, true);
}
@Override
public boolean tryLock(long time, TimeUnit unit) {
try {
return lock(time, unit, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
@Override
public boolean tryLock() {
try {
return lock(0, null, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
return lock(time, unit, true);
}
@Override
public void unlock() {
// 检查当前线程是否持有锁
if (Thread.currentThread() == getExclusiveOwnerThread()) {
unlock0();
setExclusiveOwnerThread(null);
}
}
protected void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
protected abstract void unlock0();
/**
* 阻塞式获取锁的实现
*
* @param timeout
* @param unit
* @param interrupt 是否响应中断
* @return
* @throws InterruptedException
*/
protected abstract boolean lock(long timeout, TimeUnit unit, boolean interrupt)
throws InterruptedException;
}
4)
public class
RedisLock
extends AbstractLock {
private Cache cache;
private Random retryMillisRandom;
protected String lockKey;
protected long lockExpires;
/**
* @param cache
* @param lockKey
* @param lockExpires 锁的有效时长(毫秒)
*/
public RedisLock(Cache cache, String lockKey, long lockExpires) {
this.cache = cache;
this.lockKey = lockKey;
this.lockExpires = lockExpires;
this.retryMillisRandom = new Random(System.currentTimeMillis());
}
@Override
protected boolean
lock
(long timeout, TimeUnit unit, boolean interrupt) throws InterruptedException {
if (interrupt) {
checkInterruption();
}
long timeoutMillis = unit == null ? 0 : unit.toMillis(timeout);
while (timeoutMillis >= 0) {
if (interrupt) {
checkInterruption();
}
long lockExpireTime = System.currentTimeMillis() + lockExpires + 1;
String stringOfLockExpireTime = String.valueOf(lockExpireTime);
if (setNX(lockKey, stringOfLockExpireTime)) {
// 成功获取到锁, 设置相关标识
locked = true;
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
String value = this.get(lockKey);
if (value != null && isTimeExpired(value)) {
String oldValue = this.getSet(lockKey, stringOfLockExpireTime);
if (oldValue != null && oldValue.equals(value)) {
locked = true;
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
long delayMillis = randomDelay();
long sleepMillis = timeoutMillis < delayMillis ? timeoutMillis : delayMillis;
Thread.sleep(sleepMillis);
timeoutMillis = timeoutMillis – sleepMillis == 0 ? -1 : timeoutMillis – sleepMillis;
}
return false;
}
private long randomDelay() {
return retryMillisRandom.nextInt(50) + 50;
}
public boolean isLocked() {
if (locked) {
return true;
} else {
String value = cache.get(lockKey);
return !isTimeExpired(value);
}
}
@Override
protected void unlock0() {
// 判断锁是否过期
String value = cache.get(lockKey);
if (!isTimeExpired(value)) {
doUnlock();
}
}
private void checkInterruption() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
}
private boolean isTimeExpired(String value) {
return value == null || Long.parseLong(value) < System.currentTimeMillis();
}
private void doUnlock() {
cache.del(lockKey);
}
public String getLockKey() {
return lockKey;
}
private String get(final String key) {
return cache.get(key);
}
private boolean setNX(final String key, final String value) {
Long result = cache.setnx(key, value);
return result != null && result == 1;
}
private String getSet(final String key, final String value) {
return cache.getSet(key, value);
}
}
5)使用场景
@Service
public class TestServiceImpl{
@Autowired
private Cache cache;
public String testRedis(String userId){
String lockKey = “CACHE_KEY_”+userId;
Lock lock = new RedisLock(cache, lockKey, 1000);
try {
if (lock.tryLock()) {
//业务逻辑
}else {
throw new BusinessException(“排队人多,请稍后重试…”):
}
} finally {
lock.unlock();
}
}
}
4.
总结
1) setNX:先通过JedisCommands的setNX(key,value)方法,设置指定key锁的值即失效时间,如果redis中不存在该值,则返回值为1,设置成功获得该锁;如果返回0,继续走第二步->
2) get:通过JedisCommands的get(key)方法,获取指定key锁的值即失效时间,与当前时间做比较,如果大于当前时间表示还未失效,当前线程不能获得锁;如果小于当前时间即为失效,继续走第三步->
3) getSet:通过JedisCommands的getSet(key,value)方法,设置指定key锁的值即新的失效时间,同时返回键的旧值,两种方法判断是否能获得锁:a) 将第二步get方法获取到的健值与当前getSet方法获取的健值比较,如果相等说明中途没有进程抢占该锁,该进程可以获得锁 ;b) 将当前getSet 方法获得的最新失效时间与当前时间做比较,如果小于当前时间,说明中途没有进程抢占该锁也没有通过getSet设置新的失效时间,则进程可以获得锁。