7-2 交易验证优化
用户风控策略优化:策略缓存模型化
活动校验策略优化:引入活动发布流程,模型缓存化,紧急下线能力
7-4 活动缓存库存方案一
我们不需要整个item表的所有商品都串行 只需要做活动的那个商品串行就行 所以可以将表锁改为行锁 提高性能
该sql语句在itemId字段有索引的前提下会加上行锁 若无索引则会锁定表
update item_stock
set stock = stock – #{amount}
where item_id = #{itemId} and stock >= #{amount}
给item_stock的item_id字段添加唯一索引 以添加行锁
UNIQUE INDEX
item_id_index
(
item_id
) USING BTREE
此时串行减库存的操作依旧无法避免 为性能瓶颈 需要对库存行锁进行优化:
1.扣减库存缓存化(在内存中操作代替在磁盘中操作)
(1)活动发布同步库存进缓存
PromoService.java
public interface PromoService {
PromoModel getPromoByItemId(Integer itemId);
//活动发布
void publishPromo(Integer promoId);
}
PromoServiceImpl.java
@Autowired
private ItemService itemService;
@Override
public void publishPromo(Integer promoId) {
//通过活动id获取活动
PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
if (promoDO.getItemId() == null || promoDO.getItemId().intValue() == 0) {
return;
}
ItemModel itemModel = itemService.getItemById(promoDO.getItemId());
//将库存同步到redis内
//实际中 在此同步的瞬间可能由于新的订单而导致库存不一致 可在业务逻辑中下架商品解决
redisTemplate.opsForValue().set("promo_item_stock_"+itemModel.getId(),itemModel.getStock());
}
ItemController.java
@Autowired
private PromoService promoService;
//手动发布活动
@RequestMapping(value = "/publishpromo", method = {RequestMethod.GET})
@ResponseBody
public CommonReturnType publishpromo(@RequestParam(name = "id") Integer id) {
promoService.publishPromo(id);
return CommonReturnType.create(null);
}
(2)下单交易减缓存库存
2.异步同步数据库(内存会丢 缓存不靠谱)
3.库存数据库最终一致性保证
7-5 活动缓存库存方案二
异步消息队列rocketmq
高性能,高并发,分布式消息中间件
典型应用场景︰分布式事务,异步解耦
7-7 rocketmq安装
wget https://dlcdn.apache.org/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip --no-check-certificat
chmod -R 777 *
unzip rocketmq-all-4.9.4-bin-release.zip
在启动和测试之前需要修改默认JVM大小
runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
tool.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
linux:
启动名称服务器
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动代理
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
发送和接收消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭服务器
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
win:
启动名称服务器
.\bin\mqnamesrv.cmd
启动代理
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
发送和接收消息
.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Producer
.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Consumer
7-8 缓存库存接入异步化(上)
添加依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
在application.properties中添加配置
mq.nameserver,addr=39.103.232.42:9876
mq.topicname=stock
7-9 缓存库存接入异步化(下)
ItemServiceImpl.java
@Override
@Transactional
public boolean decreaseStock(Integer itemId, Integer amount) throws BusinessException {
//int affectedRow = itemStockDOMapper.decreaseStock(itemId, amount);
long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue() * -1);
if (result >= 0) {
//更新库存成功
boolean mqResult = mqProducer.asyncReduceStock(itemId,amount);
if (!mqResult) {
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
}
return true;
} else {
//更新库存失败
redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
return false;
}
}
MqProducer.java
@Component
public class MqProducer {
private DefaultMQProducer producer;
@Value("${mq.nameserver,addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@PostConstruct
public void init() throws MQClientException {
//mq producer的初始化
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameAddr);
producer.start();
}
//同步库存扣减消息
public boolean asyncReduceStock(Integer itemId, Integer amount) {
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("itemId",itemId);
bodyMap.put("amount",amount);
Message message = new Message(topicName,"increase",
JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
try {
producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
return false;
} catch (RemotingException e) {
e.printStackTrace();
return false;
} catch (MQBrokerException e) {
e.printStackTrace();
return false;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}
MqConsumer.java
@Component
public class MqConsumer {
private DefaultMQPushConsumer consumer;
@Value("${mq.nameserver,addr}")
private String nameAddr;
@Value("${mq.topicname}")
private String topicName;
@Autowired
private ItemStockDOMapper itemStockDOMapper;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setNamesrvAddr(nameAddr);
consumer.subscribe(topicName,"*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//实现库存真正到数据库内扣减的逻辑
Message msg = msgs.get(0);
String jsonString = new String(msg.getBody());
Map<String, Object> map = JSON.parseObject(jsonString,Map.class);
Integer itemId = (Integer) map.get("itemId");
Integer amount = (Integer) map.get("amount");
itemStockDOMapper.decreaseStock(itemId,amount);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}