SpringAMQP之队列和交换机

  • Post author:
  • Post category:其他



目录


安装配置


基础使用


WorkQueue


FanoutExchange


DirectExchange


安装配置

我们在使用RabbitMQ需要先在Docker中拉取RabbitMQ使用下面命令拉取

systemctl start docker #启动docker
docker pull rabbitmq:3-management
#安装这个版本的rabbitmq

这里我之前安装了最新的RabbitMQ发现打开不了网站,我用了这个版本的就打开了

在拉取镜像成功后,使用创建容器的命令创建RabbitMQ容器

docker run \
-e RABBITMQ_DEFAULT_USER=demo \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management

这样就算安装完成了RabbitMQ,并且我们登陆RabbitMQ

  • channels:操作MQ的工具
  • exchange:路由消息到队列中(交换机)
  • queue:缓存消息(队列)
  • virtual host:虚拟主机,是对queue\exchange等资源的逻辑分组

我们现在开始使用Java来发起最基本的publisher->equeue->consume,发布给队列,队列取值这种效果。

首先先导入依赖和配置yml文件

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: 192.168.3.26
    port: 5672
    virtual-host: /
    username: demo
    password: 123456

写一个测试发布的,在这之前我们需要,先创建一个队列(注意:这里已经使用了SpringBootTest,所以如果在同一个包下,如果有SpringbootApplicaiton的注解需要先注释掉,不然是会报错的)

基础使用


@RunWith(SpringRunner.class)
@SpringBootTest
public class Publisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        String queuename = "simple.queue";
        String message = "hello,spring";
        rabbitTemplate.convertAndSend(queuename,message);
    }
}

这样之后,我们在写一个消费者,用于接收队列中的数据(这个我们需要springboot启动后,一直做监听)消费者中配置一样的yml文件和依赖

@Component
public class Consum {
    @RabbitListener(queues="simple.queue")
    public void listenT(String msg){
        System.out.println("接收到消息:"+msg);
    }
}

我首先没有启动消费者的springboot,所以会在队列中看到有一条数据

然后我们启动消费者

成功的接收到了队列中的消息,队列的消息也没有了,已经传送过来了

WorkQueue

下面我们将讲解workQueue

workQueue是一个发布者,和多个消费者,但是当中会有预取机制,会将每一个传送来的值均匀分布在每一个消费者上。

我将发布的利用循环,让其发送5次信息

@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        String queuename = "simple.queue";
        String message = "hello,springmq";
        for (int i=0;i<5;i++)
        {
            rabbitTemplate.convertAndSend(queuename,message);
        }
    }
}

消费者中使用两个监听的来获取同一个队列

@Component
public class Consum {
@RabbitListener(queues="simple.queue")
public void listenT1(String msg){
    System.out.println("demo1接收到消息:"+msg);
}
@RabbitListener(queues="simple.queue")
public void listenT2(String msg){
    System.out.println("demo2接收到消息:"+msg);
}
}

结果如下

这就是workQueue的作用,下面我们将讲解FanoutExchange

FanoutExchange

交换机在这里作用就是改变我们使用的传输数据的方式,不是通过发布到队列,然后到队列当中去取,而是通过发布者发布给交换机,又交换机交给不同的队列,再由消费者来调用。

我们更改代码,现在使用注解的方式来进行交换机和队列的创建,并拉取他们之间连接,这里我们首先先创建交换机和队列以及彼此的连接,这里我新建了一个类,和监听的分开,更符合微服务的思想

@Configuration
public class Exchange {
    @Bean
    public FanoutExchange FanoutExchange1(){
        return new FanoutExchange("demo.list");
    }
    @Bean
    public Queue fanoutqueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue fanoutqueue2(){
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding fanoutbinding(Queue fanoutqueue1, FanoutExchange FanoutExchange1){
        return BindingBuilder.bind(fanoutqueue1).to(FanoutExchange1);
    }
    @Bean
    public Binding fanoutbinding2(Queue fanoutqueue2, FanoutExchange FanoutExchange1){
        return BindingBuilder.bind(fanoutqueue2).to(FanoutExchange1);
    }
}

这里我创建两个队列和一个交换机。下面是监听类中添加对两种队列的监听。

@Component
public class Consum {

@RabbitListener(queues="simple.queue")
public void listenT1(String msg){
    System.out.println("demo1接收到消息:"+msg);
}
@RabbitListener(queues="simple.queue")
public void listenT2(String msg){
    System.out.println("demo2接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue1")
public void listenT3(String msg){
    System.out.println("demofanout1接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue2")
public void listenT4(String msg){
    System.out.println("demofanou2接收到消息:"+msg);
    }
}

发送就改变不是向队列发送,而是向交换机发送

@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        String exchangename = "demo.list";
        String message = "hello,springmq";
        rabbitTemplate.convertAndSend(exchangename,"",message);
    }
}

在交换机和发送的消息中有一个“”,这个后面给大家说清楚,下面看结果

成功得到信息,然后讲解DirectExchange

DirectExchange

DirectExchange会将接受到消息根据规则路由到指定的Queue,当中重要的是使用bindingkey来控制,可以使用@RabbitListener声明Exange,Queue,Routingkey。

这里我们讲解了新的注解,就是我们可以直接在@RabbitListener中添加一些参数这样就可以不用使用@Bean的方法去一个个的声明了。从这里大家就可以看出来DirectExchange的不同点就在于使用的bindingkey,而这个值正是””。下面直接上代码

这边是消费者我们使用注解的形式进行开发,也是最简单的一种,注意因为上面我们使用的是finout,所以我们这里需要先删除一次交换机,再执行代码,让其自动创建新的交换机

@Component
public class Consum {

@RabbitListener(queues="simple.queue")
public void listenT1(String msg){
    System.out.println("demo1接收到消息:"+msg);
}
@RabbitListener(queues="simple.queue")
public void listenT2(String msg){
    System.out.println("demo2接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue1")
public void listenT3(String msg){
    System.out.println("demofanout1接收到消息:"+msg);
}
@RabbitListener(queues="fanout.queue2")
public void listenT4(String msg){
    System.out.println("demofanou2接收到消息:"+msg);
    }
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
public void listen5(String msg) {
    System.out.println("demodirect1接受到消息:" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "demo.list", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listen6(String msg) {
        System.out.println("demodirect2接受到消息:" + msg);
    }

}

发布者添加我们需要识别的bindkey就可以了


@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        String exchangename = "demo.list";
        String message = "hello,springmq";
        rabbitTemplate.convertAndSend(exchangename,"red",message);
    }
}

这里我们先传red的,也就是两个都能收到

我们把当中bindingkey更改为blue也就是只有1能收到


@RunWith(SpringRunner.class)
@SpringBootTest
@EnableRabbit
public class Publisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        String exchangename = "demo.list";
        String message = "hello,springmq";
        rabbitTemplate.convertAndSend(exchangename,"blue",message);
    }
}

这就是DirectExchange,后面还有一种是TopicExchange,这种的用法没有什么不同,就是增加了可以使用通配符来代替交换机的名称,只是偷偷懒,最后再讲一种:消息转换器



版权声明:本文为Demolist原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。