二、RabbitMQ在某商城项目中的使用案例

  • Post author:
  • Post category:其他




一、在对商品增删改查的时候发送消息

在item-service中引入

amqp

依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中配置相关信息

server:
  port: 8081
mybatis:
  type-aliases-package: com.leyou.item.pojo
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/leyou
    username: root
    password: 123456
  application:
    name: item-service #将来会作为微服务的名称
  # 消息队列配置
  rabbitmq:
    host: 192.168.248.131
    virtual-host: /
    username: admin
    password: admin
    template:
      exchange: LEYOU.ITEM.EXCHANGE # 指定交换机的名称
eureka:
  client:
    service-url:
      defaultZone: http://localhost:10086/eureka
    register-with-eureka: true #服务提供方启动时,会监测该参数是否为true,true 注册给eureka
  instance:
    lease-renewal-interval-in-seconds: 5 #心跳时间,没5s给注册中心发一次
    lease-expiration-duration-in-seconds: 15 #过期时间

在GoodsService中添加发送消息的方法,要先注入amqpTemplate对象,这里的

routing key

,格式为

item.*



*

在要发送消息的方法中传过来,使用

try{}catch(){}

是为了防止发送消息出现异常,影响正常的数据库保存操作

private void sendMsg(String type, Long id) {
    try {
        this.amqpTemplate.convertAndSend("item." + type, id);
    } catch (AmqpException e) {
        e.printStackTrace();
    }
}

在新增商品时,需要发送消息,

rooting key



item.insert

,传递的参数为

insert

和新增商品的

id

@Transactional// 加事物,新增spu,新增spuDetail
public void saveGoods(SpuBo spuBo) {
    //1.新增spu,spuBo继承了spu
    //设置默认字段
    spuBo.setId(null);
    spuBo.setSaleable(true);
    spuBo.setValid(true);
    spuBo.setCreateTime(new Date());
    spuBo.setLastUpdateTime(spuBo.getCreateTime());
    this.spuMapper.insertSelective(spuBo);
    //2.新增spuDetail
    SpuDetail spuDetail = spuBo.getSpuDetail();
    spuDetail.setSpuId(spuBo.getId());
    this.spuDetailMapper.insertSelective(spuDetail);
    //3.新增sku和库存
    saveSkuAndStock(spuBo);

    // 发送新增商品的消息到消息队列:item.insert,就是rooting key
    sendMsg("insert", spuBo.getId());
}

修改商品时,同样也需要发送消息

@Transactional
public void updateGoods(SpuBo spuBo) {
    // 根据spuId查询以前sku
    Sku record = new Sku();
    record.setSpuId(spuBo.getId());
    List<Sku> skus = this.skuMapper.select(record);
    // 如果以前存在,则删除
    if(!CollectionUtils.isEmpty(skus)) {
        skus.forEach(sku -> {
            // 删除库存
            this.stockMapper.deleteByPrimaryKey(sku.getId());
        });
        // 删除sku
        Sku sku = new Sku();
        sku.setSpuId(spuBo.getId());
        this.skuMapper.delete(sku);
    }
    // 新增sku 和stock
    this.saveSkuAndStock(spuBo);
    // 更新spu
    spuBo.setLastUpdateTime(new Date());
    spuBo.setCreateTime(null);
    spuBo.setValid(null);
    spuBo.setSaleable(null);
    this.spuMapper.updateByPrimaryKeySelective(spuBo);
    // 更新spu详情
    this.spuDetailMapper.updateByPrimaryKeySelective(spuBo.getSpuDetail());
    // 发送消息到消息队列,更新商品
    sendMsg("update",spuBo.getId());
}



二、在页面静态化的微服务里接受消息,更新静态页面

在goods-web微服务中同样引入依赖,配置rabbitMQ的相关信息同上,只是这里不用再声明交换机的名称

/**
 * @auther Mr.Liao
 * @date 2019/5/17 10:02
 */
