SpringBoot中使用RabbitMQ

  • Post author:
  • Post category:其他




SpringBoot中使用RabbitMQ



一、搭建初始环境

  1. 引入依赖
<!-- Spring Boot AMQP starter dependency. No <version> is declared here, so it is presumably managed by the Spring Boot parent/BOM (TODO confirm). -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置配置文件

RabbitMQ的相关配置可以参考RabbitProperties类

# RabbitMQ connection settings (spring.rabbitmq.*).
spring:
  rabbitmq:
    host: xxx # presumably a placeholder; replace with the broker host
    port: 5672
    username: admin
    password: admin
    virtual-host: / # virtual host


RabbitTemplate

RabbitTemplate 用来简化操作,使用时直接在项目中注入即可。

配置bean

/**
 * Declares the exchanges, queues and bindings used by the examples as Spring beans.
 *
 * <p>Several beans of type {@code Queue} and {@code FanoutExchange} are declared here, and the
 * bean method parameters ({@code query5}, {@code queue5}, {@code queue7},
 * {@code fanoutExchange}) carry the same names as bean methods of this class. Injection
 * presumably relies on those names, so do not rename them without checking.
 */
@Configuration
public class rabbitconfig {

    /**
     * Builds the queue-argument map shared by the TTL queues.
     *
     * @return a new mutable map containing {@code x-message-ttl = 5000}
     *         (RabbitMQ documents this argument as a value in milliseconds)
     */
    private static Map<String, Object> ttlArguments() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 5000);
        return arguments;
    }

    /**
     * Topic exchange named {@code latte3.top}.
     *
     * @return the exchange, declared with durable = true and autoDelete = false
     */
    @Bean
    public TopicExchange setop() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange("latte3.top", durable, autoDelete);
    }

    /**
     * Binds {@code query5} to the topic exchange with the pattern {@code com.*}.
     *
     * @param topicExchange the topic exchange to bind to
     * @param query5        the queue to bind
     * @return the binding
     */
    @Bean
    public Binding binding1a(TopicExchange topicExchange, Queue query5) {
        final String routingPattern = "com.*";
        return BindingBuilder.bind(query5).to(topicExchange).with(routingPattern);
    }

    /**
     * Fanout exchange named {@code latte3.fanout}.
     *
     * @return the exchange, declared with durable = true and autoDelete = false
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("latte3.fanout", durable, autoDelete);
    }

    /**
     * Fanout exchange named {@code dead}; {@link #queue7()} names it as its dead-letter exchange.
     *
     * @return the exchange, declared with durable = true and autoDelete = false
     */
    @Bean
    public FanoutExchange dead() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("dead", durable, autoDelete);
    }

    /**
     * Queue named {@code query5}, created with the name-only constructor.
     *
     * @return the queue
     */
    @Bean
    public Queue query5() {
        final String queueName = "query5";
        return new Queue(queueName);
    }

    /**
     * Binds {@code queue5} to the injected fanout exchange.
     *
     * <p>NOTE(review): {@link #binding1a3(FanoutExchange, Queue)} declares the same binding.
     *
     * @param fanoutExchange the fanout exchange to bind to
     * @param queue5         the queue to bind
     * @return the binding
     */
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue5) {
        Binding binding = BindingBuilder.bind(queue5).to(fanoutExchange);
        return binding;
    }

    /**
     * Queue named {@code queue5} whose messages carry a TTL ({@code x-message-ttl = 5000}).
     *
     * @return the queue, declared with durable = true, exclusive = false, autoDelete = false
     */
    @Bean
    public Queue queue5() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue("queue5", durable, exclusive, autoDelete, ttlArguments());
    }

    /**
     * Binds {@code queue5} to the injected fanout exchange.
     *
     * <p>NOTE(review): same binding as {@link #binding2(FanoutExchange, Queue)}.
     *
     * @param fanoutExchange the fanout exchange to bind to
     * @param queue5         the queue to bind
     * @return the binding
     */
    @Bean
    public Binding binding1a3(FanoutExchange fanoutExchange, Queue queue5) {
        Binding binding = BindingBuilder.bind(queue5).to(fanoutExchange);
        return binding;
    }

    /**
     * Queue named {@code queue7} with a TTL ({@code x-message-ttl = 5000}) and
     * {@code x-dead-letter-exchange = dead}.
     *
     * @return the queue, declared with durable = true, exclusive = false, autoDelete = false
     */
    @Bean
    public Queue queue7() {
        Map<String, Object> arguments = ttlArguments();
        arguments.put("x-dead-letter-exchange", "dead");

        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue("queue7", durable, exclusive, autoDelete, arguments);
    }

    /**
     * Binds {@code queue7} to the {@code dead} fanout exchange.
     *
     * <p>NOTE(review): {@code queue7} also names {@code dead} as its dead-letter exchange, so
     * expired messages would be routed back to the queue they expired from. This looks like a
     * dead-letter cycle; confirm whether a separate dead-letter queue was intended.
     *
     * @param queue7 the queue to bind
     * @return the binding
     */
    @Bean
    public Binding binding1a4(Queue queue7) {
        FanoutExchange deadExchange = dead();
        return BindingBuilder.bind(queue7).to(deadExchange);
    }
}



二、 第一种hello world模型使用

/** Template used to publish the test message. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Sends one text message using the two-argument {@code convertAndSend(routingKey, message)}
 * form, with {@code "hello"} as the routing key.
 */
