教你从0到1搭建秒杀系统-缓存与数据库双写一致

  • Post author:
  • Post category:其他


本文是秒杀系统的第四篇,我们来讨论秒杀系统中缓存热点数据的问题,进一步延伸到数据库和缓存的双写一致性问题。

在秒杀实际的业务中,一定有很多需要做缓存的场景,比如售卖的商品,包括名称,详情等。访问量很大的数据,可以算是“热点”数据了,尤其是一些读取量远大于写入量的数据,更应该被缓存,而不应该让请求打到数据库上。

当然并不是所有的数据都需要进行缓存,那么一般哪些数据适合缓存呢?缓存量大但又不常变化的数据,比如详情,评论等适合缓存。对于那些经常变化的数据,其实并不适合缓存,一方面会增加系统的复杂性(缓存的更新,缓存脏数据),另一方面也给系统带来一定的不稳定性(缓存系统的维护)。上缓存之后,可以给我们带来一定的好处:

  • 能够缩短服务的响应时间,给用户带来更好的体验;
  • 能够增大系统的吞吐量,依然能够提升用户体验;
  • 减轻数据库的压力,防止高峰期数据库被压垮,导致整个线上服务OOM。

但是上了缓存,也会引入很多额外的问题:

  • 缓存有多种选型,是内存缓存,memcached还是redis,你是否都熟悉,如果不熟悉,无疑增加了维护的难度(本来是个纯洁的数据库系统);
  • 缓存系统也要考虑分布式,比如redis的分布式缓存还会有很多坑,无疑增加了系统的复杂性;
  • 在特殊场景下,如果对缓存的准确性有非常高的要求,就必须考虑缓存和数据库的一致性问题。

本文想要重点讨论的,就是缓存和数据库的一致性问题。



缓存和数据库双写一致性

在读取缓存方面,大家没啥疑问,都是按照下图的流程来进行业务操作:

在这里插入图片描述

但是在更新缓存方面,对于更新完数据库,再更新缓存呢,还是删除缓存。又或者是先删除缓存,再更新数据库,其实大家存在很大的争议。从理论上来说,给缓存设置过期时间,是保证最终一致性的解决方案。这种方案下,我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存。因此,接下来讨论的思路不依赖于给缓存设置过期时间这个方案。我们讨论主要讨论以下三种更新策略:

  • 先更新数据库,再更新缓存
  • 删除缓存,再更新数据库
  • 先更新数据库,再删除缓存



先更新数据库,再更新缓存

这套方案,大家是普遍反对的。为什么呢?我们从以下两个方面来进行说明。



线程安全

假设同时有请求A和请求B进行更新操作,那么会出现以下情况:

  1. 线程A更新了数据库
  2. 线程B更新了数据库
  3. 线程B更新了缓存
  4. 线程A更新了缓存

请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。



业务场景
  1. 如果是写数据库比较多,而读数据比较少的业务需求,采用这种方案就会导致数据压根还没读到,缓存就被频繁的更新,浪费性能;
  2. 如果写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存,那么每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。



删除缓存,再更新数据库

假设同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

  1. 请求A进行写操作,删除缓存
  2. 请求B查询发现缓存不存在
  3. 请求B去数据库查询得到旧值
  4. 请求B将旧值写入缓存
  5. 请求A将新值写入数据库

上述情况会导致数据不一致的情形。如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。这种情况下我们就可以采用延时双删策略:先淘汰缓存,再写数据库最后再休眠1秒,再次淘汰缓存。当然这个休眠的时间,读者应该自行评估自己的项目的读数据业务逻辑的耗时,然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。

有的人就会想到,如果我使用的是mysql的读写分离架构怎么办?在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作:

  1. 请求A进行写操作,删除缓存
  2. 请求A将数据写入数据库了
  3. 请求B查询缓存发现,缓存没有值
  4. 请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
  5. 请求B将旧值写入缓存
  6. 数据库完成主从同步,从库变为新值

这种情况还是使用双删延时策略。只是睡眠时间修改为在主从同步的延时时间基础上,加几百ms。那如果采用这种同步淘汰策略,吞吐量降低怎么办?我们可以将第二次删除作为异步的,自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后再返回,加大吞吐量。那如果第二次删除删除失败怎么办?第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:

  1. 请求A进行写操作,删除缓存
  2. 请求B查询发现缓存不存在
  3. 请求B去数据库查询得到旧值
  4. 请求B将旧值写入缓存
  5. 请求A将新值写入数据库
  6. 请求A试图去删除请求B写入对缓存值,结果失败了。

如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。那么如何解决呢?请你继续往下看。



先更新数据库,再删除缓存

