原文地址:
https://blog.csdn.net/u010827484/article/details/81222662
RabbitMQ 简介
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ 安装
安装 RabbitMQ
root@ubuntu:~# uname -a
Linux ubuntu 4.15.0-24-generic #26~16.04.1-Ubuntu SMP Fri Jun 15 14:35:08 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
安装erlang
sudo apt-get install erlang-nox
安装RabbitMQ
sudo apt-get install rabbitmq-server
查看 rabbitMq 运行状态命令:
sudo rabbitmqctl status
RabbitMQ架构分析
AMQP模型
AMQP是一个异步消息传递所使用的应用层协议规范,AMQP客户端能够无视消息来源任意发送和接受消息,Broker提供消息的路由、队列等功能。Broker主要由Exchange和Queue组成:Exchange负责接收消息、转发消息到绑定的队列;Queue存储消息,提供持久化、队列等功能。AMQP客户端通过Channel与Broker通信,Channel是多路复用连接中的一条独立的双向数据流通道。
RabbitMQ进程模型
tcp_acceptor进程接收客户端连接,创建rabbit_reader、rabbit_writer、rabbit_channel进程。rabbit_reader接收客户端连接,解析AMQP帧;rabbit_writer向客户端返回数据;rabbit_channel解析AMQP方法,对消息进行路由,然后发给相应队列进程。rabbit_amqqueue_process是队列进程,在RabbitMQ启动(恢复durable类型队列)或创建队列时创建。rabbit_msg_store是负责消息持久化的进程。
在整个系统中,存在一个tcp_accepter进程,一个rabbit_msg_store进程,有多少个队列就有多少个rabbit_amqqueue_process进程,每个客户端连接对应一个rabbit_reader和rabbit_writer进程。
RabbitMQ流控
RabbitMQ可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,RabbitMQ在正常情况下还用流控(Flow Control)机制来确保稳定性。
Erlang进程之间并不共享内存(binaries类型除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱。Erlang默认没有对进程邮箱大小设限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。
在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱大小达到内存阈值,阻塞生产者(得益于block机制,并不会崩溃)。然后RabbitMQ会进行page操作,将内存中的数据持久化到磁盘中。
为了解决该问题,RabbitMQ使用了一种基于信用证的流控机制。消息处理进程有一个信用组{InitialCredit,MoreCreditAfter},默认值为{200, 50}。消息发送者进程A向接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住;对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。
amqqueue进程与Paging
消息的存储和队列功能是在amqqueue进程中实现。为了高效处理入队和出队的消息、避免不必要的磁盘IO,amqqueue进程为消息设计了4种状态和5个内部队列。
4种状态包括:alpha,消息的内容和索引都在内存中;beta,消息的内容在磁盘,索引在内存;gamma,消息的内容在磁盘,索引在磁盘和内存中都有;delta,消息的内容和索引都在磁盘。对于持久化消息,RabbitMQ先将消息的内容和索引保存在磁盘中,然后才处于上面的某种状态(即只可能处于alpha、gamma、delta三种状态之一)。
5个内部队列包括:q1、q2、delta、q3、q4。q1和q4队列中只有alpha状态的消息;q2和q3包含beta和gamma状态的消息;delta队列是消息按序存盘后的一种逻辑队列,只有delta状态的消息。所以delta队列并不在内存中,其他4个队列则是由erlang queue模块实现。
消息从q1入队,q4出队,在内部队列中传递的过程一般是经q1顺序到q4。实际执行并非必然如此:开始时所有队列都为空,消息直接进入q4(没有消息堆积时);内存紧张时将q4队尾部分消息转入q3,进而再由q3转入delta,此时新来的消息将存入q1(有消息堆积时)。
Paging就是在内存紧张时触发的,paging将大量alpha状态的消息转换为beta和gamma;如果内存依然紧张,继续将beta和gamma状态转换为delta状态。Paging是一个持续过程,涉及到大量消息的多种状态转换,所以Paging的开销较大,严重影响系统性能。
参数调优
RabbitMQ可优化的参数分为两个部分,Erlang部分和RabbitMQ自身。
IO_THREAD_POOL_SIZE:CPU大于或等于16核时,将Erlang异步线程池数目设为100左右,提高文件IO性能。
hipe_compile:开启Erlang HiPE编译选项(相当于Erlang的jit技术),能够提高性能20%-50%。在Erlang R17后HiPE已经相当稳定,RabbitMQ官方也建议开启此选项。
queue_index_embed_msgs_below:RabbitMQ 3.5版本引入了将小消息直接存入队列索引(queue_index)的优化,消息持久化直接在amqqueue进程中处理,不再通过msg_store进程。由于消息在5个内部队列中是有序的,所以不再需要额外的位置索引(msg_store_index)。该优化提高了系统性能10%左右。
vm_memory_high_watermark:用于配置内存阈值,建议小于0.5,因为Erlang GC在最坏情况下会消耗一倍的内存。
vm_memory_high_watermark_paging_ratio:用于配置paging阈值,该值为1时,直接触发内存满阈值,block生产者。
queue_index_max_journal_entries:journal文件是queue_index为避免过多磁盘寻址添加的一层缓冲(内存文件)。对于生产消费正常的情况,消息生产和消费的记录在journal文件中一致,则不用再保存;对于无消费者情况,该文件增加了一次多余的IO操作。
示例:
单播:
file : communicate_agent.py
import pika
import sys
class RbmInstance(object):
def __init__(self, ServerIP, Exchange, Queue, ExType='direct'):
self.ServerIP = ServerIP
self.Exchange = Exchange
self.Queue = Queue
self.ExType = ExType
def CreateConnect(self):
self.Connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.ServerIP))
self.Channel = self.Connection.channel()
self.Channel.exchange_declare(exchange=self.Exchange, exchange_type=self.ExType)
def Send(self, Message):
self.Channel.basic_publish(exchange=self.Exchange, routing_key=self.Queue, body=Message)
def Receive(self, CallBack):
if self.Queue is not None:
result=self.Channel.queue_declare(self.Queue,exclusive=True)
else:
result=self.Channel.queue_declare(exclusive=True)
queue_name=result.method.queue
self.Channel.queue_bind(exchange=self.Exchange,queue=queue_name)
self.Channel.basic_consume(CallBack, queue=queue_name)
self.Channel.start_consuming()
def Notify(self, Message):
self.Channel.basic_publish(exchange=self.Exchange, routing_key='', body=Message)
def Subscribe(self, CallBack):
self.Receive(CallBack)
file : Send.py
from communicate_agent import RbmInstance as RbmDict
Sender = RbmDict('localhost', 'ex1', 'key1')
Sender.CreateConnect()
Sender.Send('demo 1')
file : Receive.py
from communicate_agent import RbmInstance as RbmDict
def cb_func(channel, method,properties,body):
print('%r' %body)
Sender = RbmDict('localhost', 'ex1', 'key1')
Sender.CreateConnect()
Sender.Subscribe(cb_func)
Receive 持续接收队列消息,Send发送消息后Receive马上响应
组播:
file : Send.py
from communicate_agent import RbmInstance as RbmDict
def cb_func(channel, method,properties,body):
print('%r' %body)
Sender = RbmDict('localhost', 'ex1', 'key1', 'fanout')
Sender.CreateConnect()
Sender.Subscribe(cb_func)
file : Receive.py
from communicate_agent import RbmInstance as RbmDict
def cb_func(channel, method,properties,body):
print('%r' %body)
Sender = RbmDict('localhost', 'ex1', 'key1', 'fanout')
Sender.CreateConnect()
Sender.Subscribe(cb_func)
file : Receive2.py
from communicate_agent import RbmInstance as RbmDict
def cb_func(channel, method,properties,body):
print('%r' %body)
Sender = RbmDict('localhost', 'ex1', 'key2', 'fanout')
Sender.CreateConnect()
Sender.Subscribe(cb_func)
Send 发送消息,Receive和Receive2同时持续接收消息
监听消息:
1、打开 Trace 功能:
rabbitmqctl trace_on
2、编写 Trace 代码:
import pika
import sys
"""
turn firehose on with "rabbitmqctl trace_on"
"""
if __name__ == "__main__":
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
queue_name = "firehose"
result = channel.queue_declare(queue=queue_name, exclusive=False)
channel.queue_bind(exchange='amq.rabbitmq.trace',
queue=queue_name,
routing_key="#")
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r:%r:%r" % (
method.routing_key,
properties.headers["node"],
properties.headers["routing_keys"],
body,
)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
参考: