消息分发方式
Work Queues——工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
原本的一个消费者变成了现在的多个工作线程
注意!一个消息只能被处理一次,不能处理多次
轮询分发消息
多个工作线程从队列中接收消息的时候采用的是轮询的方式
抽取工具类
获取信道
public class RabbitMqUtil {
//获取连接
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//连接到RabbitMQ的队列
factory.setHost("43.143.166.162");
//用户名
factory.setUsername("guest");
//密码
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//获取该连接的信道
Channel channel = connection.createChannel();
return channel;
}
}
开启两个工作线程
工作线程1代码
- 先启动该线程
public class Worker01 {
//队列名称
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtil.getChannel();
/**
* @description: 消费者消费消息
* @Param:
* 1. 消费哪个队列
* 2. 消费成功之后是否要自动应答 true代表自动应答
* 3. 消费者成功消费的回调
* 4. 消费者未成功消费的回调
*/
//接收消息
System.out.println("w1等待接收消息");
channel.basicConsume(
QUEUE_NAME,
true,
(consumerTag,message)->{
System.out.println(consumerTag+"接收到的消息:"+new String(message.getBody()));
},
consumerTag ->{
System.out.println(consumerTag+"消费者取消消费!");
}
);
}
}
- 配置该类使其允许多个线程同时执行
- 更改一下输出,使其更明了
// System.out.println("w1等待接收消息");
System.out.println("w2等待接收消息");
- 再次启动该类
开启任务线程
public class Task01 {
//队列名称
//ctrl+shift+u 小写变大写
public static final String QUEUE_NAME ="hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtil.getChannel();
/**
* @description: 根据信道声明一个队列
* @Param:
* 1. 队列名称
* 2. 队列里面的消息是否持久化,默认存储在内存中
* 3. 该队列是否只供一个消费者进行消费(是否进行消息共享),true可以多个消费者消费
* 4. 是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除。true自动删除,false不删除
* 5. 其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台接收信息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.nextLine();
/**
* @description: 发送一个消息
* @Param:
* 1. 发送到哪个交换机
* 2. 路由的key值是哪个,本次是队列的名称
* 3. 其他参数信息
* 4. 发送的消息
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
}
在控制台输入:
AA
BB
CC
DD
发现两个工作线程轮询的被分发消息:
不公平分发消息
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个
消费者 1
处理任务的速度非常快,而另外一个
消费者 2
处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活。
为了避免这种情况,
在消费者中消费之前
,我们可以设置参数
channel.basicQos(1);
//不公平分发
//prefetch:预取值为1,也就是该消费者最多只能接收一个消息,该消息处理完之后才能接收其余消息
//这样就能保证谁的处理能力强,谁就会接收处理更多的消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelC
意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,
我目前只能处理一个 任务
,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完 成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加 新的 worker 或者改变其他存储任务的策略。
此时出现以下情况,处理能力强的C1处理的消息数量远多于处理能力弱的C2
预取值(prefetch)
带权的消息分发
本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能
限制此缓冲区的大小
,
以避免缓冲区里面无限制的未确认消息问题
。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的。
该值定义
Channel上
允许的
未确认消息的最大数量
。一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知 这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。
通常,增加预取将提高 向消费者传递消息的速度。
虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗
(随机存取存储器)。应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
也就是说:
prefetchcount越大吞吐量越高,但是消费者的内存消耗越高,有消息堆积导致服务器宕机的风险
prefetchcount越小安全性越高,但是吞吐量变小。