一、功能概述
DefaultMessageStore是RocketMQ底层存储对外层提供服务的窗口,它通过组织CommitLog、ConsumeQueue、IndexFile来完成RocketMQ存储的功能。本篇分析DefaultMessageStore提供的功能和启动过程的源码实现。
二、源码分析
1、MessageStore存储接口
首先,我们看一下DefaultMessage提供的功能窗口,DefaultMessageStore实现了MessageStore接口,MessageStore定义的接口大概可以分为以下几类:
2、DefaultMessageStore成员变量
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// 负责ConsumeQueue刷盘
private final FlushConsumeQueueService flushConsumeQueueService;
// 负责清理过期的CommitLog文件
private final CleanCommitLogService cleanCommitLogService;
// 负责清理过期的ConsumeQueue文件
private final CleanConsumeQueueService cleanConsumeQueueService;
private final IndexService indexService;
// 负责MappedFile的创建
private final AllocateMappedFileService allocateMappedFileService;
// 负责消息重放(通知ConsumeQueue和indexFile创建对应的索引、通知pull消息请求消息到达)
private final ReputMessageService reputMessageService;
// 负责消息的主从备份
private final HAService haService;
// 负责处理延迟消息队列
private final ScheduleMessageService scheduleMessageService;
// 负责打印当前存储状态
private final StoreStatsService storeStatsService;
// 直接内存暂存池
private final TransientStorePool transientStorePool;
private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
private final MessageArrivingListener messageArrivingListener;
private final BrokerConfig brokerConfig;
private StoreCheckpoint storeCheckpoint;
private final LinkedList<CommitLogDispatcher> dispatcherList;
//省略了部分成员变量
首先,DefaultMessageStore存储依赖三类文件:CommitLog、ConsumeQueue、IndexFile。
一个DefaultMessageStore持有一个CommitLog对象。
ConsumeQueue是以一个Map嵌套Map组织的,每个topic下的每个queue都对应一个ConsumeQueue,用来索引改Queue下消息在CommitLog中的位置。
IndexFile是由IndexService提供服务的,IndexService持有一个IndexFile的List。
DefaultMessageStore持有了一些Service对象:
这里只列出来各个Service的功能,具体实现会在后续文章中分析。
DefaultMessageStore还持有一个比较重要的对象:MessageArrivingListener。MessageArrivingListener作为一个消息到达的监听器,在消息写入后,会通知消息到达,触发向拉取消息的客户端写入数据。
3、DefaultMessageStore启动过程
DefaultMessageStore进入正常工作之前,需要经过创建对象、load、start这三个步骤。
DefaultMessageStore只有一个构造方法,其执行内容就是给一些成员变量赋值,并且调用了AllocateMappedFileService、IndexService的start()方。IndexService的start()方法是空的,没有什么内容。AllocateMappedFileService调用start()后,会创建一个线程,定时从队列中获取创建MappedFile文件的请求,然后根据请求内容创建MappedFile。关于MappedFile的内容,我们在前面分析CommitLog的文章中有分析过,这里不再解释。
创建完DefaultMessageStore后,需要做调用load()方法,load()方法的作用主要是从磁盘中加载CommitLog、ConsumeQueue、storeCheckpoint对象,并且恢复CommitLog,构建成员变量consumeQueueTable。下面是具体实现细节:
public boolean load() {
boolean result = true;
try {
// 通过temp文件判断上次退出是否是正常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally")