如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息
场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议
rabbitmq实现思路:
选型:发布订阅模式(Publish/Subscribe)
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
这种情况下,我们有四种交换机可供选择,分别是:
- Direct
- Fanout
- Topic
- Header
由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用
代码实现:
1.pom文件引入rabbitmq依赖
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件
server:
port: 9091
spring:
application:
name: rabbitmq
# rabbitmq配置
rabbitmq:
host: 192.168.8.142
port: 5672
username: admin
password: admin
virtual-host: my_vhost
3.constant类
package com.anychat.rabbitmqtest.constant;
/**
* @author Liby
* @date 2022-05-05 10:02
* @description:
* @version:
*/
public class RabbitmqConstant {
public static final String MEETING_FANOUT_EXCHANGE = "meeting_exchange";
}
4.用户实体类
package com.anychat.rabbitmqtest.entity;
/**
* @author Liby
* @date 2022-05-06 09:39
* @description:
* @version:
*/
public class User {
private Integer userId;
private String username;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public User(Integer userId, String username) {
this.userId = userId;
this.username = username;
}
}
5.工具类
package com.anychat.rabbitmqtest.util;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author Liby
* @date 2022-04-28 10:27
* @description:
* @version:
*/
public class RabbitmqUtil {
@Autowired
private static RabbitTemplate rabbitTemplate;
public static Channel getChannel() {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
return channel;
}
}
6.消费者类
package com.anychat.rabbitmqtest.consumer;
import cn.hutool.core.util.StrUtil;
import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.entity.User;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Liby
* @date 2022-04-25 11:18
* @description:消费者,动态创建临时队列
* @version:
*/
@Slf4j
@Component
public class FanoutConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createQueue(User user) {
//创建信道
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
try {
//声明一个交换机与生产者相同
channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
//获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除
String queueName = channel.queueDeclare().getQueue();
//用户Id与队列名绑定
ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>();
userQueueMap.putIfAbsent(queueName, user.getUserId());
//关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串
// channel.queueBind(queue, exchange, routingKey)
channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
//对信息进行操作
String message = new String(body, "UTF-8");
if (StrUtil.isNotBlank(message)) {
String[] receiveIds = message.split(",");
Integer userId = userQueueMap.get(queueName);
for (String id : receiveIds) {
if (userId.equals(Integer.valueOf(id))) {
log.info("用户{}收到入会邀请", id);
}
}
}
}
};
//true 自动回复ack
channel.basicConsume(queueName, true, consumer);
} catch (Exception ex) {
}
}
}
7.controller类
package com.anychat.rabbitmqtest.controller;
import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.consumer.FanoutConsumer;
import com.anychat.rabbitmqtest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Liby
* @date 2022-04-24 16:34
* @description:生产者
* @version:
*/
@RestController
@Slf4j
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private FanoutConsumer fanoutConsumer;
/**
* 模拟用户登录后,创建一个临时队列,与该用户绑定
*/
@PostMapping("/login")
public String login(){
//模拟三个用户登录
int userNum=3;
for (int i = 0; i < userNum; i++) {
//用户绑定临时队列,并监听队列
fanoutConsumer.createQueue(new User(i, "用户" + i));
log.info("用户{}登录成功",i);
}
return "用户登录成功";
}
@PostMapping("/meeting")
public String meeting(){
String message="1,2";
log.info("邀请用户{}进入会议",message);
//发送消息,要求userId为2和3的用户进入会议
rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message);
return "发送成功";
}
}
postman分别调用login和meeting两个接口
可以看到日志打印
版权声明:本文为weixin_43757027原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。