storePathRootDir=/cache1/rocketmq/broker/data
├── abort
├── checkpoint
├── commitlog
│ └── 00000000037580963840
├── config
│ ├── consumerFilter.json
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json
│ ├── subscriptionGroup.json.bak
│ ├── topics.json
│ └── topics.json.bak
├── consumequeue
│ ├── BenchmarkTest
│ ├── BenchmarkTestZj
│ └── ZJ_TEST
├── index
│ └── 20200720163452641
└── lock
public class StorePathConfigHelper {
public static String getStorePathConsumeQueue(final String rootDir) {
return rootDir + File.separator + "consumequeue";
}
public static String getStorePathConsumeQueueExt(final String rootDir) {
return rootDir + File.separator + "consumequeue_ext";
}
public static String getStorePathIndex(final String rootDir) {
return rootDir + File.separator + "index";
}
public static String getStoreCheckpoint(final String rootDir) {
return rootDir + File.separator + "checkpoint";
}
public static String getAbortFile(final String rootDir) {
return rootDir + File.separator + "abort";
}
public static String getLockFile(final String rootDir) {
return rootDir + File.separator + "lock";
}
public static String getDelayOffsetStorePath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
}
}
abort文件
用于判断broker是否正常关闭,在broker启动时创建,关闭时删除,如果broker异常退出,则文件会一直存在,在启动时会走其他流程进行文件修复等
checkpoint文件
public StoreCheckpoint(final String scpPath) throws IOException {
File file = new File(scpPath);
MappedFile.ensureDirOK(file.getParent());
boolean fileExists = file.exists();
this.randomAccessFile = new RandomAccessFile(file, "rw");
this.fileChannel = this.randomAccessFile.getChannel();
this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);
if (fileExists) {
log.info("store checkpoint file exists, " + scpPath);
this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
} else {
log.info("store checkpoint file not exists, " + scpPath);
}
}
文件中有三个时间戳:
- physicMsgTimestamp:commitlog文件最后一次落盘时间
- logicsMsgTimestamp:consumequeue最后一次落盘时间
- indexMsgTimestamp:索引文件最后一次落盘时间
commitlog目录
存放消息实体,所有topic的消息都会通过追加的方式往commitlog文件中写入
consumequeue目录
逻辑队列,只有写入到这里的数据,才能够被消费者消费
index文件目录
存放索引文件,只有写入到这里的数据,才能够通过key或者msgId等进行查询
lock文件
锁文件,在consumer进行rebalance的时候,
config目录
consumerOffset.json
存放消费组消费的进度
{
"offsetTable":{
#Topic@ConsumeGroup:queue-0消费到offset=11,接下来要从offset=11开始取数据
"ZJ_TEST@C_ZJ_GROUP1":{0:11,1:12}
}
}
版权声明:本文为zhangjun039009原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。