Redis学习
一:setnx锁问题和分布式锁redission
1.setnx 锁问题
基于setnx实现的分布式锁存在下面的问题:
不可重入
:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试
:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
**超时释放:**锁超时释放虽然可以避免死锁,但如果业务耗时较长,也会导致锁释放,存在着安全隐患。
主从一致性:
如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。概率低,同步是毫秒级.
2.redisson
2.1 定义
那么什么是Redission呢?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
简单说就是redis在分布式系统上工具的集合
,Redission提供了分布式锁的多种多样的功能.
2.2 快速入门
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
// 配置单节点
Config config = new Config();
//多节点config.useClusterServers()
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();
//加锁失败
if (!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
//释放锁
lock.unlock();
}
}
二:redission可重入锁原理
1.synchronized 和 Lock 可重入原理
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ;
对于synchronized而言,他在底层语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。
2.redission 可重入原理
2.1可重入锁示例
@Autowired
private RedissonClient redissonClient;
public void method1() {
RLock lock = redissonClient.getLock("lock");
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败,1");
}
try {
log.info("获取锁成功,1");
method2();
}finally {
log.info("释放锁,1");
lock.unlock();
}
}
public void method2() {
RLock lock = redissonClient.getLock("lock");
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败,2");
}
try {
log.info("获取锁成功,2");
}finally {
log.info("释放锁,2");
lock.unlock();
}
}
因为setnx 无法实现可重入,所以底层使用hash结构来进行存储.原理类似Lock
key | field | Value |
---|---|---|
lock | thread_id | 1 |
锁名称 | 锁唯一标识 | 锁值 入锁加一,出锁减1.为0 删除锁 |
2.2 tryLock
获取锁的Lua脚本源码
-- 判断锁是否存在
if (redis.call('exists', KEYS[1]) == 0) then
-- 不存在 获取锁
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置有效器
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 锁存在,判断threadId是否属于自己
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 设置有效期
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
2.3 unLock
释放锁的Lua脚本源码
local key = KEYS[1] -- 锁的key
local threadId = ARGV[1] -- 线程唯一标识
local releaseTime = ARGV[2] -- 锁的自动释放时间
-- 判断锁是否还被自己持有
if (redis.call('hexists', key,threadId) == 0) then
-- 不是自己的锁 直接返回
return nil;
end;
--是自己的锁,重入次数 -1
local count = redis.call('hincrby',key,threadId,-1);
--判断重入次数是否为0
if (count > 0) then
-- 大于0,说明不能释放锁,重置有效期然后返回
redis.call('expire', key,releaseTime);
return nil;
else
redis.call("del",key);
return nil;
end
2.4 可重入逻辑图
三:redission可重试锁原理
//源码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//1.将等待时间转换为毫秒数,获取当前的线程
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//2.尝试获取锁,返回null 代表没有锁,返回有值标识锁的过期时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 3.成功获取锁
if (ttl == null) {
return true;
}
//4.尝试获取锁耗时超过了等待时间,确认失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
//5.消息队列 订阅了其他线程释放锁的信号
//在unlock 脚本中 有一个 redis.call('publish',key[2],argv[1])
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
//6.当这个future 在指定时间内完成,返回true,否则false
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
//等到最大等待时间结束,还没有等到,取消订阅,返回false
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
//7.再次判断时间是否超出
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//todo 8.开始锁重试
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
//9.利用信号量来进行获取
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
//取消订阅
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
四:redission看门狗 WatchDog续约
场景
Redisson锁重试的问题是解决了, 但是总会发生一些问题, 如果我们的业务阻塞超时了ttl到期了, 别的线程看见我们的ttl到期了, 他重试他就会拿到本该属于我们的锁, 这时候就有安全问题了, 所以该怎么解决?
我们必须确保锁是业务执行完释放的, 而不是因为阻塞而释放的
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//自定义了时间
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//2.默认过期实践 30s 看门狗
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
//3. 当ttlRemainingFuture的异步尝试获取锁完成以后,
//先判断执行过程中是否有异常, 如果有异常就直接返回了结束执行.
//如果没有发生异常, 则判断ttlRemaining(剩余有效期)是否为空,
//为空的话就代表获取锁成功, 执行锁到期续约的核心方法scheduleExpectationRenew
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//更新有效期,内部源码 通过定时任务每隔10s,定时重置有效期
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
那么什么时候释放锁呢?
当然是在释放锁的时候,具体连接如下:
总结
执行流程
五:redission锁的MutiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,以主从为例,此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
WatchDog 机制 和 MutiLock原理
六:总结
-
以上内容参考来源:
黑马Redis
; -
不可重入的redis锁分布式锁:
-
原理
:利用setnx的互斥性,利用ex避免死锁,释放锁时判断线程标示; -
缺陷
:不可重入,无法重试,锁超时失效
-
-
可重入的redis锁分布式锁 redisson:
-
原理
:利用hash结构,记录线程标识和重入次数,利用watchDog延续锁时间,利用信号量控制锁重试; -
缺陷:
redis 宕机引起的锁失效问题;
-
-
Redisson 的 multiLock
-
原理
:多个独立的redis节点,必须在所有节点都获取重入锁,才算获取成功; -
缺陷
:运维成本高
-