消息队列简介
消息队列,英文名MessageQueue,简而言之就是把消息先存放放在一个队列里,然后通过交换机在合适的时机发出合适的量到接收消息的用户那里,通常会用这张图来解释其工作原理
其中发布订阅的就是生产者,而右侧接收消息的就是消费者,中间的队列里管存放,但不管分配,因为消息是消费者从中取出去的,一条消息被消费掉后就不再存在里面(这里只指普通类型消息队列,其他类型另算)。
环境配置
要实现消息队列,首先要准备安装环境,除了编译器外,要安装erlang和rabbitMQ,其中erlang是提供语言环境的
首先去官网下载两个安装包,
这里是erlang官网http://www.erlang.org/downloads
下载后安装都是默认选项,最后还需要配置环境变量,在系统变量和环境变量里各添加一条
以上两个都要根据自己的安装目录配置
如果配置好的话,在cmd里打erl可以看到信息
然后安装rabbitMQ
这是官网
https://www.rabbitmq.com/install-windows.html#installer,注意安装时以管理员身份运行
也是全部默认安装就可以
安装完,进入rabbitMQ的sbin目录下,在cmd输入
rabbitmq-plugins.bat enable rabbitmq_management
就可以在浏览器里可视化管理rabbitMQ了
然后在windows菜单里启动start服务
打开浏览器访问http://localhost:15672/
看到如图界面,用户名密码都是guest
看到这个界面说明配置好了
编写工作队列
消息队列的实现需要创建一个生产者,几个消费者,这几个角色都各自用一个项目实现,而消息队列用spring的相关包就可以实现其功能
首先我们在Idea里建立一个maven项目,作为父项目,什么也不用勾选,空白的就可以
然后在其中建一个Producer项目,几个Consumer项目,也是都不勾选
这是项目目录
在父项目的pom.xml里添加
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.0.4.RELEASE</version>
<configuration>
<fork>true</fork>
<includeSystemScope>true</includeSystemScope>
</configuration>
</plugin>
</plugins>
</build>
这些依赖从上到下依次是:父项目依赖设置,集成了tomcat的web项目依赖,rabbitmq客户端依赖,统一打包依赖(包括jdk版本和maven打包)
生产者项目
然后编写生产者的application.yml(resource目录下)
接着是配置spring启动类,正常模板样式没什么改动,注意在根包目录下,确保管理到所有包
然后是配置类,需要在其中创建出消息队列并注入容器中
@Configuration
public class RabbitMQConfig {
@Bean
public Queue createQueue(){
return new Queue("first-queue");
}
}
以及控制器,在其中注入rabbittemplate并用其指定路由key,将消息回显到前台
@RestController
@RequestMapping("/")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("sendMsg")
public String sendMsg(@RequestParam String message){
rabbitTemplate.convertAndSend("first-queue", message);
return "消息:“" + message + "”已发送!";
}
}
编写后项目目录
消费者
首先配置两个application.yml
B的内容和A差不多,只有端口号不同
A和B的启动类也没什么特别的
接着写两个项目的组件层
将监听器注册并指定消息队列key,从中获取通道名和正文内容
@Component
public class ConsumerA {
@RabbitListener(queues = "first-queue")
public void receiveMsg(String msg, Channel channel, Message message){
System.out.println("消费者1channel通道编号为:" + channel.getChannelNumber());
System.out.println("消费者1收到消息:" + new String(message.getBody()));
}
}
B也差不多
@Component
public class ConsumerB {
@RabbitListener(queues = "first-queue")
public void receiveMsg(String msg, Channel channel, Message message){
System.out.println("消费者2channel通道编号为:" + channel.getChannelNumber());
System.out.println("消费者2收到消息:" + new String(message.getBody()));
}
}
到这里编写完成
测试
打开15072端口的rm主页,通常这里是空的,这里是之前实验留下的,但注意,右边messages下三个参数都是0,就是没有
向网址栏里填写如下信息,发送消息到后台,回显到前台
这里已经有了内容
启动消费者A项目,控制台收到消息
内容被A消费掉了就没了
如果再发,再启动消费者B,B就会收到消息,之后如果再发,会轮流发给AB,类似负载均衡的方式,这是简单的消息队列,其他方式以后更新