RabbitMQ笔记

  • Post author:
  • Post category:其他


原文地址:

https://blog.csdn.net/u010827484/article/details/81222662

RabbitMQ 简介

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ 安装

安装 RabbitMQ

root@ubuntu:~# uname -a
Linux ubuntu 4.15.0-24-generic #26~16.04.1-Ubuntu SMP Fri Jun 15 14:35:08 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

安装erlang
   sudo apt-get install erlang-nox

安装RabbitMQ
   sudo apt-get install rabbitmq-server

查看 rabbitMq 运行状态命令:


sudo rabbitmqctl status

RabbitMQ架构分析

AMQP模型

AMQP模型

AMQP是一个异步消息传递所使用的应用层协议规范,AMQP客户端能够无视消息来源任意发送和接受消息,Broker提供消息的路由、队列等功能。Broker主要由Exchange和Queue组成:Exchange负责接收消息、转发消息到绑定的队列;Queue存储消息,提供持久化、队列等功能。AMQP客户端通过Channel与Broker通信,Channel是多路复用连接中的一条独立的双向数据流通道。

RabbitMQ进程模型

RabbitMQ进程模型

tcp_acceptor进程接收客户端连接,创建rabbit_reader、rabbit_writer、rabbit_channel进程。rabbit_reader接收客户端连接,解析AMQP帧;rabbit_writer向客户端返回数据;rabbit_channel解析AMQP方法,对消息进行路由,然后发给相应队列进程。rabbit_amqqueue_process是队列进程,在RabbitMQ启动(恢复durable类型队列)或创建队列时创建。rabbit_msg_store是负责消息持久化的进程。

在整个系统中,存在一个tcp_accepter进程,一个rabbit_msg_store进程,有多少个队列就有多少个rabbit_amqqueue_process进程,每个客户端连接对应一个rabbit_reader和rabbit_writer进程。

RabbitMQ流控

RabbitMQ可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,RabbitMQ在正常情况下还用流控(Flow Control)机制来确保稳定性。

Erlang进程之间并不共享内存(binaries类型除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱。Erlang默认没有对进程邮箱大小设限制,所以当有大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。

在RabbitMQ中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱大小达到内存阈值,阻塞生产者(得益于block机制,并不会崩溃)。然后RabbitMQ会进行page操作,将内存中的数据持久化到磁盘中。

为了解决该问题,RabbitMQ使用了一种基于信用证的流控机制。消息处理进程有一个信用组{InitialCredit,MoreCreditAfter},默认值为{200, 50}。消息发送者进程A向接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住;对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A可以继续向B发送消息。

amqqueue进程与Paging

消息的存储和队列功能是在amqqueue进程中实现。为了高效处理入队和出队的消息、避免不必要的磁盘IO,amqqueue进程为消息设计了4种状态和5个内部队列。

4种状态包括:alpha,消息的内容和索引都在内存中;beta,消息的内容在磁盘,索引在内存;gamma,消息的内容在磁盘,索引在磁盘和内存中都有;delta,消息的内容和索引都在磁盘。对于持久化消息,RabbitMQ先将消息的内容和索引保存在磁盘中,然后才处于上面的某种状态(即只可能处于alpha、gamma、delta三种状态之一)。

5个内部队列包括:q1、q2、delta、q3、q4。q1和q4队列中只有alpha状态的消息;q2和q3包含beta和gamma状态的消息;delta队列是消息按序存盘后的一种逻辑队列,只有delta状态的消息。所以delta队列并不在内存中,其他4个队列则是由erlang queue模块实现。

内部队列消息传递顺序

消息从q1入队,q4出队,在内部队列中传递的过程一般是经q1顺序到q4。实际执行并非必然如此:开始时所有队列都为空,消息直接进入q4(没有消息堆积时);内存紧张时将q4队尾部分消息转入q3,进而再由q3转入delta,此时新来的消息将存入q1(有消息堆积时)。

Paging就是在内存紧张时触发的,paging将大量alpha状态的消息转换为beta和gamma;如果内存依然紧张,继续将beta和gamma状态转换为delta状态。Paging是一个持续过程,涉及到大量消息的多种状态转换,所以Paging的开销较大,严重影响系统性能。

参数调优

RabbitMQ可优化的参数分为两个部分,Erlang部分和RabbitMQ自身。

IO_THREAD_POOL_SIZE:CPU大于或等于16核时,将Erlang异步线程池数目设为100左右,提高文件IO性能。

hipe_compile:开启Erlang HiPE编译选项(相当于Erlang的jit技术),能够提高性能20%-50%。在Erlang R17后HiPE已经相当稳定,RabbitMQ官方也建议开启此选项。

queue_index_embed_msgs_below:RabbitMQ 3.5版本引入了将小消息直接存入队列索引(queue_index)的优化,消息持久化直接在amqqueue进程中处理,不再通过msg_store进程。由于消息在5个内部队列中是有序的,所以不再需要额外的位置索引(msg_store_index)。该优化提高了系统性能10%左右。

vm_memory_high_watermark:用于配置内存阈值,建议小于0.5,因为Erlang GC在最坏情况下会消耗一倍的内存。

vm_memory_high_watermark_paging_ratio:用于配置paging阈值,该值为1时,直接触发内存满阈值,block生产者。

queue_index_max_journal_entries:journal文件是queue_index为避免过多磁盘寻址添加的一层缓冲(内存文件)。对于生产消费正常的情况,消息生产和消费的记录在journal文件中一致,则不用再保存;对于无消费者情况,该文件增加了一次多余的IO操作。

示例:


单播:

file : communicate_agent.py

import pika
import sys

class RbmInstance(object):
    def __init__(self, ServerIP, Exchange, Queue, ExType='direct'):
        self.ServerIP = ServerIP
        self.Exchange = Exchange
        self.Queue = Queue
        self.ExType = ExType

    def CreateConnect(self):
        self.Connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.ServerIP))
        self.Channel = self.Connection.channel()
        self.Channel.exchange_declare(exchange=self.Exchange, exchange_type=self.ExType)

    def Send(self, Message):
        self.Channel.basic_publish(exchange=self.Exchange, routing_key=self.Queue, body=Message)

    def Receive(self, CallBack):
        if self.Queue is not None:
            result=self.Channel.queue_declare(self.Queue,exclusive=True)
        else:
            result=self.Channel.queue_declare(exclusive=True)
        queue_name=result.method.queue
        self.Channel.queue_bind(exchange=self.Exchange,queue=queue_name)
        self.Channel.basic_consume(CallBack, queue=queue_name)
        self.Channel.start_consuming()

    def Notify(self, Message):
        self.Channel.basic_publish(exchange=self.Exchange, routing_key='', body=Message)

    def Subscribe(self, CallBack):
        self.Receive(CallBack)
