目录
rocketmq和kafka对比
RocketMQ |
Kafka |
|
设计定位 |
非日志的可靠消息传送 例如:订单,交易,消息推送等 |
系统间的数据流管道 例如:日志收集(主要),监控数据等 |
开发语言 |
Java | Scala |
社区活跃度 |
中 | 高 |
持久化 |
磁盘文件 | 磁盘文件 |
集群管理 |
name server | zookeeper |
选主方式 |
不支持自动切换 | 从 ISR 中自动选举 leader |
主从切换 |
不支持自动切换 |
自动切换。N个副本,允许N-1个失效 |
数据可靠性 |
很好 | 很好 |
消息写入性能 |
很好 | 非常高 |
性能的稳定性 |
队列较多时性能稳定 | 队列/分区多时性能不稳定,明显下降 |
消息堆积能力 |
非常好 | 非常好 |
消息投递实时性 |
毫秒级。支持pull、push两种方式 | 毫秒级。由consumer轮询间隔决定 |
顺序消费 |
支持顺序消费 顺序消费情况下,消息失败时队列会暂停 |
支持顺序消费 一台broker宕机后,会产生消息乱序 |
延时消息 |
开源版支持18个级别 | 不支持 |
事务消息 |
支持 | 不支持 |
Broker端消息过滤 |
支持。通过tag过滤,类似子topic | 不支持 |
消息查询 |
支持根据MessageId、MessageKey、时间查询 |
不支持 |
… | … | … |
基本角色
角色名称 |
角色描述 |
NameServer |
注册中心,用于管理Broker,类似于zookeeper集群的功能;
无状态,没有负载均衡的概念, |
Broker |
一个broker就是一个rocketmq进程。是rocketmq的核心,负责存储消息、转发消息等; 存储消息的元数据,包括主题、消费者组、消费进度偏移量等; Broker启动过后,会定期地向NameServer发送心跳,以证明在线; |
Producer |
消息的发送者,每30秒从NameServer中获取Broker集群的信息; Producer之间无状态,互相之间不感知; |
Consumer |
消息的消费者,每30秒从NameServer中获取Broker集群的信息; 然后Consumer会向Broker Master发送心跳确定它是否在线; Consumer直接无状态,互相之间不感知; |
整体架构
Broker 启动时会向 NameServer 注册 Broker 的信息,包括主题、消费之偏移量、队列、ip port等。每个 NameServer 都会保存整个Broker集群的信息。
Producer 和 Consumer 启动时会也向 NameServer 注册,且每隔30秒从 NameServer 获取 Broker 的消息。
每个 topic 对应一个 commitlog,里面存储真正的消息内容。
每个 topic 默认包含4个队列,每个队列对应一个持久化文件queuelog,它存储的是每条消息在commitlog中的位置等信息。
每个 topic 会创建一个 consumerOffset.json,用于保存每个consumer消费的偏移量。这个偏移量是consumer消费完消息之后,主动上报给broker的,避免消息重复推送。这个offset偏移量也对应了它在 consumerqueue 中的位置,消费消息的时候也会根据它来找到 consumerqueue 中的记录。
Producer 发送消息时,会根据消息的总量,取模上集群中的队列数量,轮训地去发送(非顺序消息的情况下)。
Consumer 内部有一个负载均衡的算法,大致机理是根据消费者数量取模上队列的数量,来选择对于的队列。
各个模块之间的通信基于 Netty,默认使用 epoll 方式。
核心参数
Broker参数
参数名称 |
参数描述 |
brokerClusterName | 集群名称 |
brokerName | broker名字,做集群时,slave的brokerName必须和master相同 |
brokerId | broker为0时,表示master节点,其他都是slave |
deleteWhen | 例如:04。表示消息保存的时间超过期限后,在下次的凌晨4点删除 |
fileReservedTime | 默认:48小时。表示消息保存48小时 |
brokeRole |
集群中master和slave的消息同步方式; 有三个值:SYNC_MASTER, ASYNC_MASTER, SLAVE |
flushDiskType |
刷盘策略,将消息保存到磁盘中进行持久化 有两种方式:ASYNC_FLUSH (异步), SYNC_FLUSH (同步) ASYNC_FLUSH:先将消息保存到缓冲区中,有可能丢失数据 SYNC_FLUSH:消息实时保存到磁盘,主从都保存完成才算成功,性能差 |
autoCreateTopicEnable | 允许自动创建topic |
enablePropertyFilter | 支持消息过滤 |
storePathRootDir | 消息持久化文件存储的根路径 |
Producer参数
参数名称 |
默认值 |
参数描述 |
group | 无 | 发送者组 |
sendMessageTimeout | 3000 | 发送消息的超时时间 |
retryTimesWhenSendFailed | 2 | 发送失败的重试次数 |
maxMessageSize | 4MB | 消息的最大长度 |
Consumer参数
参数名称 |
默认值 |
参数描述 |
topic | 无 | 消息主题 |
consumerGroup | 无 | 消费者组。一条消息只能被同一个消费者组中的一个消费者消费。 |
selectorType | TAG | 消息过滤的方式:TAG、SQL92 |
selectorExpression | * | 具体的消息过滤逻辑 |
consumeMode | CONCURRENTLY | 消费模式:CONCURRENTLY、ORDERLY |
messageModel | CLUSTERING | 消息模式:CLUSTERING(集群)、BROADCASTING(广播) |
consumeThreadMax | 64 | 消费消息的最大线程数 |
maxReconsumeTimes | -1 | 消息重新消费的最大次数。-1相当于16 |
消息存储文件 commitLog
rocketmq 的消息存储在 commitLog 中,broker 默认会给 commitLog 申请
1G的磁盘空间
。这是为了保证
存储空间是有序的
。
如果 commitLog 已经满了,会继续创建第二个 commitLog,且它的文件名最后几位是上一个 commitLog 的最大偏移量 masOffset。查找的时候,如果第一个文件中没找到,那么会计算它的最大偏移量,获取下一个要查找的 commitLog 的名称。
每次申请1G的空间,虽然
浪费了点磁盘空间,但是换取了快速的查找
,非常值得!
为什么要申请连续存储空间呢?因为 rocketmq 中使用了
零拷贝技术
来读取文件,它可以极大地提提升读取文件的性能。而要用零拷贝技术,必须要连续的内存空间。
索引文件 IndexFile
索引文件是用来支持快速地查找消息的。
rocketmq 支持两种查询方式:根据key;根据time;
文件存储位置:store/index${fileName},fileName是以创建时的时间戳命名的。
单个 IndexFile 大小:40 + 500W*4 + 2000W *20 = 420000040 字节,约400MB。
一个 IndexFile 可以保存 2000W 个索引。
索引文件的结构由三部分组成 (磁盘空间是连续的):
索引文件头
(IndexHeader 40字节) +
槽位
(Slot 每个4字节) +
消息的索引内容
(条目,每个20字节)
IndexFile 的底层实现为 hash索引, 它是在文件系统中实现类似 HashMap 的结构。
索引文件头
:
固定40个字节
槽位
槽位可以看作是一个长度500W的数组。
根据 Message Key 计算槽位的位置:
- 计算 Message Key 的 hash 值:keyHash = hashcode(key)
- 得到槽位值 Slot:slotPos = keyHash % 槽位数(500W)
- 计算 Slot 的实际磁盘位置:IndexHeaderSize(40 Byte) + slotPos * hashSlotSize(4 Byte)
槽位Slot中存储的是最新条目的数值。例如此时一个key算出来的槽位是4,它是第100个条目,那么槽位中存的就是100。
既然取hash,那必然可能出现hash冲突,hash冲突时,条目使用链表来连接。如果此时第二个 Message Key 算出来的槽位也是4,假设它是第200个条目,那么此时槽位中的内容就更新为200。
这种数组加链表的方式,跟HashMap很相似。
消息的索引内容 (条目)
每个条目占用20个字节,它包含4个部分:
(key的hash值) + (commitLog偏移量) + (时间戳) + (上一个相同keyHash的条目的数值)
- KeyHash 是根据 Message Key 算出来的 hash 值;
- CommitLogOffset 保存的就是此 Message Key 的消息在 commitLog 中的位置;
- timestamp 用于按照时间来查询;
-
prevIndex 是指当两个不同的 Message Key 计算出来的槽位相同
(
hash碰撞
) 时,新的条目中存储上一个条目的数值。比如当第100条目和第200条目产生hash碰撞,则第200条目的prevIndex为100,第100条目的prevIndex为0,查询的时候先查新数据,如果不是我们想要的数据,一直查询到prevIndex为0为止。
slot相当于一个数组,而相同 slot 的条目,相当于是一个链表。最终通过
数组+链表
的形式,保存消息的索引,跟 HashMap 的实现原理相似。
由于每个 slot 和 条目是
定长
的,所以只要得到数值,就可以很方便地计算它的实际物理地址。
通过 Message Key 查询时,计算 slot 值,取出此slot对应的条目在commitLog上的消息即可 (可能有多个,这个时候只要
在客户端再根据 Message Key 匹配
一下就行了) 。
队列 consumerqueue
rocketmq 默认会给每个 topic 创建4个队列,每个队列有一个 queuelog。
queuelog 可以看成是一个数组,当中每个元素可以理解为数据库的一条记录,它包含三个字段:commitLogOffset, msgSize, tagsCode。
- commitLogOffset:消息在 commitlog 中的偏移量(开始的位置)
- msgSize:消息的大小,和 commitLogOffset 配合就可以获取整条消息的内容
- tagsCode:tag 的 hashcode
consumerqueue 可以看做是 commitlog 的索引文件。查找的时候就像数组那样根据索引下标查找元素,然后根据 commitLogOffset 和 msgSize,从 commitlog 中获取消息内容。
consumerqueue 中存储的内容是定长的,每条内容20个字节。commitLogOffset 8字节,msgSize 4字节,tagsCode 8字节。
Consumer 消费消息的时候,先从 consumerOffset.json 中获取待消费的消息 offset,然后从 consumequeue 中得到 commitLogOffset 和 msgSize,再去 commitlog 中读取消息内容。
通过 tag 过滤也是通过 consumerqueue 来实现的。消费的时候,先对 tag 做 hash运算,看跟 consumerqueue 中存储的 tagsCode 是否匹配,来进行过滤。
零拷贝刷盘
以文件下载为例,服务端要做的事情是:将磁盘中的文件不作修改地从已连接的 socket 发出去。
操作系统底层 I/O 过程如下图:
从图中可以看到,整个流程包含了
4次拷贝,以及2次用户空间和内核空间的切换
,比较耗费性能和CPU资源。零拷贝就是为了解决这种低效性。
零拷贝的 IO 流程:
用户空间映射页缓存,通知页缓存直接将数据共享到 Socket 缓冲区,这样就
避免了2次用户空间和内核空间的切换
,并且少了两次拷贝。这样可以很大程度地提升磁盘 IO 的性能,可以大致接近内存的速度。
要用到零拷贝技术,内存空间就必须是连续的,所以 rocketmq 在创建 commitlog 时直接申请 1G 的连续磁盘空间。
consumer消费消息方式
RocketMQ 是基于发布订阅模型的消息中间件。所谓的发布订阅就是说,consumer 订阅了 broker 上的某个 topic,当 producer 发布消息到 broker 上的该 topic 时,consumer 就能收到该条消息。
Pull (MQPullConsumer)
consumer 主动去 broker 中拉取消息,取消息的逻辑需要用户自己来写,而且需要管理 offset,比较复杂,使用 rocketmq 时一般不用这种方式。
Push (MQPushConsumer)
此模式下 broker 收到消息后会主动推送给 consumer。
对于PushConsumer,由用户注册 MessageListener 来消费消息。
RocketMQ 的 push 方式,在
底层是通过pull实现的
,并没有实现真正的 push。
consumer 开启向 broker 开启
长轮询
来批量拉取消息。它会不停地问 broker 有没有消息,有的话就拉过来,看起来就像是 broker 有了消息后 push 给 consumer 的。
消息过滤
TAG方式:Consumer 消费消息的时候,先从 consumerOffset.json 中获取待消费的消息在 consumequeue 中的 offset,根据 consumequeue 中保存的 tagsCode(tag对应的hashCode)进行过滤。然后根据 consumequeue 中的 commitLogOffset 和 msgSize,从 commitlog 中获取真正的消息内容,在客户端根据 tag 的真实内容再过滤一次,防止 hash 冲突导致它不是真正要消费的消息。
还有一种是 SQL92 方式。
rocketmq 还支持传入自定义的 java 类,把它的文件流传到服务器上,服务器上把文件流解析出来构造成一个类,这个类会被内部的引擎去执行。这样的话,我们按照需求,自己来写方法过滤消息。
顺序消息
顺序消息需要 Producer 和 Consumer 都保证顺序。Producer 需要保证消息被路由到正确的队列,Consumer 需要保证每个队列的数据只有一个线程消息。
rocketmq 默认给一个 topic 创建4个队列,要保证消息有序,只要将消息发送到同一个队列中去。所以取一个公共字段 (比如ID),对队列数量取模,然后发送到同一个队列中。
延时消息
常用场景:订单超时时取消订单。当下了一个订单后,就发送一个延时消息,比如30分钟。30分钟后处理次消息,里面去查看订单是否付款了,如果没付款就自动取消订单。
rocketmq 相比于其它消息中间件,其中一个优势就是支持延时消息。
开源版一共有18个延迟时间间隔:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。
要发送延迟消息,只要在发送消息时,设置消息延迟级别 (delayTimeLevel) 即可。
- 设置消息延迟级别等于0时,则该消息为非延迟消息。
- 设置消息延迟级别大于等于1且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于3,则延迟10s,以此类推。
- 设置消息延迟级别大于18时,则该消息延迟级别为18
延迟消息有额外的主题:SCHEDULE_TOPIC_XXXX
内部遍历所有延迟级别,为每个延迟级别创建定时任务,如果发现延迟的时间到了,就发送消息到真实的主题下。
又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由 flushDelayOffsetInterval 属性进行配置,默认为10秒。