源码分析RocketMQ之broker-消息ACK机制

  • Post author:
  • Post category:其他


Consumer启动后,会请求broker,broker接收请求网络

RemotingCommand 消息传输过程中对数据内容的封装结构,主要属性

code:请求操作码,应答方根据不同的请求码进行不同的业务处理

language:请求方实现的语言

version:版本

opaque:相当于requestId,在同一个连接上的不同请求标识码,与响应消息中相对应

flag:区分普通RPC还是oneWay RPC标识

remark:传输自定义文本信息

extFields:请求自定义扩展信息

customHeader:自定义头,不进行序列化

body:消息主体的二进制字节数据内容

NettyRemotingServer 服务端Netty start如下处理 ServerBootstrap

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {


@Override

public void initChannel(SocketChannel ch) throws Exception {


ch.pipeline()

.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)

.addLast(defaultEventExecutorGroup,

encoder,

new NettyDecoder(),

//心跳

new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),

connectionManageHandler,//连接管理

serverHandler//真正干活的是这个线程池,有连接来的时候boss处理,channel有事件发生的时候 selecter将事件读出来,然后交给EventExecutor来处理

);

}

});

NettyRemotingClient 客户端Netty start如下处理 Bootstrap

bootstrap.handler(new ChannelInitializer<SocketChannel>() {


@Override

public void initChannel(SocketChannel ch) throws Exception {


ChannelPipeline pipeline = ch.pipeline();

if (nettyClientConfig.isUseTLS()) {


if (null != sslContext) {


pipeline.addFirst(defaultEventExecutorGroup, “sslHandler”, sslContext.newHandler(ch.alloc()));

log.info(“Prepend SSL handler”);

} else {


log.warn(“Connections are insecure as SSLContext is null!”);

}

}

pipeline.addLast(

//使用这个线程组来处理processor

defaultEventExecutorGroup,

new NettyEncoder(),

new NettyDecoder(),

//心跳

new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),

new NettyConnectManageHandler(),//链接管理

new NettyClientHandler());//处理消息的接收

}

});

NettyEncoder和NettyDecoder 是对RemotingCommand的编码解码,解析消息头,解析出消息内容,粘包拆包

BrokerController的构造方法创建PullMessageProcessor 通过remotingServer注册PULL_MESSAGE 消息拉取请求

processRequest负责对consumer的消费请求进行处理

1、创建响应头RemotingCommand

2、读取消息响应头

3、解码拉取消息的请求头拓展字段

4、权限、订阅关系、topic是否存在、队列有效性、消费者组是否存在、广播模式检查

5、DefaultMessageStore.getMessage

入参:

group:消费者组名

topic:消息主题

queueId:消息队列ID

offset:拉取的消费队列偏移量

maxMsgNums:一次拉取消息条数,默认32条

messageFilter:消息过滤器

1、获取commitLog文件中的最大偏移量

2、根据topic、queueId获取消息消费队列ConsumeQueue

3、获取此消费队列最小偏移量和最大偏移量

4、根据需要拉取消息的偏移量与队列最大最小偏移量对比

1、maxOffset为0表示队列中没有消息

2、offset < minOffset 需要的offset比队列中的最新offset小,设置下次拉取最小的offset,

3、offset == maxOffset 表示超出一个 返回状态:OFFSET_OVERFLOW_ONE,offset 保持不变

4、offset > maxOffset 表示超出,OFFSET_OVERFLOW_BADLY,计算下一次拉取拉取的开始偏移量

5、offset 大于minOffset 并小于maxOffset,正常情况

5、从consuequeue 中从当前 offset 到当前 consueque 中最大可读消息内存

6、获取最大过滤消息字节数,max(16000, maxMsgNums * 20) 最低16000 因为有消息过滤机制,可能不满足

指定拉取的消息数,尽量满足返回这么多条消息

7、循环拉取消息

1、bufferConsumeQueue消费队列中获取commitLog的偏移量、消息总长度、tag hashcode

2、如果拉取到的消息偏移量小于下一个要拉取的物理偏移量,跳过该条消息

3、检查offsetPy,拉取的偏移量是否在磁盘上,maxOffsetPy-offsetPy > memory 的话,memory = 物理内存 * 这个比例

说明 offsetPy 这个偏移量的消息已经从内存中置换到磁盘中了

4、判断本次拉取任务是否完成

5、执行消息过滤

6、从commitLog文件中根据偏移量和消息大小读取消息 commitLog.getMessage

7、将读取到的消息结果添加到结果集

8、设置下次拉取任务开始nextBeginOffset offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)

9、如果是超过了常驻内存设置下次拉取从从服务器拉取

6、根据 response.getCode() 分别作出不同处理

1、SUCCESS 更新broker统计信息

isTransferMsgByHeap:是否在heap内存汇总直接转换,间获取到的byteBuffer在heap内存汇总转换为字节数组

普通IO进行转换,将bytebuffer转成字节数组,设置到response中去,使用堆内存来处理的

通过channel写入fileRegion

2、PULL_NOT_FOUND

1、brokerAllowSuspend:构建消息拉取是的拉取标记,默认true

2、是否支持长轮训,不支持设置1000ms作为下次拉取消息的等待时间

3、创建pullRequest 提交给ullRequestHoldService 线程去调度,触发消息拉取

4、设置response = null,此次调用不会向客户端输出任何字节,客户端网络请求客户端的读事件不会触发,不会触发对响应结果的处理,处于等待状态

7、判断它这次拉取消息请求里面带没带着消费offset,如果带着的话,就找到ConsumerOffset组件,然后更新一下消费offset

消息消费者除了定时任务5更新一下消费进度,还可以通过拉取消息的时候带着消费offset,进行消费进度的更新



版权声明:本文为liangshf520原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。