file : Send.py

from communicate_agent import RbmInstance as RbmDict

Sender = RbmDict('localhost', 'ex1', 'key1')
Sender.CreateConnect()
Sender.Send('demo 1')
file : Receive.py

from communicate_agent import RbmInstance as RbmDict

def cb_func(channel, method,properties,body):
    print('%r' %body)

Sender = RbmDict('localhost', 'ex1', 'key1')
Sender.CreateConnect()
Sender.Subscribe(cb_func)

Receive 持续接收队列消息,Send发送消息后Receive马上响应

单播


组播:

file : Send.py

from communicate_agent import RbmInstance as RbmDict

def cb_func(channel, method,properties,body):
    print('%r' %body)


Sender = RbmDict('localhost', 'ex1', 'key1', 'fanout')
Sender.CreateConnect()
Sender.Subscribe(cb_func)
file : Receive.py
from communicate_agent import RbmInstance as RbmDict

def cb_func(channel, method,properties,body):
    print('%r' %body)


Sender = RbmDict('localhost', 'ex1', 'key1', 'fanout')
Sender.CreateConnect()
Sender.Subscribe(cb_func)
file : Receive2.py
from communicate_agent import RbmInstance as RbmDict

def cb_func(channel, method,properties,body):
    print('%r' %body)


Sender = RbmDict('localhost', 'ex1', 'key2', 'fanout')
Sender.CreateConnect()
Sender.Subscribe(cb_func)

Send 发送消息,Receive和Receive2同时持续接收消息

组播


监听消息:

1、打开 Trace 功能:


rabbitmqctl trace_on

2、编写 Trace 代码:

import pika
import sys

"""
turn firehose on with "rabbitmqctl trace_on"
"""

if __name__ == "__main__":

    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()

    queue_name = "firehose"
    result = channel.queue_declare(queue=queue_name, exclusive=False)

    channel.queue_bind(exchange='amq.rabbitmq.trace',
                       queue=queue_name,
                       routing_key="#")

    print ' [*] Waiting for logs. To exit press CTRL+C'

    def callback(ch, method, properties, body):
        print " [x] %r:%r:%r:%r" % (
            method.routing_key, 
            properties.headers["node"],
            properties.headers["routing_keys"], 
            body,
            )

    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)

    channel.start_consuming()

RabbitMQ Trace


RabbitMQ 在线仿真

参考:


http://www.cnblogs.com/ccorz/p/5710098.html


https://www.cnblogs.com/barrywxx/p/6706227.html



版权声明:本文为u010827484原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。