Rabbitmp

  • Post author:
  • Post category:其他

1.ampq消息协议你是怎么理解的?
2.rabbitmq常见的消息模式有哪些?
3.rabbitmq在项目中你是怎么用的?
4.rabbitmq如何保证消息在投递的过程中不被丢失
5.如何解决消息重复消费的问题(幂等性的问题)
6.如何解决消息堆积的问题?(自己在网上进一步补充完整)
7.什么是死信队列,造成死信队列的原因是什么? 如何处理死信消息?
8.如何实现消息限流?

1.ampq消息协议你是怎么理解的?

AMQP(高级消息队列协议)是一个进程间传递异步消息的网络协议。
AMQP规范中的核心的概念:

  • Message
  • List item
  • Virtual Host
  • Exchange
  • Queue
  • Binding
  • Routing Key
  • Binding Key
  • Exchange Type
  • Publisher/Producer
  • Subscriber/Consumer
  • Connection
  • Channel
    工作流程:
    publisher(发布者)——>Exchange(交换机)——>Queue(队列)——>Consumer(消费者)
    (——————–AMQP实体——————)
    工作过程:
    发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
    交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
    最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

2.rabbitmq常见的消息模式有哪些?

1.简单模式:
2.work模式
3.订阅模式
4.路由模式
5.通配符模式
在这里插入图片描述

3.rabbitmq在项目中你是怎么用的?

消息的发送方:
1.创建项目并导入pom文件
2.yml文件的配置

xc-service-manage-media:
  upload-location: D:/code/xue-cheng/video/
  mq:
    routingkey-media-video: routingkey_media_video

3.新建一个配置类,在消息发送方配置交换机和声明路由routingkey
4.编写发送消息的方法,并发送消息
消息的消费方
1.yml文件的配置

xc-service-manage-media:
  mq:
    queue-media-video-processor: queue_media_video_processor
    routingkey-media-video: routingkey_media_video
  video-location: D:/code/xue-cheng/video/
  ffmpeg-path: D:/code/xue-cheng/Tools/ffmpeg/ffmpeg-20180227-fa0c9d6-win64-static/bin/ffmpeg.exe

2.新建一个配置类,绑定队列到交换机
3.监听信息并处理

4.rabbitmq如何保证消息在投递的过程中不被丢失

把自动改为手动

5.如何解决消息重复消费的问题(幂等性的问题)

可以通过让消费消息的操作具备幂等性来解决。幂等指的是多次执行与一次执行产生的影响相同。
具体:
利用数据库的唯一约束实现幂等。因为唯一约束可以保证不重复插入数据,或者通过 Redis 的 setnx 命令来实现数据库的唯一约束,因 为 setnx 指的是 insert if not exist,也能防止插入重复数据。

6.如何解决消息堆积的问题?(自己在网上进一步补充完整)

用Shovel

  • 概念:当某个队列的消费堆积严重时,比如超过某个设定的阀值,就可以通过Shovel将队列消息移交到别的一个集群。
  • 当检测到一个队列出现严重的消息堆积的时候,比如可以通过/api/queue/vhost/name接口获取到队列的消息个数超过2000w或者消息的占用
  • 大小超过10g的时候,就启用shovel1将队列queue中的消息转发到备份集群的队列queue2。
  • 当检测到队列queue的消息个数低于100w的时候或者大小小于1GB就停止shovel1,然后等queue慢慢处理剩余的堆积。
  • 当queue的消息个数低于10W或者大小小于100MB时,就开启shovel2将队列queue2中暂存的消息返还给对queue。
  • 当检测到队列queue消息个数大于100W或者大小高于1GB就将shovel2停掉。
    网上一般是:(这种只能是远水止不了近渴)
  • 修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停止
  • 临时建立好原先10倍或者20倍的queue数量
  • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立
  • 好的10倍数量的queue
  • 接着临时征用10倍机器来部署consumer,每一批consumer消费一个临时queue的数据
  • 这种做法相当于临时将queue资源和consumer资源扩大了10倍,以正常的10倍速度

7.什么是死信队列,造成死信队列的原因是什么? 如何处理死信消息?

字面上理解为无法被消费的消息,但一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
对rabbitmq来说,产生死信的来源大致有如下几种:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期 队列达到最大长度(队列
  • 满了,无法再添加数据到mq中)
    – 死信的处理方式
    丢弃,如果不是很重要可以选择丢弃
    记录死信入库,然后做后续的业务分析或者处理

8.如何实现消息限流?

我们只需要配置一下 rabbitmq.listener.simple 下的 prefetch 属性即可,
语义即为:如果队列中有两条以上未签收的消息,则不进行新的消息消费。
发送完显示我们系统中有三条 Ready 消息,这代表这三条消息还在队列中没有消费端去消费,
这时我打开消费端进行消费但依旧不进行签收。
unacked=2,ready=1,这就代表有两条消息在服务端消费了但是没有签收,还有一条消息还在队列中没有往服务端推送,
因为我们设置了 prefetch=2,所以现在队列的最大同时在消费的消息数量为 2,通过此种方式,我们就完成了消费限流。


版权声明:本文为ng_elza原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。