假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生:

  1. 缓存刚好失效
  2. 请求A查询数据库,得一个旧值
  3. 请求B将新值写入数据库
  4. 请求B删除缓存
  5. 请求A将查到的旧值写入缓存

如果发生上述情况,确实是会发生脏数据。但是发生这样的情况的条件是这样的:步骤3的写数据库操作比步骤2的读数据库操作耗时更短,才有可能使得步骤4先于步骤5。但是实际上数据库的读操作的速度远快于写操作的,因此步骤3耗时比步骤2更短,这一情形很难出现。那如果真的出现了怎么办呢?首先,给缓存设有效时间是一种方案。其次,采用先删除缓存,再更新数据库策略里给出的异步延时删除策略,保证读请求完成以后,再进行删除操作。这样又回到上一个策略中遗留的问题:第二次删除缓存失败怎么办?提供一个保障的重试机制即可,这里给出两套方案。



方案一

在这里插入图片描述

如上图,我们简化一下步骤:

  1. 更新数据库数据;
  2. 缓存因为种种问题删除失败
  3. 将需要删除的key发送至消息队列
  4. 自己消费消息,获得需要删除的key
  5. 继续重试删除操作,直到成功

该方案有一个缺点,对业务线代码造成大量的侵入。于是有了方案二,在方案二中,启动一个订阅程序去订阅数据库的binlog,获得需要操作的数据。在应用程序中,另起一段程序,获得这个订阅程序传来的信息,进行删除缓存操作。



方案二

在这里插入图片描述

如上图,我们简化一下步骤:

  1. 更新数据库数据
  2. 数据库会将操作信息写入binlog日志当中
  3. 订阅程序提取出所需要的数据以及key
  4. 另起一段非业务代码,获得该信息
  5. 尝试删除缓存操作,发现删除失败
  6. 将这些信息发送至消息队列
  7. 重新从消息队列中获得该数据,重试操作。

读取binlog的中间件,可以采用阿里开源的canal。到这里我们已经把缓存双写一致性的思路彻底梳理了一遍,下面对这几种思路在我们原来代码的基础上进行代码实战,方便有需要的朋友参考。



秒杀实战



先删除缓存,再更新数据库

我们在秒杀项目的代码上OrderController中增加接口:先删除缓存,再更新数据库:

/**
 * 下单接口:先删除缓存,再更新数据库
 * @param sid
 * @return
 */
@RequestMapping("/createOrderWithCacheV1/{sid}")
@ResponseBody
public String createOrderWithCacheV1(@PathVariable int sid) {
    int count = 0;
    try {
        // 删除库存缓存
        stockService.delStockCountCache(sid);
        // 完成扣库存下单事务
        orderService.createPessimisticOrder(sid);
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return "购买失败,库存不足";
    }
    LOGGER.info("购买成功,剩余库存为: [{}]", count);
    return String.format("购买成功,剩余库存为:%d", count);
}

stockService中新增:

@Override
public void delStockCountCache(int id) {
    String hashKey = CacheKey.STOCK_COUNT.getKey() + "_" + id;
    stringRedisTemplate.delete(hashKey);
    LOGGER.info("删除商品id:[{}] 缓存", id);
}



先更新数据库,再删缓存

如果是先更新数据库,再删缓存,那么代码只是在业务顺序上颠倒了一下:

/**
 * 下单接口:先更新数据库,再删缓存
 * @param sid
 * @return
 */
@RequestMapping("/createOrderWithCacheV2/{sid}")
@ResponseBody
public String createOrderWithCacheV2(@PathVariable int sid) {
    int count = 0;
    try {
        // 完成扣库存下单事务
        orderService.createPessimisticOrder(sid);
        // 删除库存缓存
        stockService.delStockCountCache(sid);
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return "购买失败,库存不足";
    }
    LOGGER.info("购买成功,剩余库存为: [{}]", count);
    return String.format("购买成功,剩余库存为:%d", count);
}



缓存延时双删

如何做延时双删呢,最好的方法是开设一个线程池,在线程中删除key。更新前先删除缓存,然后更新数据,再延时删除缓存。OrderController中新增接口:


// 延时时间:预估读数据库数据业务逻辑的耗时,用来做缓存再删除
private static final int DELAY_MILLSECONDS = 1000;


/**
 * 下单接口:先删除缓存,再更新数据库,缓存延时双删
 * @param sid
 * @return
 */
@RequestMapping("/createOrderWithCacheV3/{sid}")
@ResponseBody
public String createOrderWithCacheV3(@PathVariable int sid) {
    int count;
    try {
        // 删除库存缓存
        stockService.delStockCountCache(sid);
        // 完成扣库存下单事务
        count = orderService.createPessimisticOrder(sid);
        // 延时指定时间后再次删除缓存
        cachedThreadPool.execute(new delCacheByThread(sid));
    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return "购买失败,库存不足";
    }
    LOGGER.info("购买成功,剩余库存为: [{}]", count);
    return String.format("购买成功,剩余库存为:%d", count);
}

