目录
安装配置
我们在使用RabbitMQ需要先在Docker中拉取RabbitMQ使用下面命令拉取
systemctl start docker #启动docker
docker pull rabbitmq:3-management
#安装这个版本的rabbitmq
这里我之前安装了最新的RabbitMQ发现打开不了网站,我用了这个版本的就打开了
在拉取镜像成功后,使用创建容器的命令创建RabbitMQ容器
docker run \
-e RABBITMQ_DEFAULT_USER=demo \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
这样就算安装完成了RabbitMQ,并且我们登陆RabbitMQ
- channels:操作MQ的工具
- exchange:路由消息到队列中(交换机)
- queue:缓存消息(队列)
- virtual host:虚拟主机,是对queue\exchange等资源的逻辑分组
我们现在开始使用Java来发起最基本的publisher->equeue->consume,发布给队列,队列取值这种效果。
首先先导入依赖和配置yml文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.3.26
port: 5672
virtual-host: /
username: demo
password: 123456
写一个测试发布的,在这之前我们需要,先创建一个队列(注意:这里已经使用了SpringBootTest,所以如果在同一个包下,如果有SpringbootApplicaiton的注解需要先注释掉,不然是会报错的)
基础使用
@RunWith(SpringRunner.class)
@SpringBootTest
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
String queuename = "simple.queue";
String message = "hello,spring";
rabbitTemplate.convertAndSend(queuename,message);
}
}
这样之后,我们在写一个消费者,用于接收队列中的数据(这个我们需要springboot启动后,一直做监听)消费者中配置一样的yml文件和依赖
@Component
public class Consum {
@RabbitListener(queues="simple.queue")
public void listenT(String msg){
System.out.println("接收到消息:"+msg);
}
}
我首先没有启动消费者的springboot,所以会在队列中看到有一条数据
然后我们启动消费者
成功的接收到了队列中的消息,队列的消息也没有了,已经传送过来了
WorkQueue
下面我们将讲解workQueue
workQueue是一个发布者,和多个消费者,但是当中会有预取机制,会将每一个传送来的值均匀分布在每一个消费者上。
我将发布的利用循环,让其发送5次信息
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
String queuename = "simple.queue";
String message = "hello,springmq";
for (int i=0;i<5;i++)
{
rabbitTemplate.convertAndSend(queuename,message);
}
}
}
消费者中使用两个监听的来获取同一个队列
@Component
public class Consum {
@RabbitListener(queues="simple.queue")
public void listenT1(String msg){
System.out.println("demo1接收到消息:"+msg);
}
@RabbitListener(queues="simple.queue")
public void listenT2(String msg){
System.out.println("demo2接收到消息:"+msg);
}
}
结果如下
这就是workQueue的作用,下面我们将讲解FanoutExchange
FanoutExchange
交换机在这里作用就是改变我们使用的传输数据的方式,不是通过发布到队列,然后到队列当中去取,而是通过发布者发布给交换机,又交换机交给不同的队列,再由消费者来调用。
我们更改代码,现在使用注解的方式来进行交换机和队列的创建,并拉取他们之间连接,这里我们首先先创建交换机和队列以及彼此的连接,这里我新建了一个类,和监听的分开,更符合微服务的思想
@Configuration
public class Exchange {
@Bean
public FanoutExchange FanoutExchange1(){
return new FanoutExchange("demo.list");
}
@Bean
public Queue fanoutqueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutqueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding fanoutbinding(Queue fanoutqueue1, FanoutExchange FanoutExchange1){
return BindingBuilder.bind(fanoutqueue1).to(FanoutExchange1);
}
@Bean
public Binding fanoutbinding2(Queue fanoutqueue2, FanoutExchange FanoutExchange1){
return BindingBuilder.bind(fanoutqueue2).to(FanoutExchange1);
}
}
这里我创建两个队列和一个交换机。下面是监听类中添加对两种队列的监听。
@Component
public class Consum {
@RabbitListener(queues="simple.queue")
public void listenT1(String msg){
System.out.println("demo1接收到消息:"+msg);
}
@RabbitListener(queues="simple.queue")
public void listenT2(String msg){
System.out.println("demo2接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue1")
public void listenT3(String msg){
System.out.println("demofanout1接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue2")
public void listenT4(String msg){
System.out.println("demofanou2接收到消息:"+msg);
}
}
发送就改变不是向队列发送,而是向交换机发送
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
String exchangename = "demo.list";
String message = "hello,springmq";
rabbitTemplate.convertAndSend(exchangename,"",message);
}
}
在交换机和发送的消息中有一个“”,这个后面给大家说清楚,下面看结果
成功得到信息,然后讲解DirectExchange
DirectExchange
DirectExchange会将接受到消息根据规则路由到指定的Queue,当中重要的是使用bindingkey来控制,可以使用@RabbitListener声明Exange,Queue,Routingkey。
这里我们讲解了新的注解,就是我们可以直接在@RabbitListener中添加一些参数这样就可以不用使用@Bean的方法去一个个的声明了。从这里大家就可以看出来DirectExchange的不同点就在于使用的bindingkey,而这个值正是””。下面直接上代码
这边是消费者我们使用注解的形式进行开发,也是最简单的一种,注意因为上面我们使用的是finout,所以我们这里需要先删除一次交换机,再执行代码,让其自动创建新的交换机
@Component
public class Consum {
@RabbitListener(queues="simple.queue")
public void listenT1(String msg){
System.out.println("demo1接收到消息:"+msg);
}
@RabbitListener(queues="simple.queue")
public void listenT2(String msg){
System.out.println("demo2接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue1")
public void listenT3(String msg){
System.out.println("demofanout1接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue2")
public void listenT4(String msg){
System.out.println("demofanou2接收到消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listen5(String msg) {
System.out.println("demodirect1接受到消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listen6(String msg) {
System.out.println("demodirect2接受到消息:" + msg);
}
}
发布者添加我们需要识别的bindkey就可以了
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
String exchangename = "demo.list";
String message = "hello,springmq";
rabbitTemplate.convertAndSend(exchangename,"red",message);
}
}
这里我们先传red的,也就是两个都能收到
我们把当中bindingkey更改为blue也就是只有1能收到
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
String exchangename = "demo.list";
String message = "hello,springmq";
rabbitTemplate.convertAndSend(exchangename,"blue",message);
}
}
这就是DirectExchange,后面还有一种是TopicExchange,这种的用法没有什么不同,就是增加了可以使用通配符来代替交换机的名称,只是偷偷懒,最后再讲一种:消息转换器