Redis分布式锁-SETNX实现

  • Post author:
  • Post category:其他


Redis有一系列以NX结尾的命令,NX是Not eXists的缩写,如SETNX命令就应该理解为:SET if not exists。

1. 用SETNX实现分布式锁

利用SETNX非常简单的实现分布式锁。例如:某客户端要获得一个名字foo的锁,客户端使用下面的命令进行获取:

SETNX lock.foo <current Unix time + lock timeout + 1>

1)

如返回1,则该进程获得锁

,把键lock.foo的值设置为锁的超时时间(当前时间 + 锁的有效时间),表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。

2)

如返回0,表明该锁已被其他进程取的

,这时我们可以先返回或进行重试等对方完成或等待锁超时。

2. 解决死锁

考虑一种情况,如果进程获得锁后,断开了与 Redis 的连接(可能是进程挂掉,或者网络中断),如果没有有效的释放锁的机制,那么其他进程都会处于一直等待的状态,即出现“死锁”。

上面在使用 SETNX 获得锁时,我们将键 lock.foo 的值设置为锁的有效时间,进程获得锁后,其他进程还会不断的检测锁是否已超时,如果超时,那么等待的进程也将有机会获得锁。

然而,锁超时时,我们不能简单地使用 DEL 命令删除键 lock.foo 以释放锁。考虑以下情况,进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。进程P2,P3正在不断地检测锁是否已释放或者已超时,执行流程如下:

  • P2和P3进程读取键 lock.foo 的值,检测锁是否已超时(通过比较当前时间和键 lock.foo 的值来判断是否超时)
  • P2和P3进程发现锁 lock.foo 已超时
  • P2执行 DEL lock.foo命令
  • P2执行 SETNX lock.foo命令,并返回1,即P2获得锁
  • P3执行 DEL lock.foo命令将P2刚刚设置的键 lock.foo 删除(这步是由于P3刚才已检测到锁已超时)
  • P3执行 SETNX lock.foo命令,并返回1,即P3获得锁
  • P2和P3同时获得了锁

从上面的情况可以得知,在检测到锁超时后,进程不能直接简单地执行 DEL 删除键的操作以获得锁。

为了解决上述

算法

可能出现的多个进程同时获得锁的问题,我们再来看以下的算法。

我们同样假设进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。接下来的情况:

  • 进程P4执行 SETNX lock.foo 以尝试获取锁
  • 由于进程P1已获得了锁,所以P4执行 SETNX lock.foo 返回0,即获取锁失败
  • P4执行 GET lock.foo 来检测锁是否已超时,如果没超时,则等待一段时间,再次检测
  • 如果P4检测到锁已超时,即当前的时间大于键 lock.foo 的值,P4会执行以下操作


    GETSET lock.foo <current Unix timestamp + lock timeout + 1>
  • 由于 GETSET 操作在设置键的值的同时,还会返回键的旧值,通过比较键 lock.foo 的旧值是否小于当前时间,可以判断进程是否已获得锁
  • 假如另一个进程P5也检测到锁已超时,并在P4之前执行了 GETSET 操作,那么P4的 GETSET 操作返回的是一个大于当前时间的时间戳,这样P4就不会获得锁而继续等待。注意到,即使P4接下来将键 lock.foo 的值设置了比P5设置的更大的值也没影响。

另外,值得注意的是,在进程释放锁,即执行 DEL lock.foo 操作前,需要先判断锁是否已超时。如果锁已超时,那么锁可能已由其他进程获得,这时直接执行 DEL lock.foo 操作会导致把其他进程已获得的锁释放掉

3. 伪代码如下:

1)

/**

* 缓存操作接口

*/

public interface Cache extends JedisCommands {


}

2)

public interface Lock {

void lock();

void lockInterruptibly() throws InterruptedException;

boolean tryLock();

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();

}

3)

/**

* 锁的骨架实现, 真正的获取锁的步骤由子类去实现.

*/

public abstract class AbstractLock implements Lock {

protected volatile boolean locked;

private Thread exclusiveOwnerThread;

@Override

public void lock() {


try {


//如果没获取到,一直获取

while (!lock(0, null, false)) {


Thread.sleep(500L);

}

} catch (InterruptedException e) {


Thread.currentThread().interrupt();

}

}

@Override

public void lockInterruptibly() throws InterruptedException {


lock(0, null, true);

}

@Override

public boolean tryLock(long time, TimeUnit unit) {


try {


return lock(time, unit, false);

} catch (InterruptedException e) {


Thread.currentThread().interrupt();

}

return false;

}

@Override

public boolean tryLock() {


try {


return lock(0, null, false);

} catch (InterruptedException e) {


Thread.currentThread().interrupt();

}

return false;

}

public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {


return lock(time, unit, true);

}

@Override

public void unlock() {


// 检查当前线程是否持有锁

if (Thread.currentThread() == getExclusiveOwnerThread()) {


unlock0();

setExclusiveOwnerThread(null);

}

}

protected void setExclusiveOwnerThread(Thread thread) {


exclusiveOwnerThread = thread;

}

protected final Thread getExclusiveOwnerThread() {


return exclusiveOwnerThread;

}

protected abstract void unlock0();

/**

* 阻塞式获取锁的实现

*

* @param timeout

* @param unit

* @param interrupt 是否响应中断

* @return

* @throws InterruptedException

*/

protected abstract boolean lock(long timeout, TimeUnit unit, boolean interrupt)

throws InterruptedException;

}

