前言
上篇文章介绍了,消息存储,把所有队列的消息存储在commitLog大文件,然而我们消费的时候是按照队列订阅的,如果要消费队列的一个消息,要去一个大文件(其实是多个物理文件,每个文件1G)去查找,不知道查到啥时候,因为无法定位这个消息在commitLog 哪里;所有才有了为每个队列建立索引,消息者根据queueId 去consumeQueue查找physicsOffset,在根据physicsOffset 找到消息
consumeQueue流程介绍
在broker 中,一个topic 下面的一个queue id 会对应一个consumeQueue,然后一个consumeQueue 会有一个mappedFileQueue,这mappedFileQueue其实就是一个集合,然后里面有一堆的MappedFile ,然后每个MappedFile映射的文件能存储30w条信息,每条占20个字节,大于400多m空间。
服务启动的时候,DefaultMessageStore,就会起一个线程按照consumeQueue 下最大的physicsOffset,去遍历commitLog后面的
上源码
遍历所有consumeQueue 取出最大physicalOffset ,作为入参开始追加consumeQueue 数据;
我们先看获取该文件的可读buffer
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
// 查找对应mappedFile
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
//返回可读的buffer 结果
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
上面是查找该physicalOffset 对应的mappedFile
这个就是nio 的byteBuffer 特性,就是返回physicalOffset后面的所有内容
单线程不停的执行doReput 方法,这个方法先获取下个要读的文件,通过bytebuffer 获取该文件剩余未构建consumeQueue的内容,在遍历读取每个消息,把每个消息观通过观察模式doDispatch
其中有两个观察员一个就是本文章要讲的consumeQueue,还有一个就是下篇文章要讲的BuildIndex,接下来我们看CommitLogDispatcherBuildConsumeQueue
CommitLogDispatcherBuildConsumeQueue
重点是putmessagePositionInfo 方法
重点是存储,而存储核心代码putMessagePositionInfoWrapper#putMessagePositionInfo;直接看这个方法
只存储了3个属性,physicOffset( 拿到这个就可以很快找到消息内容),消息大小,tagsCode;
通过消息体解析出来的队列id * 尺寸(20)就能定位到那个mappFile 了
最后
mappedFile.appendMessage(this.byteBufferIndex.array());
这样就写入内存了,并没有刷新内存也不需要
总结
会有一个单线程不停的读取commitLog 数据,把解析到的physicOffset等消息,根据消息里的queueId去定位(定位逻辑:queueId*20)在哪个consumeQueue ,在进行追加;
消息者就可以通过topic queueId 定位一组consumeQueue,然后通过queue offset 定位到具体某个consumeQueue的physicOffset,根据physicOffset 就能很快定位到commitLog ,拿到对应的消息。