文章目录
一、概述
RocketMQ 的核心模块主要包括如下几个部分
-
acl
:Access Control List,权限控制链表,RocketMQ 的权限控制模块; -
broker
:消息队列的服务器实体,接收客户端请求,处理消息分发,存储消息数据等; -
client
:RocketMQ 的客户端包括消息的生产者和消费者; -
common
:公共包; -
dev、distribution
:一些开发脚本和部署脚本和配置,不多赘述; -
axample
:示例代码 -
filter
:过滤器模块,包含 RocketMQ 的过滤消息的实现; -
logappender
:日志追加实现的相关模块; -
logging
:日志实现模块; -
namesrv
:命名服务器的实现模块,类似注册中心,注册订阅的 topic; -
openmessaging
:消息开放的标准 -
remoting
:远程通信模块,基于 Netty -
srvutil
:服务器工具类; -
store
:消息存储的具体实现; -
test
:测试相关; -
tools
:工具类相关,包括监控等;
二、路由中心 NameServer
RocketMQ 的逻辑部署图如上所示,NameServer 就类似一个注册中心,Broker 就是具体管理消息的服务器实体,在启动的时候向 NameServer 注册,消息生产者在发送消息之前从 NameServer 获取 Broker 服务器地址列表,然后根据负载均衡选择一台具体的 Broker 发送消息;另外 NameServer 与每台 Broker 服务器保持长连接,并每 30 s,发送一个心跳包检测 Broker 是否存活,,如果宕机则删除对应服务器。
启动流程
先从 NameServer 的启动作为入口,了解 NameServer 的实现:
- 首先,需要加载对应的配置,包括 NameServer 对应的配置(对于 RocketMQ 的一些业务配置)和 Netty 服务器的相关配置(一些网络参数的配置),另外,如果有命令行输入的配置也会进行加载;
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
if (commandLine.hasOption('c'))
// ...
if (commandLine.hasOption('p'))
// ...
- 然后根据之前的配置,创建对应的 NameServer 控制器,并进行初始化,初始化的工作,包括对一些组件的初始化以及创建两个定时任务,分别对 Broker 进行不活跃的剔除,以及KV配置的打印,都是十秒触发一次;
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
boolean initResult = controller.initialize();
public boolean initialize() {
// 加载 KV 配置
this.kvConfigManager.load();
// 创建 Netty 网络传输的服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建网络传输线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册处理器
this.registerProcessor();
/**
* 创建了两个定时任务
* (1)十秒扫描一次 Broker ,移除下线的 Broker
* (2)十秒打印一次 KV 的配置
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// 进行一些 TLS 安全配置
}
return true;
}
- 之后会注册一个 JVM 的钩子函数,在 JVM 关闭之前即时关闭控制器,然后启动控制器;这也是一种优雅关闭线程池的方式,给 JVM 注册钩子函数,在 JVM 进程关闭之前,关闭线程池释放资源。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
路由注册和故障剔除
NameServer 主要作用是为消息的生产者和消费者提供关于主题 Topic 的路由信息,以及管理 Broker 节点。
首先我们需要了解 NameServer 里面存了什么,在 RouteInfoManager 中存取了主要的路由信息。
NameServer 利用HashMap保存了路由表的相关信息,并用一个读写锁保证线程安全,类比 Nacos 使用 ConcurrentHashMap 保存注册的服务信息。
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- topicQueueTable:某个 Topic 所对应的所有队列数据,消息发送时,根据该路由表和负载均衡策略找到对应的队列;
- brokerAddrTable:保存 Broker 的基本信息;
- clusterAddrTable:存储 Broker 的集群信息;
- brokerLiveTable:Broker 状态信息的记录,每次新的心跳检测会更新该表;
- filterServerTable:过滤服务器的信息,用于消息过滤;
注册
路由注册的具体实现通过 Broker 与 NameServer 的心跳功能实现,Broker 启动的时候向 NameServer 发送心跳包,NameServer 就会更新 brokerLiveTable 中 Broker 的 lastUpdateTimestamp 字段,然后每十秒扫描一次所有数据,如果超过 120s 没有收到心跳包则删除该 Broker 并关闭与其的长连接。
具体的实现逻辑就是 Broker 启动的时候,会启动很多定时任务的线程池,有一个就是发送心跳包的作用,发送到 NameServer 中之后如果NameServer 包含该Broker 就更新心跳包时间,如果没有则保存相关的信息到缓存中。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
删除
以上已经提到,在 NameServer 中有一个定时任务就是扫描 Broker 上一次发送心跳包的时间,如果大于 120s 没有发送则剔除,另一种情况就是Broker 自己下线会调用 unregisterBroker 方法。
而删除的方法也很简单,就是以上提到的几个 HashMap 缓存中剔除对应内容。
路由发现
RocketMQ 的路由发现不是实时的,路由的改变 NameServer 不会主动通知客户端,而需要客户端自己定时进行拉取。
拉取会得到一个如下实体,包括顺序消息的配置、该 Topic 对应的队列信息、该 Topic 对应 Broker 信息,以及对应的消息过滤服务器,得到这些信息之后,会对本地缓存进行更新。
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
. . .
}
三、消息发送
RocketMQ 发送普通消息有三种方式:
-
可靠同步发送(sync)
:发送者执行发送消息 API,同步等待消息服务器返回结果; -
可靠异步发送(async)
:发送者发送消息后返回,注册返回结果的回调函数; -
不可靠单向发送(oneway)
:只管发送消息,不管返回结果;
了解如何发送消息之前,可以了解一下消息是什么样的:
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
// 对应的 Topic
private String topic;
// 消息的标志
private int flag;
/**
* 扩展的配置属性,主要包括:
* (1)tag:消息标签,用于过滤;
* (2)keys:消息的索引键,帮助快速查找
* (3)waitStoreMsgOK:发送消息后是否等待消息存储完成后再返回
* (4)delayTimeLevel:消息延迟的级别,用于定时消息或者消息重试
*/
private Map<String, String> properties;
// 消息体
private byte[] body;
// 事务的 ID
private String transactionId;
// 省略所有设置属性的方法
}
消息发送流程
消息发送的步骤主要包括:验证消息、查找路由、消息发送;
1. 验证消息
对于消息的发送,首先会来到 DefaultProducer 的 send 方法,进行消息的验证以及Topic 的设置。
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
对消息的校验主要包括Topic 是否为空,消息体是否为空,消息长度不为0,并且不超过默认的最大长度maxMessageSize,默认为 4M。
2. 查找Topic路由
消息校验合法之后,需要获取主题的路由信息,才能找到具体的 Broker 服务器。
主要通过该方法获取:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
该方法会先在本地缓存中获取路由信息,如果获取不到,则去 NameServer 服务器查询,还是没有的话通过默认的 createTopicKey 获取,如果还无法获取,根据 autoCreateTopicEnable 的配置项,如果是 true,则自动创建并返回,如果为 false,则找不到 Topic 会抛出异常。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 先从缓存中获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 获取不到就去 NameServer 服务器获取
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 获取到就返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 获取不到就再通过默认 createTopicKey 获取,获取不到如果允许字段创建,则会创建 topic,否则抛出异常
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
3. 选择队列
得到对应Topic的路由之后,就可以对具体的队列进行选择。
主要通过该方法实现:
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
由于消息的发送采用重试机制,所以会根据 sendLatencyFaultEnable 的值分为两种情况:
sendLatencyFaultEnable=false:默认机制
默认情况不是失败的重试,就通过轮询来选择队列,失败重试就选择轮询到不是上一条失败的队列,但是这种方法的问题在于,如果 Broker 宕机导致的发送失败,下一条队列大概率还是和之前的队列处于一个 Broker,容易造成再次的发送失败。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
/**
* 如果不是失败重试,则直接选择队列(就是采用简单的轮询)
*
* int index = this.sendWhichQueue.incrementAndGet();
* int pos = Math.abs(index) % this.messageQueueList.size();
*/
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 如果是失败的重试,就需要规避选到上次的队列
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
sendLatencyFaultEnable=true:故障延迟机制
还是通过轮询寻找队列,选择队列之前会检查该队列是否可用。
之后关于故障延迟的维护目前还没看懂,之后再补充。
// 还是通过轮询获取队列
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断该 MQ 的 Broker 是否可用,可用则直接返回
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
4. 消息发送
选择了具体的消息队列,就可以发送消息了, 发送消息主要在 sendKernelImpl 函数中实现。
该函数包括了6个参数,并且发送的流程主要包括了五大步:
- 获取 Broker 的网络地址
- 为消息做一些标识
- 注册需要的钩子函数,类似 AOP
- 封装发送消息的请求数据包
- 根据发送的模式发送消息
private SendResult sendKernelImpl(final Message msg, // 具体要发送的消息
final MessageQueue mq, // 要发送到的消息队列
final CommunicationMode communicationMode, // 发送的类型,同步异步还是单向
final SendCallback sendCallback, // 发送完的回调函数
final TopicPublishInfo topicPublishInfo, // Topic 对应信息
final long timeout) // 发送超时时间 throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
/**
* 第一步:
* 获取对应Broker的具体网络地址,从缓存中找没有则去 NameServer 中更新,还没有则抛出异常
*/
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
/**
* 第二步:
* 为消息分配唯一 ID 以及标注一些标记(压缩,事务等)
*/
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
/**
* 第三步:
* 注册一些检验和发送消息的钩子函数
*/
if (hasCheckForbiddenHook()) {
// 省略具体步骤
}
if (this.hasSendMessageHook()) {
// 省略具体步骤
}
/**
* 第四步:
* 构建请求包
* 主要包括:生产者组,Topic名称,默认Topic key,Topic 默认队列数,
* 队列 ID。消息系统标记,消息发送时间,flag,消息扩展属性,重试次数,以及是否为重复消息
*/
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 省略省略封装请求首部
/**
* 第五步:
* 按照对应的发送方式发送消息
*/
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
/**
* 异步发送注册回调函数
* 会进行并发控制,默认为 65535
* 另外,重试的入口在收到服务端的响应时发生,所以网络异常造成的丢包则无法进行重试
*/
case ONEWAY:
/**
* 单向发送不做处理
*/
case SYNC:
/**
* 同步方式同步等待 sendResult 返回
* 超过失败重试次数会 加入到延迟队列中
*/
default:
assert false;
break;
}
// 执行后置钩子函数
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
}
// 省略异常处理
}
}
四、消息存储
大部分 MQ 需要通过持久存储来增加系统的高可用性。
存储文件概述
RocketMQ 主要包括如下三种持久化文件:
-
CommitLog
:消息存储文件,所有消息 Topic 对应的消息都存储在 CommitLog 文件中; -
ConsumeQueue
:消息消费队列,消息达到 CommitLog 文件后,将异步转发到消息消费队列,供消息消费者消费,为了加快检索速度和节省磁盘空间,ConsumerQueue条目不会存储消息的全量信息,只把消息存储在 CommitLog 中。 -
IndexFile
:消息索引文件,主要存储消息的键和偏移量的对应关系,用于提高根据主题与消息队列检索消息的速度;
消息存储的具体服务实现类是 DefaultMessageStore ,包含了大部分对存储文件相关操作的 API,它的主要属性如下,具体 API 之后会慢慢展现。
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
/**
* 消息存储的配置
*/
private final MessageStoreConfig messageStoreConfig;
/**
* CommitLog 文件的实现
*/
private final CommitLog commitLog;
/**
* 具体消费队列映射的缓存表
* 一个 Topic 对应好几个 Queue,一个 queueId 对应一个消费队列
*/
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
/**
* 消费队列文件的刷新线程
*/
private final FlushConsumeQueueService flushConsumeQueueService;
/**
* 清除 CommitLog 的服务类
*/
private final CleanCommitLogService cleanCommitLogService;
/**
* 清楚 ConsumeQueue 的服务类
*/
private final CleanConsumeQueueService cleanConsumeQueueService;
/**
* 索引文件实现类
*/
private final IndexService indexService;
/**
* 分配映射文件服务类
*/
private final AllocateMappedFileService allocateMappedFileService;
/**
* 消息分发服务类,根据 CommitLog 构建 ConsumerQueue 和 IndexFile 文件
*/
private final ReputMessageService reputMessageService;
/**
* 高可用服务类
*/
private final HAService haService;
/**
* 延时消息服务类
*/
private final ScheduleMessageService scheduleMessageService;
/**
* 存储状态服务类
*/
private final StoreStatsService storeStatsService;
/**
* 消息堆内存缓存,提供一种内存锁定,保证其中的内容不会被交换到磁盘交换区
*/
private final TransientStorePool transientStorePool;
/**
* 消息拉取长轮询机制消息到达的监听器
*/
private final MessageArrivingListener messageArrivingListener;
/**
* Broker 服务器的配置
*/
private final BrokerConfig brokerConfig;
/**
* 刷盘监测点
*/
private StoreCheckpoint storeCheckpoint;
/**
* CommitLog 文件转发请求
*/
private final LinkedList<CommitLogDispatcher> dispatcherList;
更新消费队列和索引文件
因为 ConsummeQueue 和 IndexFile 不存储消息的具体数据,而都是基于 CommitLog 构建的,所以当 CommitLog 进行更新的时候,需要及时将消息更新到上述两个文件中,RocketMQ 通过开启一个 ReputMessageService 的线程来准实时转发更新事件。
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
很显然可以从 run 方法看出,每过 1 ms 就会调用一次 doReput 方法,更新一次消息。
private void doReput() {
// 省略一些校验
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// . . .
// 返回指定偏移量之后的消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
// 遍历每条消息内容
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 省略一些校验
// 真正对消息进行更新就是在该 doDispatch 方法中
DefaultMessageStore.this.doDispatch(dispatchRequest);
// 省略分发完之后对于结果的处理
}
}
在 RePut 方法中,回去出所有消息,遍历调用 doDispatch 方法去真正的分发并更新。
doDispatch 具体有如下两个实现,分别对应了对于 ConsummeQueue 和 IndexFile 的更新。
- 对于 ConsummeQueue 的更新就是取出更新请求对应的消息 Topic 和队列 ID 找到对应的 ConsummeQueue 然后写入具体的数据进行更新。
- 对于 IndexFile 的更新也类似,就是封装对应的具体数据,然后进行一些检测之后加入到文件中。
但是因为对于 CommitLog 的更新和之后 ConsummeQueue 以及 IndexFile 的更新并不是同步完成的,所以在 Broker 异常宕机下线的条件下,可能产生数据的不一致问题。所以 RocketMQ 需要对文件进行恢复来保证最终一致性。
首先就需要对是否是异常宕机退出进行判断,主要在下面这段逻辑中实现,因为 Broker 启动的时候注册的 JVM 钩子函数会删除 abort 文件,所以只需要检测是否存在 abort 文件,就可以判断是否是异常宕机。如果是异常退出就可以存在数据不一致需要修复。
private boolean isTempFileExist() {
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
return file.exists();
}
之后的恢复会根据 CheckPoint 的检查刷盘点来得知从什么位置开始同步。之后对于文件的刷盘以及过期删除等和类似 MySQL Redis 的实现都大体相同,对于磁盘的异步刷新,以及每天凌晨4点删除过期文件,或者在磁盘空间不足的时候删除,文件的默认过期时间为 72 小时。
五、消息消费
消息被成功发送到队列中之后需要考虑的就是消息如何被消费,包括消息如何被消费如何被拉取,以及 RocketMQ 的高级特性,顺序消息,消息过滤等等。
概述
RocketMQ 的消费者如以下继承关系:
对于以上接口的方法基本都能望文生义了,所以直接介绍默认的实现 DefaultMQPushConsumer。
主要属性:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
/**
* 具体消费者方法的一些实现
*/
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* 消费者属于的组
*/
private String consumerGroup;
/**
* 消息的消费模式,分为集群模式和广播模式
* 默认为集群模式
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* 消费策略,从什么位置开始消费,有几种不同的模式,在 ConsumeFromWhere 常量中定义
*/
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
/**
* 集群模式队列的分配策略
*/
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
/**
* 消费者对于 Topic 的订阅关系
*/
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
/**
* 消息的监听器
*/
private MessageListener messageListener;
/**
* 存储消费进度
*/
private OffsetStore offsetStore;
/**
* 消费者最小的线程数
*/
private int consumeThreadMin = 20;
/**
* 消费者最大线程数,因为任务队列是无界队列,所以该参数没用
*/
private int consumeThreadMax = 20;
/**
* Threshold for dynamic adjustment of the number of thread pool
*/
private long adjustThreadPoolNumsThreshold = 100000;
/**
* 并发处理消息时的最大跨度,如果超过,则会延迟发送消息
*/
private int consumeConcurrentlyMaxSpan = 2000;
/**
* 默认每个队列缓存的队列个数
*/
private int pullThresholdForQueue = 1000;
/**
* 拉取消息的间隔时间,0表示拉取完一个任务就继续拉取
*/
private long pullInterval = 0;
/**
* 并发消费时一次可消费的最大条数
*/
private int consumeMessageBatchMaxSize = 1;
/**
* 批量拉取消息的条数
*/
private int pullBatchSize = 32;
/**
* 是否能在拉取的时候更新订阅信息
*/
private boolean postSubscriptionWhenPull = false;
/**
* Whether the unit of subscription group
*/
private boolean unitMode = false;
/**
* 最大重试次数
* In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
*/
private int maxReconsumeTimes = -1;
/**
* 延迟将队列的消息提交到消费者线程
*/
private long suspendCurrentQueueTimeMillis = 1000;
/**
* 最大的消息阻塞在消费者的时间
*/
private long consumeTimeout = 15;
在消费者启动的时候,也就是调用 DefaultMQPushConsumerImpl 的 start 方法,会首先保存订阅的信息,并实例化一些交互客户端,然后初始化消费进度,如果是集群模式则保存在 Broker ,如果是广播模式则保存在每一个消费端。最后开启消息消费的线程池。
消息拉取
消息消费主要包括两种模式:广播模式和集群模式,广播模式比较简单,就是所有消费者都需要去队列中拉取消息,集群模式则每个消费组存在多个消费者以及一个 Topic 包含多个队列,对于主题的订阅基于消费组,并且会将主题下的所有队列根据分配策略分配给不同的具体消费者,一个队列只能分配给一个消费者,但是一个消费者可以消费多个队列,也就是一个队列的消息只能被某一个消费者接受到,
RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取,既然是一个线程,核心逻辑肯定就定义在 run 方法中:
很容易看出主要就包括两步:① 从拉取消息请求的阻塞队列中得到拉取请求;②根据请求去拉取消息;
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
所以消息拉取的关键就是这个 PullRequest,这个拉取请求是什么,以及这个拉取请求什么时候会产生。
PullRequest 的属性主要包括消费者的消费组,需要拉取的消息队列,拉取之后保存的队列,以及下一个拉取的偏移量和是否被锁定。
这个 processQueue 是消息队列在消费者端的快照重现,每次 PullMessageService 从 Broker 拉取消息之后就会存放到该队列中,如何提交到消费者的线程池进行消费。
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean previouslyLocked = false;
}
之后消息的拉取主要分为三个重要的步骤:
- 第一步,需要拉取消息的客户端封装拉取的请求,也就是将 PullRequest 变成真正的拉取;
方法的实现在 DefaultMQPushConsumerImpl.pullMessage() 方法中,主要做的事是对环境状态做一些变更,对流控做一些限制,为消息拉取系统做一些标记,之后就会通过 BrokerId 和 BrokerName 获取 Broker 的具体地址,进行消息的拉取。
- 第二步,Broker 接受到拉取请求就会组装消息,具体的实现在 PullMessageProcessor.processRequest(…)方法中,就是根据具体信息对消息进行封装。
- 第三步,消息拉取的客户端收到拉取回的消息之后进行处理,其实也是很多信息的封装。
整个流程从架构上看不复杂,但是核心源码却很复杂,大多是一些偏移量的校验,一些标记的处理来保证可用。
RocketMQ 消息的拉取采用的是长轮询机制,消费者客户端拉取消息的请求到达服务器之后如果没有对应消息会被挂起,然后 RocketMQ 本身每 5s 检测一次是否有新消息到达,有的话唤醒线程检查是否是感兴趣的消息,如果是,则从 CommitLog 中提取并返回,如果不是,会挂起等待超时,并发起下一次轮询,默认一次是 15s。
将消息拉取到 ProcessQueue 处理队列之后就可以提交到消费者客户端的线程池进行消费,并会维护消费的进度,以及在消费之后发送 ACK 的响应方便进行重发或者延时等操作。
定时消息
定时消息指的是消息发送到 Broker 之后,不立即被消费而是等到特定的时间再消费,类似于延时队列,例如可以在订单定时退单的时候使用。由于为了保证性能,RocketMQ 并不支持任意时间的延时,而是在特定的时间中进行选择。包括 1s 5s 10s 30s 1m 等等。
延时消息主要通过 ScheduleMessageService 来实现。
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 第一次延时调度的默认时间
private static final long FIRST_DELAY_TIME = 1000L;
// 每次延时调度之后,再放入调度池的延时时间
private static final long DELAY_FOR_A_WHILE = 100L;
// 异常之后的等待时间
private static final long DELAY_FOR_A_PERIOD = 10000L;
// 存储延时级别
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
// 存储每个延时级别的消费进度
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
// 消息存储实现
private final DefaultMessageStore defaultMessageStore;
// . . .
}
具体实现的调用链路包括 load -> start,也就是先调用 load 方法进行一些信息的加载,然后调用 start 开启延时的调度。
每种延时级别会对应一个定时任务,第一次会先调度一次,然后之后每过延迟时间一次会进行一次调度。
public void start() {
// CAS 设置启动标记
if (started.compareAndSet(false, true)) {
// 加载具体信息,包括
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 遍历所有延时等级,每个延时等级对应一个定时任务轮询
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 每 10s 进行一次进度的持久化
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
然后具体分析定时任务 DeliverDelayedMessageTimerTask 是什么。该类的核心逻辑在 run 方法中,主要依靠 executeOnTimeup 方法来实现定时逻辑。
- 第一步,会通过 Topic 和 ID 找到对应延时主题的消息队列;
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
- 第二步:根据偏移量找到有效的处于该队列的消息;
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
- 第三步:遍历得到的ConsummeQueue里对应的 CommitLog 中的消息;
- 第四步:得到消息之后会构建真实的消息实体,并转发到真正的 Topic 中供消费端消费,并清除该定时消息;
- 最后:构建下一次延迟时间的定时任务,结束本次的调度。
顺序消息
RocketMQ 只能支持局部消息顺序消费,即一个消费队列中的消息被顺序消费,如果需要全局的顺序,则可以将 Topic 配置为一个队列,例如对于 binlog 的订阅。
所以对于发送顺序消息,只需要一个线程同步的往同一个队列中发送消息,就可以保证消费者的顺序消费;
而对于顺序消费,就比较复杂,特别是在集群模式。
- 因为集群模式,消息的拉取可能会分配到不同的队列,所以分配到新队列拉取消息之前,会尝试对该队列加锁,只有加锁成功才会继续进行拉取任务,否则需要等待其他队列释放该队列的锁;
- 如果该消息队列没有被锁定,就会构建一个 PullRequest 延迟三秒之后放入到拉取任务队列。
- 另外集群模式消费者会有一个 20s 执行一次的定时任务,给所有自己的队列加锁,如果加锁成功则可以对消息进行拉取和消费。
- 在消费者消费的时候,因为会通过线程池消费,所以消费的时候还会尝试获取 objLock 才能消费消息。
- 也就是通过锁来保证队列的独占同步访问。
六、事务消息
RocketMQ 事务消息的实现原理基于两阶段提交和定时事务状态回查来决定消息最终是提交还是回滚。总的大体流程是先开启一个事务,向 MQ 中发送一条 Prepare 的消息,该消息没有提交之前是看不见的,然后执行本地的事务,如果本地事务执行成功,就可以将消息发送,如果失败则回滚消息,会通过定时任务来回查本地事务的执行状况。
我觉得和先执行本地事务再同步发送消息的区别在于对于消息进行了持久化保证不丢失。
事务的消息的发送主要依靠 TransactionMQProducer 类发送,主要包括两个属性:
/**
* 执行异步事务状态回查的线程池
*/
private ExecutorService executorService;
/**
* 事务的监听器
* 主要实现了对本地事务状态的执行和回查
*
* (1)执行本地事务方法,重写该方法执行本地事务
* LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
*
* (2)回查事务消息方法
* LocalTransactionState checkLocalTransaction(final MessageExt msg);
*/
private TransactionListener transactionListener;
继续分析该类对于事务消息的发送流程,对于事务消息的发送是在 DefaultMQProducerImpl.sendMessageInTransaction(…) 方法。主要分为三个大步:
第一步:
做一些校验以及设置消息发送信息步骤。
第二步:
对消息队列发送 prepare 消息,该 prepare 消息是发送到主题为 RMQ_SYS_TRANS_HALF_TOPIC 中,也就是提交之前不能被原消费者消费,提交之后会从该队列中删除加入到预先定义的 Topic,类似于定时消息。再根据发送的结果,做进步操作。
- 如果发送成功,则执行本地事务,该方法职责是记录事务消息的本地事务状态,例如存储该消息 ID 和状态到数据库中,或者再加入业务事务需求,将这两个操作放在一个事务中,保证原子性;
- 如果没有发送成功,则设置事务状态为回滚。
第三步:
结束具体事务,并根据上一步得到事务最终的结果以及执行对应操作。
- 如果本地事务执行成功,则将该 prepare 消息从原 Topic 中取出,构建消息加入到实现需要放置的 Topic 并把原 Topic 加入到 RMQ_SYS_OP_TRANS_HALF_TOPIC 表示该消息已经被处理过;
- 如果本地事务执行失败,则不恢复原主题,并加入到处理过消息的 Topic;
本地事务状态的回查使用的是 BrokerController 内部的 TransactionalMessageCheckService 线程定时去检查 RMQ_SYS_TRANS_HALF_TOPIC 中的消息,默认频率为 1 分钟。检查使用的方法还是在 transactionListener 中实现的 checkLocalTransaction 方法。然后根据检查的结果回滚或者提交。
七、总结
感觉 RocketMQ 和 RabbitMQ 的区别还是挺大的,包括整体的架构,消息的发送拉取方式等,RocketMQ 通过 NameServer 来对消息队列进行集中路由的管理注册,通过 Broker 来对队列进行具体的管理,还提供了顺序消息,延时消息,事务消息等机制,对于一些 ACK 等的内容以及消息重发的内容还存有一些疑惑没有理清楚,之后还会继续学习。