4)

public class


RedisLock


extends AbstractLock {

private Cache cache;

private Random retryMillisRandom;

protected String lockKey;

protected long lockExpires;

/**

* @param cache

* @param lockKey

* @param lockExpires 锁的有效时长(毫秒)

*/

public RedisLock(Cache cache, String lockKey, long lockExpires) {


this.cache = cache;

this.lockKey = lockKey;

this.lockExpires = lockExpires;

this.retryMillisRandom = new Random(System.currentTimeMillis());

}

@Override

protected boolean


lock


(long timeout, TimeUnit unit, boolean interrupt) throws InterruptedException {

if (interrupt) {


checkInterruption();

}

long timeoutMillis = unit == null ? 0 : unit.toMillis(timeout);

while (timeoutMillis >= 0) {


if (interrupt) {


checkInterruption();

}

long lockExpireTime = System.currentTimeMillis() + lockExpires + 1;

String stringOfLockExpireTime = String.valueOf(lockExpireTime);

if (setNX(lockKey, stringOfLockExpireTime)) {


// 成功获取到锁, 设置相关标识

locked = true;

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

String value = this.get(lockKey);

if (value != null && isTimeExpired(value)) {


String oldValue = this.getSet(lockKey, stringOfLockExpireTime);

if (oldValue != null && oldValue.equals(value)) {


locked = true;

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

}

long delayMillis = randomDelay();

long sleepMillis = timeoutMillis < delayMillis ? timeoutMillis : delayMillis;

Thread.sleep(sleepMillis);

timeoutMillis = timeoutMillis – sleepMillis == 0 ? -1 : timeoutMillis – sleepMillis;

}

return false;

}

private long randomDelay() {


return retryMillisRandom.nextInt(50) + 50;

}

public boolean isLocked() {


if (locked) {


return true;

} else {


String value = cache.get(lockKey);

return !isTimeExpired(value);

}

}

@Override

protected void unlock0() {


// 判断锁是否过期

String value = cache.get(lockKey);

if (!isTimeExpired(value)) {


doUnlock();

}

}

private void checkInterruption() throws InterruptedException {


if (Thread.currentThread().isInterrupted()) {


throw new InterruptedException();

}

}

private boolean isTimeExpired(String value) {


return value == null || Long.parseLong(value) < System.currentTimeMillis();

}

private void doUnlock() {


cache.del(lockKey);

}

public String getLockKey() {


return lockKey;

}

private String get(final String key) {


return cache.get(key);

}

private boolean setNX(final String key, final String value) {


Long result = cache.setnx(key, value);

return result != null && result == 1;

}

private String getSet(final String key, final String value) {


return cache.getSet(key, value);

}

}

5)使用场景

@Service

public class TestServiceImpl{

@Autowired

private Cache cache;

public String testRedis(String userId){

String lockKey = “CACHE_KEY_”+userId;

Lock lock = new RedisLock(cache, lockKey, 1000);

try {


if (lock.tryLock()) {


//业务逻辑

}else {


throw new BusinessException(“排队人多,请稍后重试…”):

}

} finally {


lock.unlock();

}

}

}

4.


总结

1) setNX:先通过JedisCommands的setNX(key,value)方法,设置指定key锁的值即失效时间,如果redis中不存在该值,则返回值为1,设置成功获得该锁;如果返回0,继续走第二步->

2) get:通过JedisCommands的get(key)方法,获取指定key锁的值即失效时间,与当前时间做比较,如果大于当前时间表示还未失效,当前线程不能获得锁;如果小于当前时间即为失效,继续走第三步->

3) getSet:通过JedisCommands的getSet(key,value)方法,设置指定key锁的值即新的失效时间,同时返回键的旧值,两种方法判断是否能获得锁:a) 将第二步get方法获取到的健值与当前getSet方法获取的健值比较,如果相等说明中途没有进程抢占该锁,该进程可以获得锁 ;b) 将当前getSet 方法获得的最新失效时间与当前时间做比较,如果小于当前时间,说明中途没有进程抢占该锁也没有通过getSet设置新的失效时间,则进程可以获得锁。



版权声明:本文为qq_34896887原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。