解决redis缓存和数据库的数据一致性问题

  • Post author:
  • Post category:其他


为了解决缓存和数据库的数据一致性问题 – 分布式读写锁



分布式读写锁

使用分布式读写锁可以完美解决缓存数据不一致的问题,想要读数据必须等待写数据整个操作完成。

使用阿里中间件canal: 数据同步中间件,可以用于实时监听数据库的数据变动操作,原理是伪造成数据库的从节点,订阅binlog日志来实现实时监听。

这样当数据变动的时候,我们可以根据监听的具体操作去更新缓存,达到缓存数据一致性,但可能存在一定延迟。

canal在大数据系统中可以用来解决数据异构问题,可以根据订阅表的相关操作进行数据整合和分析,生成想要的结构数据,比如监听每个用户的浏览访问记录,进行整合分析,生成用户推荐表,这样用户浏览的时候便可以看到自己喜欢的内容。

利用分布式锁我们可以很好的解决缓存击穿和数据一致性问题。

自己手写一个给予redis的分布式锁

在分布式环境下,我们可以使用 redis 来实现分布式锁:

定义一个 key 的值。

当请求进来,判断 redis 中是否存在此 key,若不存在,新增 key,生成一个随机值作为value,并设置过期时间,业务执行之后,获取 key 对应的 value 与之前生成的 value 比较,若相等,删除 key;

若 key 存在,等待一段时间,再次尝试获取,直到获取到为止。

redis 的每个命令都是原子性的,所以我们不用考虑在命令执行的过程中被其他进程干扰的问题。同时加锁操作和解锁操作必须保证其原子性,否则在大并发情况下就会出现问题。

加锁 = 判断 key + 新增 key + 设置过期时间

解锁 = 判断 key 的 value 是否一致 + 删除 key

加锁操作 redis 有原生的命令可以支持,解锁操作则需要使用 lua 脚本解决,否则无法保证其原子


public void testLock() {
    String uuid = UUID.randomUUID().toString();
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,300,TimeUnit.SECONDS);    
    if (lock) { 
       System.out.println("获取分布式锁成功...");       
       try {          
           //执行业务代码。。。
       } finally {
           String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";               
           //删除锁
           redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class) , Arrays.asList("lock"), uuid);
       }
    } else {
       System.out.println("获取分布式锁失败...等待重试");       
       try {
            Thread.sleep(200); 
       } catch (Exception ignored){}       
       //自旋获取锁
       testLock();
    }
}

使用 redisson 实现分布式锁

前面我们自己利用 redis 实现了分布式锁的基本功能,虽然比较简陋,但有助于我们理解其中的原理,在实际的开发中,我们可以直接利用 Redisson 来实现相关功能!

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分利用了 Redis 键值数据库提供的一系列优势,基于Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工 具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,进一步简化了分布式环境中程序相互之间的协作。



springboot 集成 Redisson


<dependency>
     <groupId>org.redission</groupId>
     <artifactId>redission</artifactId>
     <version>3.12.0</version>
 </dependency>

配置 MyRedissonConfig:


@Configuration
public class MyRedissonConfig {    
    @Bean    
    public RedissonClient redisson() throws IOException {        
        //1、创建配置
        Config config = new Config();        
        //集群模式
        /* config.useClusterServers()
              .addNodeAddress("127.0.0.1:6379","127.0.0.1:6380")*/
       //单点模式
       //Redis url should start with redis:// or rediss://(启用SSL)
        config.useSingleServer()
              .setAddress("redis://127.0.0.1:6379");        
        //2、根据Config创建出RedissonClient实例
        RedissonClient redissonClient = Redisson.create(config);        
        return redissonClient;
    }
}

引入 redissonClient 使用:


@Autowired
private RedissonClient redissonClient;
@Test
private void testLock() {    
    //获取一把锁,只要锁名字相同,就是一把锁
    RLock lock = redisson.getLock("my-lock");    
    //加锁
    lock.lock();
    // 尝试加锁,最多等待 100 秒,上锁以后 10 秒自动解锁 
    //boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    // 加锁以后 10 秒钟自动解锁
    //lock.lock(10, TimeUnit.SECONDS);    
    try {      
        //执行业务代码
    } finally {        
        //解锁    
        lock.unlock()
    }
}

也许大家会有疑问,在使用 lock 方法没有指定过期时间的情况下,如果负责储存分布式锁的 redisson 节点宕机后,这个锁正好处于锁定状态,会出现锁死的情况,为了避免这种情况的发生,redission 对锁设置了默认的过期时间为30s,可以通过修改Config.lockWatchdogTimeout指定,所以就算宕机,锁也会在指定时间内过期,也就不会出现死锁的情况。

redisson 内部还提供了一个监控锁的看门狗,它的作用是在 redisson 实例被关闭前,不断对锁进行续期,如果你的业务执行时间较长,它可以为锁自动续期,保证业务执行完毕后再释放锁,不被其他进程干扰,但一般情况下,我们会事先评估业务完成所需要的时间,设置锁的过期时间>=业务完成的时间,因为自动续期会消耗一定的性能,不过最终采用何种方式加锁具体还要根据项目需求而定。

此外,redisson 还支持读写锁 ReadWriteLock ,闭锁 countDownLatch,信号量 Semaphore 等,用法跟 java.util.concurrent 包下对应类的用法基本是一样的,这里就不再一一列举啦,同学们可以自行了解。



版权声明:本文为LC_Liangchao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。