程序员黑哥
2023-09-17 22:25
发表于湖南
收录于合集
#高并发1个
#redis25个
#面试97个
#雪崩1个
#编程95个
无论是在开发过程中还是在准备跑路的面试过程中,有关redis相关的,难免会涉及到四个特殊场景:缓存穿透、缓存雪崩、缓存击穿以及数据一致性。如果在开发中不注意这些场景的话,在高并发场景下有可能会导致系统崩溃,数据错乱等情况。现在,结合实际的业务场景来复现并解决这些问题。
相关技术:springboot2.2.2+mybatisplus3.1+redis5.0+hutool5.8
缓存穿透
缓存穿透是指查询缓存和数据库中都不存在的数据,导致所有的查询压力全部给到了数据库。
比如查询一篇文章信息并对其进行缓存,一般的逻辑是先查询缓存中是否存在该文章,如果存在则直接返回,否则再查询数据库并将查询结果进行缓存。
@Slf4j
@Service
public class DocumentInfoServiceImpl extends ServiceImpl<DocumentInfoMapper, DocumentInfo> implements DocumentInfoService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) { //缓存命中
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if (ObjectUtil.isNotNull(documentInfo)) { // 缓存结果
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
}
@GetMapping("/doc/queryById")
public Result<DocumentInfo> queryById(@RequestParam(name = "docId") Integer docId) {
return Result.success(documentInfoService.getDocumentDetail(docId));
}
如果项目的并发量不大,这样写的话几乎没啥问题。如果项目的并发量很大,那么这就存在一个隐藏问题,如果在访问了一个不存在的文章(这个文章已经被分享出去,但是在后台可能是被删除或者下线状态),那么就会导致所有的请求全部需要到数据库中进行查询,从而给数据库造成压力,甚至造成宕机。
2023-01-05 10:18:57.954 INFO 19692 --- [nio-8081-exec-8] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.121 INFO 19692 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.350 INFO 19692 --- [io-8081-exec-10] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.519 INFO 19692 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.661 INFO 19692 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:58.859 INFO 19692 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:59.012 INFO 19692 --- [nio-8081-exec-9] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 10:18:59.154 INFO 19692 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
解决方案一:缓存空对象
针对缓存穿透问题缓存空对象可以有效避免所产生的影响,当查询一条不存在的数据时,在缓存中存储一个空对象并设置一个过期时间(设置过期时间是为了避免出现数据库中存在了数据但是缓存中仍然是空数据现象),这样可以避免所有请求全部查询数据库的情况。
// 查询对象不存在
if(StrUtil.equals(obj,"")){
log.info("==== select from cache , data not available ====");
return null;
}
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
//如果数据不存在,则缓存一个空对象并设置过期时间
stringRedisTemplate.opsForValue().set(redisKey, ObjectUtil.isNotNull(documentInfo)?JSONUtil.toJsonStr(documentInfo):"", 5L, TimeUnit.SECONDS);
// if (ObjectUtil.isNotNull(documentInfo)) {
// stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
// }
}
2023-01-05 13:15:01.057 INFO 16600 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ====
2023-01-05 13:15:01.214 INFO 16600 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.384 INFO 16600 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.540 INFO 16600 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
2023-01-05 13:15:01.720 INFO 16600 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ====
解决方案二:布隆过滤器
缓存空对象的缺点在于无论数据存不存在都需要查询一次数据库,并且redis中存储了大量的空数据,这个时候可以采用布隆过滤器来解决。布隆过滤器可以简单的理解为由一个很长的二进制数组结合n个hash算法计算出n个数组下标,将这些数据下标置为1。在查找数据时,再次通过n个hash算法计算出数组下标,如果这些下标的值为1,表示该值可能存在(存在hash冲突的原因),如果为0,则表示该值一定不存在。
/**
* 布隆过滤器添加元素伪代码
*/
BitArr[] bit = new BitArr[10000]; // 新建一个二进制数组
List<String> insertData = Arrays.asList("A", "B", "C"); // 待添加元素
for (String insertDatum : insertData) {
for (int i=1;i<=3;i++){ // 使用3中hash算法计算出3个数组下标
int bitIdx = hash_i(insertDatum); //hash1(insertDatum),hash2(insertDatum),hash3(insertDatum)
bit[bitIdx]=1; // 将下标元素置为1
}
}
/**
* 布隆过滤器查找元素伪代码
*/
BitArr[] bit = new BitArr[10000];
for (int i=1;i<=3;i++){
int bitIdx = hash_i("E"); //计算E的数组下标
if(bit[bitIdx]==0){ //如果对应的元素为0,则一定不存在
return false;
}
}
return true;
-
布隆过滤器的实现
在使用布隆过滤器时有两个核心参数,分别是预估的数据量size以及期望的误判率fpp,这两个参数我们可以根据自己的业务场景和数据量进行自主设置。在实现布隆过滤器时,有两个核心问题,分别是hash函数的选取个数n以及确定bit数组的大小len。
-
根据预估数据量size和误判率fpp,可以计算出bit数组的大小len。
-
根据预估数据量size和bit数组的长度大小len,可以计算出所需要的hash函数个数n。
单机版布隆过滤器
目前单机版的布隆过滤器实现方式有很多,比如Guava提供的BloomFilter,Hutool工具包中提供的BitMapBloomFilter等。以Guava为例,需要引入对应的依赖包,在BloomFilter类中提供了create方法来进行布隆过滤器的创建。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
public static BloomFilter<Integer> localBloomFilter = BloomFilter.create(Funnels.integerFunnel(),10000L,0.01);
创建完成后,将需要筛选的数据同步到过滤器中。
/**
* 单机版布隆过滤器数据初始化
*/
@PostConstruct
public void initDocumentDataLocal(){
List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
if(CollUtil.isNotEmpty(documentInfos)){
documentInfos.stream().map(DocumentInfo::getId).forEach(e->{
BloomFilterUtil.localBloomFilter.put(e);
});
}
}
在业务代码中,可以直接调用BloomFilter提供的mightContain方法,判断目标docId是否可能存在于过滤器中,如果可能存在,那么继续向下执行业务逻辑,否则直接中断执行。
@Override
public DocumentInfo getDocumentDetail(int docId) {
//布隆过滤器拦截
boolean mightContain = BloomFilterUtil.localBloomFilter.mightContain(docId);
if(!mightContain){ //是否有可能存在于布隆过滤器中
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String redisKey = "doc::info::" + docId;
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if(ObjectUtil.isNotNull(documentInfo)){
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
自定义分布式版布隆过滤器
自定义分布式布隆过滤器的存储依赖于redis的bitmap数据结构来实现,另外还需要定义四个参数,分别为预估数据量size,误判率fpp,数组大小bitNum以及hash函数个数hashNum其中预估数据量和误判率需要配置在yml文件中。
@Resource
private StringRedisTemplate stringRedisTemplate;
@Value("${bloom.filter.size}")
private long size; // 预估数据量
@Value("${bloom.filter.fpp}")
private double fpp; // 误判率
private long bitNum; //数组大小len
private int hashNum; // hash函数个数size
根据上面的两个公式,计算出相应的数组长度以及所需的hash函数个数,并在redis设置一个布隆过滤器的key。
/*
* 计算出数组长度,hash函数个数并初始化数组
*/
@PostConstruct
private void initBloom() {
this.bitNum = getNumOfBits(size, fpp);
this.hashNum = getNumOfHashFun(size, bitNum);
//借助redis的bitmap来实现二进制数组
stringRedisTemplate.opsForValue().setBit("bloom::filter", bitNum, false);
}
/**
* 计算bit数组大小
*
* @param size
* @param fpp
* @return
*/
private long getNumOfBits(long size, double fpp) {
return (long) (-size * Math.log(fpp) / (Math.log(2) * Math.log(2)));
}
/**
* 计算所需的hash个数
*
* @param size
* @param numOfBits
* @return
*/
private int getNumOfHashFun(long size, long numOfBits) {
return Math.max(1, (int) Math.round((double) numOfBits / size * Math.log(2)));
}
另外,需要提供两个方法,分别为添加元素的putBloomFilterRedis方法和判断元素是否有可能存在的方法existBloomFilterRedis,其中的实现方式参考了guava。
/**
* 像自定义布隆过滤器中添加元素
* @param key
*/
public void putBloomFilterRedis(String key) {
long hash64 = HashUtil.metroHash64(key.getBytes());
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= hashNum; i++) {
/**
* 上面不是说,要使用n个hash函数吗??为啥这里直接用一个动态变量取乘积了呢???
* 不用担心,请看《Less Hashing, Same Performance: Building a Better Bloom Filter》,
* 里面论述了这种操作不会影响布隆过滤器的性能,毕竟hash的代价还是很大的,这算是个有效的优化手段吧:
* A standard technique from the hashing literature is to use two hash
* functions h(x) and h(x) to simulate additional hash functions of the form g(x) = h(x) + ih(x) .
*/
int combinedHash = hash1 + i * hash2;
if (combinedHash < 0) {
//如果为负数,则取反(保证结果为正数)
combinedHash = ~combinedHash;
}
// 计算出数组下标,并将下标值置为1
int bitIdx = (int) (combinedHash % bitNum);
stringRedisTemplate.opsForValue().setBit("bloom::filter", bitIdx, true);
}
}
/**
* 判断自定义布隆过滤器中元素是否有可能存在
* @param key
* @return
*/
public boolean existBloomFilterRedis(String key) {
long hash64 = HashUtil.metroHash64(key.getBytes());
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= hashNum; i++) {
int combinedHash = hash1 + i * hash2;
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
int bitIdx = (int) (combinedHash % bitNum);
//判断下标值是否为1,如果不为1直接返回false
Boolean bit = stringRedisTemplate.opsForValue().getBit("bloom::filter", bitIdx);
if (!bit) {
return false;
}
}
return true;
}
方法实现后,将所有的key值数据同步到redis中。
@Component
public class BloomFilterInitData {
@Resource
private BloomFilterUtil bloomFilterUtil;
@Resource
private DocumentInfoService documentInfoService;
@PostConstruct
public void initDocumentData(){
List<DocumentInfo> documentInfos = documentInfoService.lambdaQuery().select(DocumentInfo::getId).list();
if(CollUtil.isNotEmpty(documentInfos)){
documentInfos.stream().map(m -> {
return "doc::info::" + m.getId().intValue();
}).forEach(e->{
bloomFilterUtil.putBloomFilterRedis(e);
});
}
}
}
上面全部搞定后,启动项目并测试结果是否有效,在启动前先在数据表中搞几条测试数据。
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
// 布隆过滤器中是否有可能存在这个key
boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);
if(!b){
// 如果不存在,直接返回空
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if(ObjectUtil.isNotNull(documentInfo)){
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
return documentInfo;
}
查询存在的数据。
查询不存在的数据。
查看两次请求打印的log,不存在的数据成功被拦截掉了,避免再去查询数据库,即使存在一定的误判率,也几乎不会有啥影响,最多就是查询一次数据库。
虽然布隆过滤器可以有效的解决缓存穿透问题,并且实现的算法查找效率也很快。但是,也存在一定的缺点,由于存在hash冲突的原因,一方面存在一定的误判率(某个在过滤器中并不存在的key,但是通过hash计算出来的下标值都为1)。另一方面,删除比较困难(如果将一个数组位置为0,那么这个位置有可能也代表其他key的值,会影响到其他的key)。
缓存击穿
缓存击穿是指访问某个热点数据时,缓存中并不存在该数据或者缓存过期了,这个时候全部的请求压力给到了数据库。
基于上面的代码,来模拟短时间内进行并发请求,看看会不会将请求全部打到数据库。
public static void main(String[] args) throws InterruptedException {
/**
* 短时间内并发请求接口,并访问同一个数据
*/
ExecutorService executorService = Executors.newFixedThreadPool(1000);
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executorService.execute(() -> {
HttpResponse response = HttpUtil.createGet("http://127.0.0.1:8081/doc/queryById?docId=1").execute();
System.out.println(response.body());
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
根据日志输出结果显示,请求确实给到了数据库。
针对缓存击穿问题,有两种解决方案,一种是对热点数据不设置过期时间,另一种是采用互斥锁的方式。
解决方案一:热点数据不设置过期时间
热点数据不设置过期时间,当后台更新热点数据数需要同步更新缓存中的数据,这种解决方式适用于不严格要求缓存一致性的场景。
解决方案二:使用互斥锁
如果是单机部署的环境下可以使用synchronized或lock来处理,保证同时只能有一个线程来查询数据库,其他线程可以等待数据缓存成功后在被唤醒,从而直接查询缓存即可。如果是分布式部署,可以采用分布式锁来实现互斥。
@Component
public class RedisLockUtil {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 模拟互斥锁
* @param key
* @param value
* @param exp
* @return
*/
public boolean tryLock(String key, String value, long exp) {
Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent(key, value, exp, TimeUnit.SECONDS);
if (absent) {
return true;
}
return tryLock(key, value, exp); //如果线程没有获取锁,则在此处循环获取
}
/**
* 释放锁
* @param key
* @param value
*/
public void unLock(String key, String value) {
String s = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.equals(s, value)) { //避免锁被其他线程误删
stringRedisTemplate.delete(key);
}
}
}
有了上面的两个方法,可以对业务代码进行改造,在查询数据库前进行加锁,读取完成后在释放锁。
@Override
public DocumentInfo getDocumentDetail(int docId) {
String redisKey = "doc::info::" + docId;
boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey);
if (!b) {
log.info("==== select from bloomFilter , data not available ====");
return null;
}
String obj = stringRedisTemplate.opsForValue().get(redisKey);
DocumentInfo documentInfo = null;
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
String s = UUID.randomUUID().toString(); //给锁加个标识,避免误删
String lockKey = redisKey+"::lock";
boolean lock = redisLockUtil.tryLock(lockKey, s, 60); //尝试加锁
if (lock) {
try {
//如果加锁成功,先再次查询缓存,有可能上一个线程查询并添加到缓存了
obj = stringRedisTemplate.opsForValue().get(redisKey);
if (StrUtil.isNotEmpty(obj)) {
log.info("==== select from cache ====");
documentInfo = JSONUtil.toBean(obj, DocumentInfo.class);
} else {
log.info("==== select from db ====");
documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one();
if (ObjectUtil.isNotNull(documentInfo)) {
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS);
}
}
} finally {
redisLockUtil.unLock(lockKey, s); //释放锁
}
}
}
return documentInfo;
}
一顿梭哈后,再次模拟并发查询,看看最终效果,理想的结果状态应该是查询一次数据库,后面的查询直接通过缓存获取。通过日志输出可以看出来,击穿问题被有效解决啦。
缓存雪崩
缓存雪崩是指对热点数据设置了相同的过期时间,在同一时间这些热点数据key大批量发生过期,请求全部转发到数据库,从而导致数据库压力骤增,甚至宕机。与缓存击穿不同的是,缓存击穿是单个热点数据过期,而缓存雪崩是大批量热点数据过期。
针对缓存雪崩问题,常见的解决方案有多种,比如设置随机的过期时间或者不设置过期时间,搭建高可用的缓存架构避免redis服务宕机,服务降级等。
解决方案一:设置随机的过期时间
将key的过期时间后面加上一个随机数,这个随机数值的范围可以根据自己的业务情况自行设定,这样可以让key均匀的失效,避免大批量的同时失效。
if (ObjectUtil.isNotNull(documentInfo)) {
//生成一个随机数
int randomInt = RandomUtil.randomInt(2, 10);
//过期时间+随机数
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L+randomInt, TimeUnit.SECONDS);
}
解决方案二:不设置过期时间
不设置过期时间时,需要注意的是,在更新数据库数据时,同时也需要更新缓存数据,否则数据会出现不一致的情况。这种方式比较适用于不严格要求缓存一致性的场景。
if (ObjectUtil.isNotNull(documentInfo)) {
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
}
解决方案三:搭建高可用集群
缓存服务故障时,也会触发缓存雪崩,为了避免因服务故障而发生的雪崩,推荐使用高可用的服务集群,这样即使发生故障,也可以进行故障转移。集群相关的在之前也有介绍过Redis高可用架构搭建到原理分析,这里就不再介绍了,如果想要理解,可以看这篇文章。
数据一致性
通常情况下,使用缓存的直接目的是为了提高系统的查询效率,减轻数据库的压力。一般情况下使用缓存是下面这几步骤:
-
查询缓存,数据是否存在
-
如果数据存在,直接返回
-
如果数据不存在,再查询数据库
-
如果数据库中数据存在,那么将该数据存入缓存并返回。如果不存在,返回空。
这么搞好像看上去并没有啥问题,那么会有一个细节问题:当一条数据存入缓存后,立刻又被修改了,那么这个时候缓存该如何更新呢。不更新肯定不行,这样导致了缓存中的数据与数据库中的数据不一致。一般情况下对于缓存更新有下面这几种情况:
-
先更新缓存,再更新数据库
-
先更新数据库,再更新缓存
-
先删除缓存,再更新数据库
-
先更新数据库,再删除缓存
先更新缓存,再更新数据库
先更新缓存,再更新数据库这种情况下,如果业务执行正常,不出现网络等问题,这么操作不会有啥问题,两边都可以更新成功。但是,如果缓存更新成功了,但是当更新数据库时或者在更新数据库之前出现了异常,导致数据库无法更新。这种情况下,缓存中的数据变成了一条实际不存在的假数据。
比如,在更新文章详情时,先修改了redis中的数据,在更新数据库前抛出一个异常来模拟数据库更新失败的场景。
public boolean updateDocument(DocumentInfo documentInfo) {
String redisKey = "doc::info::" + documentInfo.getId();
// 先更新缓存
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
// 模拟更新数据库前出现异常
int i = 1 / 0;
// 在更新数据库
boolean b = this.updateById(documentInfo);
return b;
}
@PostMapping("/doc/updateDocument")
public Result<Boolean> updateDocument(@RequestBody DocumentInfo documentInfo) {
return Result.success(documentInfoService.updateDocument(documentInfo));
}
先调用更新的接口,在调用查询的接口,结果发现查询的接口返回的并不是数据库中的实际数据,这个时候就造成了缓存与数据库数据不一致的情况。
先更新数据库,再更新缓存
先更新数据库,再更新缓存和先更新缓存,再更新数据库的情况基本一致,如果失败,会导致数据库中是最新的数据,缓存中是旧数据。还有一种极端情况,在高并发情况下容易出现数据覆盖的现象:A线程更新完数据库后,在要执行更新缓存的操作时,线程被阻塞了,这个时候线程B更新了数据库并成功更新了缓存,当B执行完成后线程A继续向下执行,那么最终线程B的数据会被覆盖。
@Override
public boolean updateDocument(DocumentInfo documentInfo) {
String redisKey = "doc::info::" + documentInfo.getId();
// 更新数据库
boolean b = this.updateById(documentInfo);
//以标题为标识,模拟线程阻塞。当一个请求的标题为‘模拟数据覆盖’时,线程停4秒
if(StrUtil.equals(documentInfo.getTitle(),"模拟数据覆盖")){
try {
Thread.sleep(4000);
}catch (Exception e){
}
}
// 更新缓存
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
// 模拟更新数据库前出现异常
return b;
}
先删除缓存,再更新数据库
先删除缓存,再更新数据库这种情况,如果并发量不大用起来不会有啥问题。但是在并发场景下会有这样的问题:线程A在删除缓存后,在写入数据库前发生了阻塞。这时线程B查询了这条数据,发现缓存中不存在,继而向数据库发起查询请求,并将查询结果缓存到了redis。当线程B执行完成后,线程A继续向下执行更新了数据库,那么这时缓存中的数据为旧数据,与数据库中的值不一致。
先更新数据库,再删除缓存
先更新数据库,再删除缓存也并不是绝对安全的,在高并发场景下,如果线程A查询一条在缓存中不存在的数据(这条数据有可能过期被删除了),查询数据库后在要将查询结果缓存到redis时发生了阻塞。这个时候线程B发起了更新请求,先更新了数据库,再次删除了缓存。当线程B执行成功后,线程A继续向下执行,将查询结果缓存到了redis中,那么此时缓存中的数据与数据库中的数据发生了不一致。
解决数据不一致方案
延时双删
延时双删,即在写数据库之前删除一次,写完数据库后,再删除一次,在第二次删除时,并不是立即删除,而是等待一定时间在做删除。
这个延时的功能可以使用mq来实现,这里为了省事,偷个懒,本地测试使用的延时队列来模拟mq达到延时效果。首先需要定义一个队列元素对象DoubleDeleteTask。
@Data
public class DoubleDeleteTask implements Delayed {
private String key; // 需要删除的key
private long time; //需要延迟的时间
public DoubleDeleteTask(String key, long time) {
this.key = key;
this.time = time + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(time, ((DoubleDeleteTask) o).time);
}
}
然后定义一个队列并交给spring管理。
@Configuration
public class DoubleDeleteQueueConfig {
@Bean(name = "doubleDeleteQueue")
public DelayQueue<DoubleDeleteTask> doubleDeleteQueue() {
return new DelayQueue<DoubleDeleteTask>();
}
}
设置一个独立线程,特意用来处理延时的任务。如果数据删除失败,可以自定义重试次数以保证数据的一致性,但是也会带来一定的性能影响,如果在实际项目中,建议还是以异步的方式来实现重试。
@Slf4j
@Component
public class DoubleDeleteTaskRunner implements CommandLineRunner {
@Resource
private DelayQueue<DoubleDeleteTask> doubleDeleteQueue;
@Resource
private StringRedisTemplate stringRedisTemplate;
private static final int retryCount = 3; //失败重试次数
@Override
public void run(String... args) throws Exception {
new Thread(() -> {
try {
while (true) {
DoubleDeleteTask take = doubleDeleteQueue.take(); //取出队列元素
String key = take.getKey();
try {
stringRedisTemplate.delete(key);
log.info("====延时删除key:{}====",key);
} catch (Exception e) { //失败重试
int count = 1;
for (int i = 1; i <= retryCount; i++) {
if (count <= retryCount) {
log.info("====延时删除key:{},失败重试次数:{}====",key,count);
Boolean r = stringRedisTemplate.delete(key);
if (r) {
break;
} else {
count++;
}
}else
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}, "double-delete-task").start();
}
}
使用延时队列,处理延时双删。最终测试,通过日志的打印可以确定实现了延时删除的功能。
@Override
public boolean updateDocument(DocumentInfo documentInfo) {
String redisKey = "doc::info::" + documentInfo.getId();
// 更新缓存
stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo));
// 更新数据库
boolean b = this.updateById(documentInfo);
// 再次延时删除缓存
doubleDeleteQueue.add(new DoubleDeleteTask(redisKey,2000L));
return b;
}
最后
在高并发的场景下,使用reids还是存在很多坑的,稍不注意就会出现缓存穿透,缓存雪崩等情况,严重的话可以直接造成服务宕机,所以在以后的开发中需要注意(如果项目没啥并发量的话,可以不用考虑😀😀😀)。