一.场景
秒杀下单Flow,一个会员,只能下一单。
下图为秒杀下单的简单流程图,其中除了<异步操作>外,其它操作均为同步操作。
二.问题
1 前置业务校验(包括活动,商品库存和会员下单数量)如何实现。
2 扣减库存和创建订单的一致性如何处理。
3 补偿业务如何实现(库存回滚,订单过时)。
三.分析
1 秒杀场景的并发点可以分为两个,商品扣减库存和会员下单。商品扣减库存是全局的,而会员下单只针对单个会员。
2 高并发场景下,为了减少无效请求进入,应该尽量把校验操作放到前面。
3 因为秒杀场景的流量大,所以这里的校验不能直接查询数据库,涉及到库存的操作还需要保证一致性的问题。
4 静态数据基本不会更改,作为缓存进行校验相对简单,所以主要问题还是处理库存和下单的一致性。
四.方案
1. 业务校验应该放在扣减库存前,目的是过滤无效请求。
2. 活动状态和时间基本上是不会变,所以可以直接用缓存。可以把活动信息存储在redis的hashmap中,然后以当前时间为参数,采用lua脚本把活动的查询和校验串行执行,用于判断活动的有效性。
3. 如果每人只能下一单,那么在下单成功后,可以用bitmap记录会员ID,如果每人可以多单,那么可以用incrby+lua,把活动_商品_会员作为key,商品数量或者订单数量作为value,,前置业务校验对每个请求进行过滤,最后进来的只会是有效的请求。
4. 为了保证库存和订单的一致性和时效性,先扣减库存,然后再下单(这里需要对单个会员进行锁定),如果下单失败,需要提供库存回滚处理,在事务中先更新数据库,再decrby缓存。如果回滚失败,可以通过日志监控进行人工处理(这里考虑到一般失败的概率比较低)。
5. 订单过时的处理,可以用定时任务进行监控,如果时效性比较高的话可以用延时队列,取消订单后,通过mq来回滚库存(定时任务可以使用quartz或者elastic-job,延时队列可以用redis,或者其它mq作有序消费)。
五.伪代码
分布式锁代码(参考 Jedis 实现简单的分布式锁):
--因为秒杀场景的请求量比较大,所以这里会改成fast-fail机制,不进行轮循
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
String result = null;
Jedis jedis = null;
if (Thread.currentThread().isInterrupted()) {
evalUnLock(jedis);
throw new InterruptedException();
}
try {
jedis = jedisPool.getResource();
result = jedis.set(lockKey, lockUserId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, unit.toMillis(time));
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis.close();
}
if (!LOCK_SUCCESS.equals(result)) {
return false;
} else {
return true;
}
}
商品库存扣减代码:
package test.cache;
import redis.clients.jedis.Jedis;
import java.util.Arrays;
public class ItemStock {
// 初始化测试库存量
public static void initKey(String key, String stock){
Jedis jedis = RedisUtil.getResource();
try {
jedis.set(key, stock);
} finally {
if (jedis != null){
jedis.close();
}
}
Long restNum = Long.valueOf((String)RedisUtil.get(key));
System.out.println("init:" + restNum);
}
static String script =
"local num = ARGV[1] \n" +
"local key = KEYS[1] \n" +
"local stock = redis.call('get',key) \n" +
"if stock - num >= 0 \n" +
"then redis.call('decrby',key, num) \n" +
"return 1 \n" + // 成功
"else \n" +
"return 0 \n" + //失败
"end";
//lua脚本实现扣减库存
public static Long deduct(String key , int num){
Jedis jedis = RedisUtil.getResource();
try {
Object re = jedis.evalsha(jedis.scriptLoad(script), Arrays.asList(key), Arrays.asList(num + ""));
System.out.printf("deduct:%d\n", re);
return (Long) re;
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null){
jedis.close();
}
}
return 0L;
}
// 用于停止扣减库存主线程
public static void stockStop(String key, Integer stopNum) {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Long restNum = Long.valueOf((String) RedisUtil.get(key));
System.out.printf("restNum:%d,stopNum:%d\n", restNum, stopNum);
if (restNum.intValue() == stopNum) {
return;
}
}
}
}
会员下单标志代码:
package test.cache;
import redis.clients.jedis.Jedis;
public class MemberOrder {
// key -> activityId + itemId
final static String ACTIVITY_ITEM_BIG_MAP_KEY = "%s_%s_BIT_MAP_KEY";
public static Boolean isMarkedStock(Long activityId, Long itemId, Long memberId){
Jedis jedis = RedisUtil.getResource();
try {
return jedis.getbit(String.format(ACTIVITY_ITEM_BIG_MAP_KEY, activityId.toString(), itemId.toString()), memberId);
} finally {
jedis.close();
}
}
public static Boolean markStock(Long activityId, Long itemId, Long memberId, boolean isMarked){
Jedis jedis = RedisUtil.getResource();
try {
return jedis.setbit(
String.format(ACTIVITY_ITEM_BIG_MAP_KEY, activityId.toString(), itemId.toString()),
memberId,
isMarked);
} finally {
jedis.close();
}
}
}
下单业务代码:
package test.cache;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
public class SecondKillOrder {
// activity_id + item_id + member_id
private final static String LOCK_KEY = "%s_%s_%s";
// activity_id + item_id
public final static String STOCK_KEY = "%s_%s_stock";
// 只允许会员下一单,并且每单3个库存
public static void orderFlow(Long activityId, Long itemId, Long memberId, Integer buyNum){
// 校验会员是否已经存在未过时订单
if (MemberOrder.isMarkedStock(activityId, itemId, memberId)){
System.out.println("您已经存在秒杀订单");
}
// 初始化分布式锁 lock activity_id, item_id, member_id
String uuid = UUID.randomUUID().toString();
Lock lock = new RedisDistributedLock(
RedisUtil.getPool(),
String.format(LOCK_KEY, activityId, itemId, memberId),
uuid
);
try {
if (!lock.tryLock()){
return;
}
// TODO 订单或者商品校验
Thread.sleep(200);
// 扣减库存 + 标志会员
if (!cacheOpt(activityId, itemId, memberId, buyNum, true)){
return;
}
// TODO 创建订单
Thread.sleep(300);
} catch (Exception e){
e.printStackTrace();
// 回滚
cacheOpt(activityId, itemId, memberId, -buyNum, false);
} finally {
lock.unlock();
}
}
private static boolean cacheOpt(Long activityId, Long itemId, Long memberId, int i, boolean b) {
Long res = ItemStock.deduct(String.format(STOCK_KEY, activityId, itemId), i);
if (res == 1){// 库存扣减成功
MemberOrder.markStock(activityId, itemId, memberId, b);
return true;
} else {// 库存扣减失败
return false;
}
}
}
这里创建订单和cacheOpt的顺序可以适当调整,为了可以过滤无效重复请求,先创建订单,那么需要考虑如果扣减库存和标志会员操作后应用挂掉的情况,这时候需要作一些补偿工作;为了确保一致性,先扣减库存和标志会员操作,后创建订单,这样就避免补偿工作,但是在性能方面可能不如前面的方案。
测试代码:
package test.cache;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
public class SecondKillTest {
public static final String STOCK_KEY = SecondKillOrder.STOCK_KEY;
public static final String STOCK_NUM = "1000";
public static final int THREAD_NUM = 200;
// key -> activityId + itemId + memberId
public final static String LOCK_KEY = "%s_%s_%s";
public static void main(String[] args) {
// 扣减库存测试
// stockTest();
// 分布式锁测试
// lockTest();
// 下单流程测试
orderTest();
}
private static void orderTest() {
int i = THREAD_NUM;
Long activityId = 123456789L;
Long itemId = 987456123L;
Integer buyNum = 3;
ItemStock.initKey(String.format(STOCK_KEY, activityId.toString(), itemId.toString()), STOCK_NUM);
batchFunc(countDownLatch -> {
Long memberId = Math.abs(new Random().nextLong() % 10);
System.out.println("memberId:" + memberId);
countDownLatch.await();
SecondKillOrder.orderFlow(activityId, itemId, memberId, buyNum);
}, THREAD_NUM);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void lockTest() {
batchFunc(countDownLatch -> onceLock(countDownLatch), THREAD_NUM);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void stockTest() {
String key = String.format(STOCK_KEY, "test", "test");
ItemStock.initKey(key, STOCK_NUM);
batchFunc((countDownLatch)->{
countDownLatch.await();
ItemStock.deduct(key,3);
}, THREAD_NUM);
ItemStock.stockStop(key, Integer.valueOf(STOCK_NUM) - THREAD_NUM*3);
}
// 一次锁操作
public static void onceLock(CountDownLatch countDownLatch){
final Long ACTIVITY_ID = 123456789L;
final Long ITEM_ID = 12345678901L;
final Long MEMBER_ID = 456123789L;
String uuid = UUID.randomUUID().toString();
RedisDistributedLock lock = new RedisDistributedLock(
RedisUtil.getPool(),
String.format(LOCK_KEY, ACTIVITY_ID, ITEM_ID, MEMBER_ID),
uuid
);
try {
if (countDownLatch != null){
countDownLatch.await();
}
if (!lock.tryLock()){
return;
}
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + ":get");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//并发测试代码
private static void batchFunc(Func func, int threadNum) {
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
new Thread(() -> {
try {
func.execute(countDownLatch);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
countDownLatch.countDown();
}
return;
}
@FunctionalInterface
interface Func {
void execute(CountDownLatch countDownLatch) throws InterruptedException;
}
}
redis连接管理类代码可参考 redis 事务与Lua脚本
以上代码只是简单实现了下单流程,还有异步操作流程,补偿流程没有实现。
六.总结
以上为对服务端的秒杀流程的总结。这里涉及到的抵御请求洪流的一般用缓存(redis)。至于如果出现缓存都无法抵御的请求量的话,面对这种情况就必须预先准备好一些降级和限流措施,或者不想丢失数据的话可以用mq对会员请求进行蓄洪。