@Component
public class GoodsListener {
    @Autowired
    private GoodsHtmlService goodsHtmlService;
    /**
     * 新增和更新商品的时候都要从新生成静态页面
     * @param id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "LEYOU.ITEM.SAVE.QUEUE", durable = "true"),
            exchange = @Exchange(value = "LEYOU.ITEM.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"item.insert","item.update"}
    ))
    public void save(Long id){
        if (id == null){
            return;
        }
        //调用createHtml()方法,根据spuId生成静态页面
        this.goodsHtmlService.createHtml(id);
    }

    /**
     * 删除商品时,删除对应的静态页面的方法
     * @param id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "LEYOU.ITEM.DELETE.QUEUE",durable = "true"),
            exchange = @Exchange(value = "LEYOU.ITEM.EXCHANGE",ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"item.delete"}
    ))
    public void delete(Long id){
        if (id == null){
            return;
        }
        this.goodsHtmlService.deleteHtml(id);
    }
}



三、在搜索微服务中更新索引库的数据

引入依赖、配置rabbitMQ,添加监听消息的方法

@Component
public class GoodsListener {
    @Autowired
    private SearchService searchService;

    /**
     * 监听消息队列的消息,更新索引库中的数据
     * @param id
     * @throws IOException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "LEYOU.SEARCH.SAVE.QUEUE",durable = "true"),
            exchange = @Exchange(value = "LEYOU.ITEM.EXCHANGE",ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"item.insert","item.update"}
    ))
    public void save(Long id) throws IOException {
        if (id == null){
            return;
        }
        this.searchService.save(id);

    }

    /**
     * 监听删除的消息
     * @param id
     * @throws IOException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "LEYOU.SEARCH.DELETE.QUEUE",durable = "true"),
            exchange = @Exchange(value = "LEYOU.ITEM.EXCHANGE",ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"item.delete"}
    ))
    public void delete(Long id) throws IOException {
        if (id == null){
            return;
        }
        this.searchService.delete(id);
    }
}

下面service中的save方法抛出的异常,抛到上面

GoodsListener

中,这里的save又将异常抛出,则spring框架会根据是否有异常,来决定是否确认消息(手动ACK)

/**
 * 根据消息队列获取消息,保存新增的商品
 * @param id
 * @throws IOException
 */
public void save(Long id) throws IOException {
    // 根据商品Id远程调用服务从数据库中查询处相关的商品信息
    Spu spu = this.goodsClient.querySpuById(id);
    // 构建成索引库所需的数据模型
    Goods goods = this.buildGoods(spu);
    // 添加到索引库中
    this.goodsRepository.save(goods);
}

/**
 * 根据消息队列获取消息,删除一个商品
 * @param id
 */
public void delete(Long id) {
    this.goodsRepository.deleteById(id);
}



四、用户注册时发送短信验证码

在user-service微服务中

/**
 * 生成验证码,发送到手机,并保存到redis中
 * @param phone
 */
public void sendVerifyCode(String phone) {
    if (StringUtils.isBlank(phone)){
        return;
    }
    // 生成验证码
    String code = NumberUtils.generateCode(6);
    // 发送消息到rabbitMQ, sms服务消费消息, 给手机发送短信验证码
    HashMap<String, String> msg = new HashMap<>();
    msg.put("phone",phone);
    msg.put("code", code);
    this.amqpTemplate.convertAndSend("LEYOU.SMS.EXCHANGE","verifyCode.sms", msg);
    // 把验证码保存到redis中
    this.redisTemplate.opsForValue().set(KEY_PREFIX+phone, code, 5, TimeUnit.MINUTES);
}

在短信微服务中,编写监听消息发送短信的方法

@Component
public class SmsListener {
    @Autowired
    private SmsUtils smsUtils;
    @Autowired
    private SmsProperties smsProperties;
    /**
     * 发送短信验证码
     * @param msg
     * @throws ClientException
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "LEYOU.SMS.QUEUE",durable = "true"),
            exchange = @Exchange(value = "LEYOU.SMS.EXCHANGE",ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
            key = {"verifyCode.sms"}
    ))
    public void sendSms(Map<String, String> msg) throws ClientException {
        if (CollectionUtils.isEmpty(msg)){
            return;
        }
        String phone = msg.get("phone");
        String code = msg.get("code");
        if (StringUtils.isNoneBlank(phone) && StringUtils.isNoneBlank(code)){
            this.smsUtils.sendSms(phone, code, this.smsProperties.getSignName(),this.smsProperties.getVerifyCodeTemplate());
        }
    }
}



版权声明:本文为a2231476020原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。