1. MQ经常应用于哪些业务场景
文章目录
1.应用解耦
传统模式的缺点:
系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
中间件模式的的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
2.异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种
串行方式:
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
并行方式
将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
(3)消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
传统模式的缺点:
一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式的的优点:
将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
流量消峰
流量削峰一般在秒杀活动中应用广泛
场景: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用
:
- 1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
- 2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
传统模式的缺点:
- 1.并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常 中间件模式:
- 2.消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。
MQ的缺点
1.
系统复杂性增加
。毕竟是增加了一个中间件MQ,那么系统变得更复杂,就是不可避免的。但是,与其说是系统复杂性增加,不如说是给相关开发人员带来的新的学习成本。但是,一项技术本身就是这样,学时很痛苦,学会了,它就会变成一把利剑,帮助您开疆辟土。
2.
系统可用性降低
。假设一个系统由若干个节点链式组成,每个节点出问题的概率是相同的,那么,20个节点的系统出问题的概率显然要高于10个节点的系统。所以,从这个角度来看,毕竟是增加了一个MQ中间件,出问题的概率显然会增大,系统可用性就会降低。
RabbitMQ
AMQP 与JMS 区别
- JMS是定义了统一接口,来对消息操作进行统一
- AMQP是基于一种协议 来统一数据交互的格式
- JMS限定了只能是java语言;AMQP是一种协议,不规定实现方式,所以是跨语言的
- JMS规定了两种消息模式,而AMQP的消息模式是更加丰富多样的
消息队列的产品
市场上常见的消息队列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C语言开发
-
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
-
RocketMQ:基于JMS,阿里巴巴产品
-
Kafka:类似MQ的产品;分布式消息系统,高吞吐量
RabbitMQ 基础架构如下图:
- broker: 意为中间人 , 经纪人 接受和分发消息的应用
-
virtual host :虚拟机,类似于namespace 。当有多个用户同时访问一个rabbitMq server
时,可以划分出多个vhost,每个vhost可以创建自己的Exchange 交换机,与queue 消息队列 - connection : 将 publisher/consumer 与broker 进行 TCP 连接的组件。
- channel : 信道 由于 每一次 访问都创建一个connection 那么 当消息量大的时候 ,整个开销量时巨大的,因此 在 connection 内部建立多个轻量级的 Channel Connection ,每个 用户 或者 broker 都通过AMQP方法中 channel id 识别 channel ,因此 每个channel之间都是隔离,独立的。
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- queue:队列 ,消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ运转流程
在入门案例中:
- 生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
- 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息至RabbitMQ Broker;
- 关闭信道;
- 关闭连接;
消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
- 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
- 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ从队列中删除相应已经被确认的消息;
- 关闭信道;
-
关闭连接;
RabbitMQ工作模式(重要)
work queue 工作模式
相较于 入门的普通模式,多了一个或多个 消费者 ,共同消费一个 broker中的消息。
应用场景 :用于 任务多 任务重的 应用 可以提高处理消息的速度。
消息应答
为了保证发出去的信息不丢失,rabbitmq引进了消息应答机制,消费者在收受到消息,并消费了消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
消息应答的方法
channel.basicAck() 用于肯定确认
RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了
channel.basicNack() 用于否定确认
channel.basicReject()用于否定确认
与Channel.basicNack相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
手动应答的代码
消息生产者
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = new String(scanner.next());
System.out.println("消费者启动等待消费消息....................");
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完成:"+message);
}
消费者代码
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DeliverCallback deliverCallback =(consumerTag,delivery)->{
String receiveMessage = new String(delivery.getBody());
System.out.println("消费者接收到消息"+receiveMessage);
/**
* 确认应答
* 1.哪个消息
* 2.应答多个消息还是应答一个消息
* true:把当前消息和之前的消息一起应答
* false:只应答当前消息
*/
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
/**
* 拒绝确认收到应答
* 1.哪个消息
* 2.拒绝多个消息还是拒绝一个消息
* true:把当前消息和之前的消息一起拒绝
* false:只拒绝当前消息
* 3.是否重新入队
* true:消息重新入队
* false:丢弃消息或者进入到死信队列
*/
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
};
boolean autoAck =false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
System.out.println("消费者取消信息"+consumerTag);
});
RabbitMQ 持久化
如果 因为 rabbitMq 停掉而 删除了 队列和消息,这样就会影响工作效率,为了保证rabbitmq停掉,消息不丢失,就要告诉 rabbitmq这样做,将消息队列持久化,消息持久化。
队列和消息持久化
Channel channel = RabbitMqUtils.getChannel();
//让队列持久化
boolean durable=true;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
System.out.println("等待输入消息");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message=scanner.next();
// MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("消息发送完成:"+message);
}
不公平分发
(prefetchCount预取值)
在最开始的时候我们学习到RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。
不公平分发部分代码
int prefetchCount=2;
channel.basicQos(prefetchCount);
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
SleepUtils.sleep(30);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("消费者接收到的消息:"+receivedMessage);
};
=======================================================
int prefetchCount=3;
channel.basicQos(prefetchCount);
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
SleepUtils.sleep(1);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("消费者接收到的消息:"+receivedMessage);
};
发布确认(重要)
简单来说,就是 生产者发送消息之后,如果精准的发送到,每一个队列,那么broker就会发送一个消息发布确认给生产者告诉生产者已经收到消息。消息生产者将信道设置成confirm模式 ,每一个经过信道的消息都有一个唯一的id。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
开启发布确认的方法
//开启发步确认
channel.confirmSelect();
//等待broker确认收到消息的一个方法
Boolean flag = channel.waitForConfirms();
单个消息发布确认代码
//开启发布确认 ,目的时为了发送消息更加安全
channel.confirmSelect();
//开始时间
long begin =System.currentTimeMillis();
for(int i=0;i<MESSAGE_COUNT;i++){
String message = "消息"+i;
channel.basicPublish("",queueName,null,message.getBytes());
//等待broker确认收到消息的一个方法
Boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("broker已经收到消息");
}
}
批量消息发布确认代码
//开启发布确认 ,目的时为了发送消息更加安全
channel.confirmSelect();
int batchSize=50;
int outStandingMessageCount=0;
long begin =System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息"+i;
channel.basicPublish("",queueName,null,message.getBytes());
outStandingMessageCount++;
if(batchSize==outStandingMessageCount) {
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("broker已经收到消息");
outStandingMessageCount = 0;
}
}
}
if(outStandingMessageCount>0){
boolean flag = channel.waitForConfirms();
System.out.println("broker已经收到消息");
outStandingMessageCount=0;
}
异步消息发布确认代码
channel.confirmSelect();
/**
* ConcurrentSkipListMap是线程安全的一个map
* 特点 1.它可以将序号与消息进行关联
* 2.只要给定序号 就会把小于等于当前序号的值作为一个map 提取出来 (调用 headMap()方法)
*/
ConcurrentSkipListMap<Long,String> outstandingConfirmMap = new ConcurrentSkipListMap<>();
/**
* 确认收到消息回调
* 1.sequenceNumber当前消息序号 (deliveryTag)
* 2.multiple处理一个还是多个消息
* true处理多个消息 处理deliveryTag序号之前的所以的消息
* false单个
*/
//确认发布消息回调函数,
ConfirmCallback ackCallback=(sequenceNumber,multiple)->{
if (multiple) {
//如果消息已经发布,就将发送的消息从outstandingConfirmMap中取出放入另一个Map中,然后将Map清空
ConcurrentNavigableMap<Long, String> comfirmedPartMap = outstandingConfirmMap.headMap(sequenceNumber);
comfirmedPartMap.clear();
}else{
//只处理签收当前sequenceNumber消息 将序号为sequenceNumber的已确认发布的消息移除
outstandingConfirmMap.remove(sequenceNumber);
}
};
未确认发布消息回调函数,
ConfirmCallback nackCallback=(sequenceNumber,multiple)->{
String message=outstandingConfirmMap.get(sequenceNumber);
System.out.println("序号为"+sequenceNumber+"的消息"+message+"需要重新发送");
};
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i <MESSAGE_COUNT ; i++) {
String message ="消息"+i;
//将消息发送到 ConcurrentSkipListMap
outstandingConfirmMap.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish("",queueName,null,message.getBytes());
}
异步发布确认,性能高 ,资源利用率高,相较于前两种更节省时间
交换机(Exchange)
rabbitmq实际上不会把消息直接发送到消息队列,而是先发送到交换机上,然后交换机会根据路由规则把消息发送到 不同的队列。这就得由交换机的类型来决定。
类型
(直接)direct (主题)topic (扇出)fanout (标题)header 无名交互机(默认为 “ ”)
临时队列的创建方法
String queueName = channel.queueDeclare().getQueue()
绑定binding
表示 交换机跟哪个队列进行绑定
Fanout
代码
生产者
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
消费者1
DeliverCallback deliverCallback =(consumerTag,delivery)->{
String receiveMessage = new String(delivery.getBody());
System.out.println(receiveMessage);
};
channel.basicConsume(queue,true,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
消费者2
DeliverCallback deliverCallback =(consumerTag,delivery)->{
String receiveMessage= new String(delivery.getBody());
File file = new File("G:\\file\\log.txt");
FileUtils.writeStringToFile(file,receiveMessage,"UTF_8",true);
};
channel.basicConsume(queue,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费信息");
});
Direct exchange介绍
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey队列中去。
演示代码
生产者
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个bindingKey
Map<String,String> bindingKeyMap =new HashMap<>();
bindingKeyMap.put("info","普通的消息");
bindingKeyMap.put("warning","警告warning消息");
bindingKeyMap.put("error","错误erro消除");
//debug没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug","调试debug信息");
for(Map.Entry<String,String> bindingKeyEntry: bindingKeyMap.entrySet()){
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
System.out.println("消息发送完成:"+message);
}
消费者1
channel.queueBind(queueName,EXCHANGE_NAME,"info");
channel.queueBind(queueName,EXCHANGE_NAME,"warning");
System.out.println("C1消费者启动,把接受到的消息打印在控制台");
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("C1消费者接收路由"+routingKey+",消息:"+receivedMessage);
};
channel.basicConsume(queueName,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
消费者2
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"error");
System.out.println("C2消费者启动,把接受到的消息打印在控制台");
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("C2消费者接收路由"+routingKey+",消息:"+receivedMessage);
};
channel.basicConsume(queueName,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
Topics
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的fanout交换机,而是使用了direct交换机,从而有能实现有选择性地接收日志。
尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个时候direct就办不到了。这个时候就只能使用topic类型
实战代码
生产者
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
Map<String,String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
for (Map.Entry<String,String> bindingKeyEntry :bindingKeyMap.entrySet() ){
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
System.out.println("消息发送完成:"+message);
}
消费者1
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("C1消费者启动,把接受到的消息打印在控制台");
DeliverCallback deliverCallback= (consumerTag,delivery)->{
String receiveMessage =new String (delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("C1接受哪个队列"+queueName+"绑定键"+routingKey+",消息:"+receiveMessage);
};
channel.basicConsume(queueName,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
消费者2
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
//把队列和交换机进行绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("C2消费者启动,把接受到的消息存储在磁盘");
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
receivedMessage="C2接受哪个队列"+queueName+"绑定键"+routingKey+",消息:"+receivedMessage;
File file = new File("G:\\file\\log.txt");
FileUtils.writeStringToFile(file,receivedMessage,"UTF-8",true);
System.out.println(receivedMessage);
};
channel.basicConsume(queueName,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
死信队列(重要)
有些消息发送至队列由于某些原因无法被消费者消费,比如 消费者拒绝消费、消息过载、TTL超时,这时这些消息怎么处理呢,这时就需要一个死信队列,将这些信息发送到死信队列,
应用场景
:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
- 消费者拒绝消费、
- 队列中的消息满了,消息过载、
-
TTL超时
TTL过期
生产者
String message = "info";
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
channel.basicPublish(NORMAL_EXCHANGE_NAME, "zhangsan", properties, message.getBytes());
System.out.println("消息发送完成:" + message);
消费者1
Channel channel = RabbitMqUtils.getChannel();
//声明并绑定死信队列与死信交互机
channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null);
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,"dead");
正常队列与死信交换机的绑定关系
Map<String,Object> deadLetterParams = new HashMap<>();
deadLetterParams.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
deadLetterParams.put("x-dead-letter-routing-Key","dead");
//正常队列和正常交换机绑定
channel.queueDeclare(NORAML_QUEUE_NAME,false,false,false,deadLetterParams);
channel.exchangeDeclare(NORAML_EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
channel.queueBind(NORAML_QUEUE_NAME,NORAML_EXCHANGE_NAME,"normal");
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("消费者接收到的消息:"+receivedMessage);
};
channel.basicConsume(NORAML_QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
消费者2
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2消费者启动等待消费消息....................");
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("消费者接收到死信:"+receivedMessage);
};
channel.basicConsume(DEAD_QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
队列达到最大长度
生产者
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
for (int i = 1; i < 11; i++) {
String message = "info"+i;
channel.basicPublish(NORMAL_EXCHANGE_NAME, "zhangsan", null, message.getBytes());
}
System.out.println("消息发送完成:");
消费者1
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1消费者启动等待消费消息....................");
//声明一个死信队列
channel.queueDeclare(DEAD_QUEUE_NAME,false,false,false,null);
//声明一个死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//死信队列与死信交换机的绑定关系
channel.queueBind(DEAD_QUEUE_NAME,DEAD_EXCHANGE_NAME,"lisi");
//正常队列与死信交换机的绑定关系
Map<String, Object> deadLetterParams=new HashMap<>();
deadLetterParams.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
deadLetterParams.put("x-dead-letter-routing-key","lisi");
//正常队列最大容纳6条消息
deadLetterParams.put("x-max-length",6);
//声明一个正常队列
channel.queueDeclare(NORAML_QUEUE_NAME,false,false,false,deadLetterParams);
//声明一个正常交换机
channel.exchangeDeclare(NORAML_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//正常队列与正常交换机的绑定关系
channel.queueBind(NORAML_QUEUE_NAME,NORAML_EXCHANGE_NAME,"zhangsan");
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("消费者接收到的消息:"+receivedMessage);
};
channel.basicConsume(NORAML_QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
消费者2
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2消费者启动等待消费消息....................");
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("消费者接收到死信:"+receivedMessage);
};
channel.basicConsume(DEAD_QUEUE_NAME,true,deliverCallback,(consumerTag)->{
System.out.println(consumerTag+"消费者取消消费消息");
});
消息被拒
DeliverCallback deliverCallback=(consumerTag, delivery)->{
String receivedMessage = new String(delivery.getBody());
if(receivedMessage.equals("info5")){
System.out.println("c1接受到消息:"+receivedMessage+"并且拒绝签收了");
//不能重新入队 丢弃或者进入到死信队列
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("消费者接收到的消息:"+receivedMessage);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
延迟队列
延迟队列 时为了存储子在指定时间消费的信息的对列 延迟队列内部时有序的
应用场景
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
整合SpringBoot
1.创建一个空项目(spring initializr)
2.添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3.修改配置文件
application.properties文件
spring.rabbitmq.host=8.129.162.52
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
4.添加Swagger配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitmq接口文档")
.description("本文档描述了rabbitmq微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com", "1551388580@qq.com"))
.build();
}
}
未完待续。。。。。。