rocketMq-消息存储-consumeQueue

  • Post author:
  • Post category:其他


前言

上篇文章介绍了,消息存储,把所有队列的消息存储在commitLog大文件,然而我们消费的时候是按照队列订阅的,如果要消费队列的一个消息,要去一个大文件(其实是多个物理文件,每个文件1G)去查找,不知道查到啥时候,因为无法定位这个消息在commitLog 哪里;所有才有了为每个队列建立索引,消息者根据queueId 去consumeQueue查找physicsOffset,在根据physicsOffset 找到消息

consumeQueue流程介绍

在broker 中,一个topic 下面的一个queue id 会对应一个consumeQueue,然后一个consumeQueue 会有一个mappedFileQueue,这mappedFileQueue其实就是一个集合,然后里面有一堆的MappedFile ,然后每个MappedFile映射的文件能存储30w条信息,每条占20个字节,大于400多m空间。

服务启动的时候,DefaultMessageStore,就会起一个线程按照consumeQueue 下最大的physicsOffset,去遍历commitLog后面的

上源码

遍历所有consumeQueue 取出最大physicalOffset ,作为入参开始追加consumeQueue 数据;


我们先看获取该文件的可读buffer

public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // 查找对应mappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            //返回可读的buffer 结果
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
            return result;
        }

上面是查找该physicalOffset 对应的mappedFile

这个就是nio 的byteBuffer 特性,就是返回physicalOffset后面的所有内容

单线程不停的执行doReput 方法,这个方法先获取下个要读的文件,通过bytebuffer 获取该文件剩余未构建consumeQueue的内容,在遍历读取每个消息,把每个消息观通过观察模式doDispatch

其中有两个观察员一个就是本文章要讲的consumeQueue,还有一个就是下篇文章要讲的BuildIndex,接下来我们看CommitLogDispatcherBuildConsumeQueue

CommitLogDispatcherBuildConsumeQueue

重点是putmessagePositionInfo 方法

重点是存储,而存储核心代码putMessagePositionInfoWrapper#putMessagePositionInfo;直接看这个方法

只存储了3个属性,physicOffset( 拿到这个就可以很快找到消息内容),消息大小,tagsCode;

通过消息体解析出来的队列id * 尺寸(20)就能定位到那个mappFile 了

最后


mappedFile.appendMessage(this.byteBufferIndex.array());


这样就写入内存了,并没有刷新内存也不需要


总结

会有一个单线程不停的读取commitLog 数据,把解析到的physicOffset等消息,根据消息里的queueId去定位(定位逻辑:queueId*20)在哪个consumeQueue ,在进行追加;

消息者就可以通过topic queueId 定位一组consumeQueue,然后通过queue offset 定位到具体某个consumeQueue的physicOffset,根据physicOffset 就能很快定位到commitLog ,拿到对应的消息。



版权声明:本文为qq_30834791原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。