问题
-
@PostConstruct注解
spring会自动执行@PostConstruct -
springboot执行流程是什么?
扫描创建实例 –> 自动注入 –> @PostConstruct –> 后续流程(启动消费者) -
rabbitmq 中队列参数的意思
- RabbitAutoConfiguration 自动配置类,会根据这里的设置的参数,在服务器上创建队列
-
创建消费者的注解是什么?
@RabbitListener
@RabbitListener 创建一个消费者,启动一个消费者线程开始接收消息
每个@RabbitListener都会创建一个消费者
1)自动创建实例
2)自动注册成为消费者
3)自动开始接收消息
4)自动调用消息处理方法
pdShop 订单模块
添加空间,使用rabbitmq下自己的空间,需要在服务器上手动创建
1 拼多商城整合 rabbitmq —- 订单存储的解耦(流量削峰)
当用户下订单时,我们的业务系统直接与数据库通信,把订单保存到数据库中
当系统流量突然激增,大量的订单压力,会拖慢业务系统和数据库系统
我们需要应对流量峰值,让流量曲线变得平缓,如下图
为了进行流量削峰,引入rabbitmq消息队列,当购物系统产生订单后,可以把订单数据
发送到消息队列
;
而订单消费者应用从消息队列接收消息,并把订单保存到数据库
这样,当流量激增时,大量订单会暂存在rabbitmq中,而订单消费者可以从容地从消息队列慢慢接收订单,向数据库保存
1.1 生产者-发送订单
1.1.1 pom.xml依赖
spring提供了更方便的消息队列访问接口,它对RabbitMQ的客户端API进行了封装,使用起来更加方便
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.1.2 application.yml
添加Rabbitmq的连接消息
1.1.3 修改主程序 RunPdApp
在主程序中添加下面的方法创建Queue实例
当创建RabbitMQ连接和信道后,Spring的RabbitMQ工具会自动在服务器中创建队列,
代码在
RabbitAdmin.declareQueues()
方法中
1.1.4 修改 OrderServiceImpl
1.2 消费者-接收订单,并保存到数据库
1.2.1 复制一份消费者 pdweb ,pd-web项目复制为pd-order-consumer
1.2.2 修改 application.yml
端口修改成81
1.2.3 新建OrderConsumer 消费者类
package com.pd;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
//通过@RabbitListener注解配置来接收消息 ,需要指定队列名 ,spring封装后的rabbitmq API
/**
* -自动创建实例
* -自动注册成为消费者
* -自动开始接收消息
* -自动处理收到消息
*/
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {
//注入业务
@Autowired
private OrderService orderService;
//指定处理消息的方法,在同一个类中只能设置一次
@RabbitHandler
public void receive(PdOrder pdOrder) throws Exception {
orderService.saveOrder(pdOrder);
}
}
1.2.4 修改OrderServiceImpl的saveOrder() 方法
1.3 测试 (注意操作顺序)
1.注册登录
2.添加地址
3.下订单
1 注册登录
2 添加地址
3 下订单
4 启动消费者(pd_web_consumer)
2 rabbitmq-spring整合
2.1 简单模式
2.1.1 Main
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//配置helloworld队列参数
@Bean
public Queue helloworldQueue(){
return new Queue("helloworld",false,false,false);
}
@Autowired
private Producer p;
/*
springboot执行流程
扫描创建实例 --> 自动注入 --> @PostConstruct --> 后续流程(启动消费者)
spring会自动执行@PostConstruct
*/
@PostConstruct
public void test() {
//在新的线程中执行阻塞操作,避免影响spring主线程的执行
new Thread(()->{
try {
Thread.sleep(3000);//等待消费者启动后再发消息
} catch (InterruptedException e) {
// e.printStackTrace();
}
p.send();
}).start();
}
}
2.1.2 Producer
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(){
t.convertAndSend("helloworld","Hello world!");
}
}
2.1.3 Consumer
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/*
@RabbitListener 创建一个消费者,启动一个消费者线程开始接收消息
每个@RabbitListener都会创建一个消费者
自动创建实例
自动注册成为消费者
自动开始接收消息
自动调用消息处理方法
*/
@Component
public class Consumer {
@RabbitListener(queues = "helloworld")
public void receive (String msg){
System.out.println("收到:"+msg);
}
}
2.2 工作模式
生产者
创建两个消费者测试
合理分发 手动Ack,qos=1
-
手动Ack
spring集成rabbitmq,默认就是手动Ack,spring会自动发送回执 -
qos=1
yml中添加 prefech参数 预抓取,spring设置的默认值是250 ,需要设置成1
消息持久化
-
队列持久化
-
消息的持久化 ,spring默认已添加了持久参数
2.3 广播模式(群发)
fanout交换机
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//新建交换机
@Bean
public FanoutExchange logs(){
//非持久,不自动删除
return new FanoutExchange("logs",false,false);
}
@Autowired
private Producer p;
/*
springboot执行流程
扫描创建实例 --> 自动注入 --> @PostConstruct --> 后续流程(启动消费者)
spring会自动执行@PostConstruct
*/
@PostConstruct
public void test() {
//在新的线程中执行阻塞操作,避免影响spring主线程的执行
new Thread(()->{
while (true){
System.out.println("输入消息:");
String s=new Scanner(System.in).nextLine();
p.send(s);
}
}).start();
}
}
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(String msg){
//广播模式第二个参数无效
t.convertAndSend("logs","",msg.getBytes());
}
}
//创建自己的队列,与交换机绑定
//随机命名,非持久,独占
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
//设置 队列,spring会随机命名,非持久,独占,自动删除
value = @Queue,//(name = "",durable = "",autoDelete = ""),
//交换机 declare = "false" 不在这里自动创建交换机
exchange = @Exchange(name = "logs",declare = "false")
))
public void receive1 (String msg){
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
//设置 队列,spring会随机命名,非持久,独占,自动删除
value = @Queue,//(name = "",durable = "",autoDelete = ""),
//交换机 declare = "false" 不在这里自动创建交换机
exchange = @Exchange(name = "logs",declare = "false")
))
public void receive2 (String msg){
System.out.println("消费者2收到:"+msg);
}
}
2.4 路由模式 (关键字)
direct 直连模式
Producer
Main
Consumer
2.5 主题模式 (通配符关键字)
Main
Consumer
测试