文章目录
0. 概述
本文旨在借助对
ProduceRequest
和
MetadataRequest
两种请求的请求链路的分析,得出kafka客户端网络模型的通用步骤。
1. KafkaProducer数据发送流程
1.1 宏观流程
-
确认数据要发送到的 topic 的 metadata 是否存在,没有则需要获取相应的 metadata;
metadata持有一些重要信息,如topic有哪些分区,分区的leader副本在哪个节点等等。 -
序列化 record 的 key 和 value;
-
获取该 record 要发送到的 partition,partition有以下几种选择策略:
- 指明partition时,直接将指明的值直接作为 partiton 值;
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
- 既没有 partition 值又没有 key 值的情况下,使用round-robin算法,生成随机数与 topic 可用的 partition 总数取余得到 partition 值
-
向 accumulator 中追加 record 数据,accumulator的数据结构如下:
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
。
这里写入record,其实是向
ProducerBatch
持有的
MemoryRecordsBuilder
所持有的
DataOutputStream
对象写入key和value的字节数组
-
如果追加完数据后,对应的RecordBatch已经达到了 batch.size 的大小,换句话说,batch的剩余空间不足以添加下一条 Record,则唤醒
sender
线程发送数据。
1.2 微观流程
2. 元数据更新流程
2.1 流程
2.2 元数据变化动态感知
2.2.1 强制更新
- 初始化SocketChannel失败时
- 发送消息时
-
NetworkClient#handleTimedOutRequests
处理请求超时的时候 -
NetworkClient#handleDisconnections
处理连接断开的时候
强制更新的方式都是将
Metadata
对象的
needUpdate
属性设置为true,然后
sender
线程在poll轮询是检测到该值为true,就会进行强制更新。
Metadata
public final class Metadata {
// 下面两个值用于计算下一次更新时间
private final long refreshBackoffMs;
private final long metadataExpireMs;
// 是否强制更新
private boolean needUpdate;
}
2.2.2 周期更新
根据
refreshBackoffMs
和
metadataExpireMs
计算下一次更新的时间。
3. Kafka客户端通信模型
根据元数据更新请求和发送数据的分析,我们总结出了Kafka客户端网络通信模型的通用步骤如下:
3.1 连接
调用NetworkClient#initiateConnect方法,针对每一个broker node建立一个socketChannel,并存入Selector#channel这个map缓存中,结构为
Map<String, KafkaChannel> channels
3.2 发送
-
将request请求插入NetworkClient#inFlightRequests队列头部,inFlightRequests为每个broker维护了一个双端队列,表示已发送但未收到broker返回的request请求;
-
调用NetworkClient#doSend方法,最终将请求封装成
send
对象赋值刚给
KafkaChannel
的
send
属性,同时在selector上注册channel的写事件;
3.3 读写
首先,调用NetworkClient#poll方法,方法一开始会阻塞在Selector#select方法,select要返回需要满足以下条件之一:
- 有建SelectionKey准备就绪,at least one channel is selected;
- 调用selector的wakeup方法,this selector’s {@link #wakeup wakeup} method is invoked;
- 当前线程被中断,the current thread is interrupted;
- select(timeout)超时,the given timeout period expires.
然后,从channel读取来自broker的返回,将返回封装成NetworkReceive对象,放入selector#completeReceive队列。
最后,将channel持有的send属性对象,通过channel发送出去,发送后存入selector#completedSend队列。
3.4 处理返回
调用NetworkClient#handleCompletedReceives方法处理来自于broker的返回。
遍历selector#completeReceive队列,并根据从NetworkClient#inFlightRequests尾部弹出NetworkReceive对应的request,根据request中的APIkey等信息,解析出NetworkReceive中的有效返回信息。
这里,对于某个channel而言,NetworkClient#inFlightRequests队列和selector#completeReceive队列是有线性关系的。inFlightRequests队列中最早发出的请求,也会优先被broker返回并存入completedReceive队列。
参考:
https://zhuanlan.zhihu.com/p/66192451
https://zhuanlan.zhihu.com/p/66190242
https://zhuanlan.zhihu.com/p/66193894