如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

  • Post author:
  • Post category:其他




如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议

rabbitmq实现思路:

选型:发布订阅模式(Publish/Subscribe)

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。

这种情况下,我们有四种交换机可供选择,分别是:

  • Direct
  • Fanout
  • Topic
  • Header

由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用

代码实现:

1.pom文件引入rabbitmq依赖

 <!-- rabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置文件

server:
  port: 9091
spring:
  application:
    name: rabbitmq
  # rabbitmq配置
  rabbitmq:
    host: 192.168.8.142
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost

3.constant类

package com.anychat.rabbitmqtest.constant;

/**
 * @author Liby
 * @date 2022-05-05 10:02
 * @description:
 * @version:
 */

public class RabbitmqConstant {
    public  static final String  MEETING_FANOUT_EXCHANGE = "meeting_exchange";
}

4.用户实体类

package com.anychat.rabbitmqtest.entity;

/**
 * @author Liby
 * @date 2022-05-06 09:39
 * @description:
 * @version:
 */

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public User(Integer userId, String username) {
        this.userId = userId;
        this.username = username;
    }
}

5.工具类

package com.anychat.rabbitmqtest.util;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @author Liby
 * @date 2022-04-28 10:27
 * @description:
 * @version:
 */

public class RabbitmqUtil {
    @Autowired
    private static RabbitTemplate rabbitTemplate;

    public static Channel getChannel() {
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
        return channel;

    }


}

6.消费者类

package com.anychat.rabbitmqtest.consumer;

import cn.hutool.core.util.StrUtil;
import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.entity.User;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Liby
 * @date 2022-04-25 11:18
 * @description:消费者,动态创建临时队列
 * @version:
 */
@Slf4j
@Component
public class FanoutConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void createQueue(User user) {

        //创建信道
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);

        try {
            //声明一个交换机与生产者相同

            channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
            //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除
            String queueName = channel.queueDeclare().getQueue();
            //用户Id与队列名绑定
            ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>();
            userQueueMap.putIfAbsent(queueName, user.getUserId());
            //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串
            // channel.queueBind(queue, exchange, routingKey)
            channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, "");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //对信息进行操作
                    String message = new String(body, "UTF-8");
                    if (StrUtil.isNotBlank(message)) {
                        String[] receiveIds = message.split(",");
                        Integer userId = userQueueMap.get(queueName);
                        for (String id : receiveIds) {
                            if (userId.equals(Integer.valueOf(id))) {
                                log.info("用户{}收到入会邀请", id);
                            }

                        }

                    }

                }
            };
            //true 自动回复ack
            channel.basicConsume(queueName, true, consumer);
        } catch (Exception ex) {
        }
    }
}


7.controller类

package com.anychat.rabbitmqtest.controller;

import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.consumer.FanoutConsumer;
import com.anychat.rabbitmqtest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Liby
 * @date 2022-04-24 16:34
 * @description:生产者
 * @version:
 */
@RestController
@Slf4j
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private FanoutConsumer fanoutConsumer;
    /**
    * 模拟用户登录后,创建一个临时队列,与该用户绑定
    */
    @PostMapping("/login")
    public String login(){
        //模拟三个用户登录
        int userNum=3;

        for (int i = 0; i < userNum; i++) {
            //用户绑定临时队列,并监听队列
            fanoutConsumer.createQueue(new User(i, "用户" + i));
            log.info("用户{}登录成功",i);
        }
        return "用户登录成功";

    }

    @PostMapping("/meeting")
    public String meeting(){
        String message="1,2";
        log.info("邀请用户{}进入会议",message);
        //发送消息,要求userId为2和3的用户进入会议
        rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message);
        return "发送成功";

    }
}

postman分别调用login和meeting两个接口

可以看到日志打印
在这里插入图片描述



版权声明:本文为weixin_43757027原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。