RocketMQ存储之DefaultMessageStore(一)

  • Post author:
  • Post category:其他




一、功能概述

DefaultMessageStore是RocketMQ底层存储对外层提供服务的窗口,它通过组织CommitLog、ConsumeQueue、IndexFile来完成RocketMQ存储的功能。本篇分析DefaultMessageStore提供的功能和启动过程的源码实现。



二、源码分析



1、MessageStore存储接口

首先,我们看一下DefaultMessage提供的功能窗口,DefaultMessageStore实现了MessageStore接口,MessageStore定义的接口大概可以分为以下几类:

MessageStore方法分类



2、DefaultMessageStore成员变量

    private final MessageStoreConfig messageStoreConfig;
    // CommitLog
    private final CommitLog commitLog;

    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

    // 负责ConsumeQueue刷盘
    private final FlushConsumeQueueService flushConsumeQueueService;

    // 负责清理过期的CommitLog文件
    private final CleanCommitLogService cleanCommitLogService;

    // 负责清理过期的ConsumeQueue文件
    private final CleanConsumeQueueService cleanConsumeQueueService;

    private final IndexService indexService;

    // 负责MappedFile的创建
    private final AllocateMappedFileService allocateMappedFileService;

    // 负责消息重放(通知ConsumeQueue和indexFile创建对应的索引、通知pull消息请求消息到达)
    private final ReputMessageService reputMessageService;

    // 负责消息的主从备份
    private final HAService haService;

    // 负责处理延迟消息队列
    private final ScheduleMessageService scheduleMessageService;

    // 负责打印当前存储状态
    private final StoreStatsService storeStatsService;

    // 直接内存暂存池
    private final TransientStorePool transientStorePool;

    private final RunningFlags runningFlags = new RunningFlags();
    private final SystemClock systemClock = new SystemClock();

    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
    private final BrokerStatsManager brokerStatsManager;
    private final MessageArrivingListener messageArrivingListener;
    private final BrokerConfig brokerConfig;

    private StoreCheckpoint storeCheckpoint;

    private final LinkedList<CommitLogDispatcher> dispatcherList;

	//省略了部分成员变量	

首先,DefaultMessageStore存储依赖三类文件:CommitLog、ConsumeQueue、IndexFile。

一个DefaultMessageStore持有一个CommitLog对象。

ConsumeQueue是以一个Map嵌套Map组织的,每个topic下的每个queue都对应一个ConsumeQueue,用来索引改Queue下消息在CommitLog中的位置。

IndexFile是由IndexService提供服务的,IndexService持有一个IndexFile的List。

DefaultMessageStore持有了一些Service对象:

Service

这里只列出来各个Service的功能,具体实现会在后续文章中分析。

DefaultMessageStore还持有一个比较重要的对象:MessageArrivingListener。MessageArrivingListener作为一个消息到达的监听器,在消息写入后,会通知消息到达,触发向拉取消息的客户端写入数据。



3、DefaultMessageStore启动过程

DefaultMessageStore进入正常工作之前,需要经过创建对象、load、start这三个步骤。

DefaultMessageStore只有一个构造方法,其执行内容就是给一些成员变量赋值,并且调用了AllocateMappedFileService、IndexService的start()方。IndexService的start()方法是空的,没有什么内容。AllocateMappedFileService调用start()后,会创建一个线程,定时从队列中获取创建MappedFile文件的请求,然后根据请求内容创建MappedFile。关于MappedFile的内容,我们在前面分析CommitLog的文章中有分析过,这里不再解释。

创建完DefaultMessageStore后,需要做调用load()方法,load()方法的作用主要是从磁盘中加载CommitLog、ConsumeQueue、storeCheckpoint对象,并且恢复CommitLog,构建成员变量consumeQueueTable。下面是具体实现细节:

    public boolean load() {
   
        boolean result = true;
        try {
   
        	// 通过temp文件判断上次退出是否是正常退出
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally")



版权声明:本文为feijianke666原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。