OrderController中新增线程池:

// 延时双删线程池
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());


/**
 * 缓存再删除线程
 */
private class delCacheByThread implements Runnable {
    private int sid;
    public delCacheByThread(int sid) {
        this.sid = sid;
    }
    public void run() {
        try {
            LOGGER.info("异步执行缓存再删除,商品id:[{}], 首先休眠:[{}] 毫秒", sid, DELAY_MILLSECONDS);
            Thread.sleep(DELAY_MILLSECONDS);
            stockService.delStockCountCache(sid);
            LOGGER.info("再次删除商品id:[{}] 缓存", sid);
        } catch (Exception e) {
            LOGGER.error("delCacheByThread执行出错", e);
        }
    }
}

调用接口createOrderWithCacheV3

在这里插入图片描述

删除缓存前库存为48

在这里插入图片描述

删除缓存前库存为null,没有数据

在这里插入图片描述

然后正常下单以后库存变为47,此时将缓存更新到redis中

在这里插入图片描述

最后异步将缓存数据再次删除:

在这里插入图片描述

的确是做了两次缓存删除:

在这里插入图片描述



删除缓存重试机制

以上删除有可能会失败。要解决删除失败的问题,需要用到消息队列,进行删除操作的重试。这里我们为了达到效果,接入了RabbitMq,并且需要在接口中写发送消息,并且需要消费者常驻来消费消息。

首先在pom.xml新增RabbitMq的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

写一个RabbitMqConfig:

@Configuration
public class RabbitMqConfig {
    @Bean
    public Queue delCacheQueue() {
        return new Queue("delCache");
    }
}

添加一个消费者:

@Component
@RabbitListener(queues = "delCache")
public class DelCacheReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelCacheReceiver.class);

    @Autowired
    private StockService stockService;

    @RabbitHandler
    public void process(String message) {
        LOGGER.info("DelCacheReceiver收到消息: " + message);
        LOGGER.info("DelCacheReceiver开始删除缓存: " + message);
        stockService.delStockCountCache(Integer.parseInt(message));
    }
}

OrderController中新增接口:

/**
 * 下单接口:先更新数据库,再删缓存,删除缓存重试机制
 * @param sid
 * @return
 */
@RequestMapping("/createOrderWithCacheV4/{sid}")
@ResponseBody
public String createOrderWithCacheV4(@PathVariable int sid) {
    int count;
    try {
        // 完成扣库存下单事务
        count = orderService.createPessimisticOrder(sid);
        // 删除库存缓存
        stockService.delStockCountCache(sid);
        // 延时指定时间后再次删除缓存
        // cachedThreadPool.execute(new delCacheByThread(sid));
        // 假设上述再次删除缓存没成功,通知消息队列进行删除缓存
        sendDelCache(String.valueOf(sid));

    } catch (Exception e) {
        LOGGER.error("购买失败:[{}]", e.getMessage());
        return "购买失败,库存不足";
    }
    LOGGER.info("购买成功,剩余库存为: [{}]", count);
    return String.format("购买成功,剩余库存为:%d", count);
}

调用接口createOrderWithCacheV4

在这里插入图片描述

可以看到,我们先完成了下单,然后删除了缓存,并且假设延迟删除缓存失败了,发送给消息队列重试的消息,消息队列收到消息后再去删除缓存。

在这里插入图片描述



读取binlog异步删除缓存

这里我们使用阿里开源的canal来读取binlog进行缓存的异步删除。Canal用途很广,并且上手非常简单,我们在下一篇单独做一下介绍。


猜你感兴趣





教你从0到1搭建秒杀系统-防超卖



教你从0到1搭建秒杀系统-限流



教你从0到1搭建秒杀系统-抢购接口隐藏与单用户限制频率



教你从0到1搭建秒杀系统-缓存与数据库双写一致



教你从0到1搭建秒杀系统-Canal快速入门(番外篇)



教你从0到1搭建秒杀系统-订单异步处理


更多文章请点击:

更多…


参考文章:

https://cloud.tencent.com/developer/article/1574827

https://www.jianshu.com/p/2936a5c65e6b

https://www.cnblogs.com/rjzheng/p/9041659.html

https://www.cnblogs.com/codeon/p/8287563.html

https://www.jianshu.com/p/0275ecca2438

https://www.jianshu.com/p/dc1e5091a0d8

https://coolshell.cn/articles/17416.html



版权声明:本文为weixin_43525722原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。