@Test
public void testHello() {
    final String routingKey = "hello";
    final String payload = "hello world";
    rabbitTemplate.convertAndSend(routingKey, payload);
}

开发消费者

/**
 * Listener for the {@code hello} queue; the queue is listed in {@code queuesToDeclare}.
 */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {

    /**
     * Prints a received text message to standard output.
     *
     * @param message the message body as a string
     */
    @RabbitHandler
    public void receive1(String message) {
        final String line = "message = " + message;
        System.out.println(line);
    }
}



三、第二种work模型使用

开发生产者

/** Template used to publish the test messages. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Sends the same text message ten times with {@code "work"} as the routing key.
 */
@Test
public void testWork() {
    final int messageCount = 10;
    final String routingKey = "work";
    final String payload = "hello work!";

    for (int sent = 0; sent < messageCount; sent++) {
        rabbitTemplate.convertAndSend(routingKey, payload);
    }
}

开发消费者

/**
 * Two listener methods attached to the same {@code work} queue, each printing what it receives
 * with a distinct prefix.
 */
@Component
public class WorkCustomer {

    /**
     * First listener on the {@code work} queue.
     *
     * @param message the message body as a string
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void receive1(String message) {
        final String line = "work message1 = " + message;
        System.out.println(line);
    }

    /**
     * Second listener on the {@code work} queue.
     *
     * @param message the message body as a string
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void receive2(String message) {
        final String line = "work message2 = " + message;
        System.out.println(line);
    }
}


说明:Spring AMQP 的 Work 模型默认采用公平调度(消息在各消费者之间平均分配);如果需要实现“能者多劳”,需要额外配置,例如调小消费者的 prefetch 数量。



四、Fanout 广播模型

开发生产者

/** Template used to publish the test message. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes one message to the {@code logs} exchange with an empty routing key.
 *
 * @throws InterruptedException declared by the original signature; nothing in this body
 *                              throws it
 */
@Test
public void testFanout() throws InterruptedException {
    final String exchange = "logs";
    final String routingKey = "";
    final String payload = "这是日志广播";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}

开发消费者

/**
 * Two listeners on the {@code logs} fanout exchange. Each one declares its own unnamed queue
 * and binds it to the exchange.
 */
@Component
public class FanoutCustomer {

    /**
     * First listener bound to the {@code logs} exchange.
     *
     * @param message the message body as a string
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = "fanout", name = "logs"),
            value = @Queue
    ))
    public void receive1(String message) {
        final String line = "message1 = " + message;
        System.out.println(line);
    }

    /**
     * Second listener bound to the {@code logs} exchange.
     *
     * @param message the message body as a string
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(type = "fanout", name = "logs"), // exchange to bind to and its type
            value = @Queue // creates a temporary queue
    ))
    public void receive2(String message) {
        final String line = "message2 = " + message;
        System.out.println(line);
    }
}



五、Route 路由模型

开发生产者

/** Template used to publish the test message. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes one message to the {@code directs} exchange with routing key {@code error}.
 */
@Test
public void testDirect() {
    final String exchange = "directs";
    final String routingKey = "error";
    final String payload = "error 的日志信息";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}

开发消费者

/**
 * Two listeners on the {@code directs} direct exchange, each with its own unnamed queue and
 * its own set of routing keys.
 */
@Component
public class DirectCustomer {

    /**
     * Listener whose queue is bound with the keys {@code info} and {@code error}.
     *
     * @param message the message body as a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "directs", type = "direct"),
            key = {"info", "error"}
    ))
    public void receive1(String message) {
        final String line = "message1 = " + message;
        System.out.println(line);
    }

    /**
     * Listener whose queue is bound with the key {@code error} only.
     *
     * @param message the message body as a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "directs", type = "direct"),
            key = "error"
    ))
    public void receive2(String message) {
        final String line = "message2 = " + message;
        System.out.println(line);
    }
}



六、Topic 订阅模型(动态路由模型)

开发生产者

/** Template used to publish the test message. */
@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * Publishes one message to the {@code topics} exchange with the three-word routing key
 * {@code user.save.findAll}.
 */
@Test
public void testTopic() {
    final String exchange = "topics";
    final String routingKey = "user.save.findAll";
    final String payload = "user.save.findAll 的消息";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}

开发消费者

/**
 * Two listeners on the {@code topics} topic exchange, each with its own unnamed queue and a
 * different wildcard pattern.
 *
 * <p>In RabbitMQ topic patterns {@code *} stands for exactly one dot-separated word and
 * {@code #} for zero or more words, so a key such as {@code user.save.findAll} matches
 * {@code user.#} but not {@code user.*}.
 */
@Component
public class TopCustomer {

    /**
     * Listener whose queue is bound with the pattern {@code user.*}.
     *
     * @param message the message body as a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "topics", type = "topic"),
            key = "user.*"
    ))
    public void receive1(String message) {
        final String line = "message1 = " + message;
        System.out.println(line);
    }

    /**
     * Listener whose queue is bound with the pattern {@code user.#}.
     *
     * @param message the message body as a string
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "topics", type = "topic"),
            key = "user.#"
    ))
    public void receive2(String message) {
        final String line = "message2 = " + message;
        System.out.println(line);
    }
}

在这里插入图片描述



版权声明:本文为qq_43803285原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。