stage5 day07 拼多商城整合rabbitmq(订单流量削峰),Rabbitmq-spring boot整合

  • Post author:
  • Post category:其他



学习链接



问题

  1. @PostConstruct注解

    spring会自动执行@PostConstruct
  2. springboot执行流程是什么?

    扫描创建实例 –> 自动注入 –> @PostConstruct –> 后续流程(启动消费者)
  3. rabbitmq 中队列参数的意思

    在这里插入图片描述

    在这里插入图片描述
  4. RabbitAutoConfiguration 自动配置类,会根据这里的设置的参数,在服务器上创建队列
  5. 创建消费者的注解是什么?


    @RabbitListener



    @RabbitListener 创建一个消费者,启动一个消费者线程开始接收消息

    每个@RabbitListener都会创建一个消费者

    1)自动创建实例

    2)自动注册成为消费者

    3)自动开始接收消息

    4)自动调用消息处理方法



    在这里插入图片描述



pdShop 订单模块

在这里插入图片描述



添加空间,使用rabbitmq下自己的空间,需要在服务器上手动创建

在这里插入图片描述

在这里插入图片描述



1 拼多商城整合 rabbitmq —- 订单存储的解耦(流量削峰)

当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中

当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统

我们需要应对流量峰值,让流量曲线变得平缓,如下图

在这里插入图片描述


为了进行流量削峰,引入rabbitmq消息队列,当购物系统产生订单后,可以把订单数据

发送到消息队列



而订单消费者应用从消息队列接收消息,并把订单保存到数据库


在这里插入图片描述


这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存



1.1 生产者-发送订单



1.1.1 pom.xml依赖

spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>



1.1.2 application.yml

添加Rabbitmq的连接消息

在这里插入图片描述



1.1.3 修改主程序 RunPdApp

在主程序中添加下面的方法创建Queue实例

当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,

代码在

RabbitAdmin.declareQueues()

方法中

在这里插入图片描述



1.1.4 修改 OrderServiceImpl

在这里插入图片描述



1.2 消费者-接收订单,并保存到数据库



1.2.1 复制一份消费者 pdweb ,pd-web项目复制为pd-order-consumer

在这里插入图片描述



1.2.2 修改 application.yml

端口修改成81

在这里插入图片描述



1.2.3 新建OrderConsumer 消费者类

在这里插入图片描述

package com.pd;

import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
//通过@RabbitListener注解配置来接收消息 ,需要指定队列名 ,spring封装后的rabbitmq API
/**
 * -自动创建实例
 * -自动注册成为消费者
 * -自动开始接收消息
 * -自动处理收到消息
 */
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {
    //注入业务
    @Autowired
    private OrderService orderService;

    //指定处理消息的方法,在同一个类中只能设置一次
    @RabbitHandler
    public void receive(PdOrder pdOrder) throws Exception {
        orderService.saveOrder(pdOrder);
    }
}



1.2.4 修改OrderServiceImpl的saveOrder() 方法

在这里插入图片描述



1.3 测试 (注意操作顺序)

1.注册登录

2.添加地址

3.下订单



1 注册登录

在这里插入图片描述



2 添加地址

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



3 下订单

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



4 启动消费者(pd_web_consumer)

在这里插入图片描述

在这里插入图片描述



2 rabbitmq-spring整合



2.1 简单模式



2.1.1 Main

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    //配置helloworld队列参数
    @Bean
    public Queue helloworldQueue(){
        return new Queue("helloworld",false,false,false);
    }


    @Autowired
    private Producer p;
    /*
    springboot执行流程
    扫描创建实例 --> 自动注入 --> @PostConstruct --> 后续流程(启动消费者)

    spring会自动执行@PostConstruct
     */

    @PostConstruct
    public void test() {
        //在新的线程中执行阻塞操作,避免影响spring主线程的执行
        new Thread(()->{
            try {
                Thread.sleep(3000);//等待消费者启动后再发消息
            } catch (InterruptedException e) {
//                e.printStackTrace();
            }
            p.send();
        }).start();


    }
}



2.1.2 Producer

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(){
        t.convertAndSend("helloworld","Hello world!");
    }
}



2.1.3 Consumer

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
@RabbitListener 创建一个消费者,启动一个消费者线程开始接收消息
每个@RabbitListener都会创建一个消费者

自动创建实例
自动注册成为消费者
自动开始接收消息
自动调用消息处理方法
 */
@Component
public class Consumer {
    @RabbitListener(queues = "helloworld")
    public void receive (String msg){
        System.out.println("收到:"+msg);
    }
}



2.2 工作模式

在这里插入图片描述



生产者

在这里插入图片描述



创建两个消费者测试

在这里插入图片描述

在这里插入图片描述



合理分发 手动Ack,qos=1

  1. 手动Ack

    spring集成rabbitmq,默认就是手动Ack,spring会自动发送回执
  2. qos=1

    yml中添加 prefech参数 预抓取,spring设置的默认值是250 ,需要设置成1

    在这里插入图片描述



消息持久化

  1. 队列持久化

    在这里插入图片描述

  2. 消息的持久化 ,spring默认已添加了持久参数



2.3 广播模式(群发)

fanout交换机

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    //新建交换机
    @Bean
    public FanoutExchange logs(){
        //非持久,不自动删除
        return new FanoutExchange("logs",false,false);
    }

    @Autowired
    private Producer p;
    /*
    springboot执行流程
    扫描创建实例 --> 自动注入 --> @PostConstruct --> 后续流程(启动消费者)

    spring会自动执行@PostConstruct
     */

    @PostConstruct
    public void test() {
        //在新的线程中执行阻塞操作,避免影响spring主线程的执行
        new Thread(()->{
            while (true){
                System.out.println("输入消息:");
                String s=new Scanner(System.in).nextLine();
                p.send(s);
            }
        }).start();
    }
}
@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(String msg){
        //广播模式第二个参数无效
        t.convertAndSend("logs","",msg.getBytes());
    }

}
//创建自己的队列,与交换机绑定
//随机命名,非持久,独占

@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            //设置 队列,spring会随机命名,非持久,独占,自动删除
            value = @Queue,//(name = "",durable = "",autoDelete = ""),
            //交换机 declare = "false" 不在这里自动创建交换机
            exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void receive1 (String msg){
        System.out.println("消费者1收到:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            //设置 队列,spring会随机命名,非持久,独占,自动删除
            value = @Queue,//(name = "",durable = "",autoDelete = ""),
            //交换机 declare = "false" 不在这里自动创建交换机
            exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void receive2 (String msg){
        System.out.println("消费者2收到:"+msg);
    }
}

在这里插入图片描述



2.4 路由模式 (关键字)

direct 直连模式



Producer

在这里插入图片描述



Main

在这里插入图片描述

在这里插入图片描述



Consumer

在这里插入图片描述

在这里插入图片描述



2.5 主题模式 (通配符关键字)



Main

在这里插入图片描述



Consumer

在这里插入图片描述



测试

在这里插入图片描述



版权声明:本文为qq_45736735原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。