RocketMQ-store目录结构分析

  • Post author:
  • Post category:其他


storePathRootDir=/cache1/rocketmq/broker/data
├── abort
├── checkpoint
├── commitlog
│   └── 00000000037580963840
├── config
│   ├── consumerFilter.json
│   ├── consumerFilter.json.bak
│   ├── consumerOffset.json
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json
│   ├── subscriptionGroup.json.bak
│   ├── topics.json
│   └── topics.json.bak
├── consumequeue
│   ├── BenchmarkTest
│   ├── BenchmarkTestZj
│   └── ZJ_TEST
├── index
│   └── 20200720163452641
└── lock
public class StorePathConfigHelper {

    public static String getStorePathConsumeQueue(final String rootDir) {
        return rootDir + File.separator + "consumequeue";
    }

    public static String getStorePathConsumeQueueExt(final String rootDir) {
        return rootDir + File.separator + "consumequeue_ext";
    }

    public static String getStorePathIndex(final String rootDir) {
        return rootDir + File.separator + "index";
    }

    public static String getStoreCheckpoint(final String rootDir) {
        return rootDir + File.separator + "checkpoint";
    }

    public static String getAbortFile(final String rootDir) {
        return rootDir + File.separator + "abort";
    }

    public static String getLockFile(final String rootDir) {
        return rootDir + File.separator + "lock";
    }

    public static String getDelayOffsetStorePath(final String rootDir) {
        return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
    }
}



abort文件

用于判断broker是否正常关闭,在broker启动时创建,关闭时删除,如果broker异常退出,则文件会一直存在,在启动时会走其他流程进行文件修复等



checkpoint文件

public StoreCheckpoint(final String scpPath) throws IOException {
    File file = new File(scpPath);
    MappedFile.ensureDirOK(file.getParent());
    boolean fileExists = file.exists();

    this.randomAccessFile = new RandomAccessFile(file, "rw");
    this.fileChannel = this.randomAccessFile.getChannel();
    this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);

    if (fileExists) {
        log.info("store checkpoint file exists, " + scpPath);
        this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
        this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
        this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);

        log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
        log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
        log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
    } else {
        log.info("store checkpoint file not exists, " + scpPath);
    }
}

文件中有三个时间戳:

  1. physicMsgTimestamp:commitlog文件最后一次落盘时间
  2. logicsMsgTimestamp:consumequeue最后一次落盘时间
  3. indexMsgTimestamp:索引文件最后一次落盘时间



commitlog目录

存放消息实体,所有topic的消息都会通过追加的方式往commitlog文件中写入



consumequeue目录

逻辑队列,只有写入到这里的数据,才能够被消费者消费



index文件目录

存放索引文件,只有写入到这里的数据,才能够通过key或者msgId等进行查询



lock文件

锁文件,在consumer进行rebalance的时候,



config目录



consumerOffset.json

存放消费组消费的进度

{
    "offsetTable":{
        #Topic@ConsumeGroup:queue-0消费到offset=11,接下来要从offset=11开始取数据
        "ZJ_TEST@C_ZJ_GROUP1":{0:11,1:12}
    }
}



版权声明:本文为zhangjun039009原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。