文章目录
    
    
    
    0. 概述
   
    本文旨在借助对
    
     ProduceRequest
    
    和
    
     MetadataRequest
    
    两种请求的请求链路的分析,得出kafka客户端网络模型的通用步骤。
   
    
    
    1. KafkaProducer数据发送流程
   
    
    
    1.1 宏观流程
   
     
   
- 
确认数据要发送到的 topic 的 metadata 是否存在,没有则需要获取相应的 metadata; 
 
 metadata持有一些重要信息,如topic有哪些分区,分区的leader副本在哪个节点等等。
- 
序列化 record 的 key 和 value; 
- 
获取该 record 要发送到的 partition,partition有以下几种选择策略: - 指明partition时,直接将指明的值直接作为 partiton 值;
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
- 既没有 partition 值又没有 key 值的情况下,使用round-robin算法,生成随机数与 topic 可用的 partition 总数取余得到 partition 值
 
- 
向 accumulator 中追加 record 数据,accumulator的数据结构如下: 
 
 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
 
 。
 
 这里写入record,其实是向
 
 ProducerBatch
 
 持有的
 
 MemoryRecordsBuilder
 
 所持有的
 
 DataOutputStream
 
 对象写入key和value的字节数组
 
   
- 
如果追加完数据后,对应的RecordBatch已经达到了 batch.size 的大小,换句话说,batch的剩余空间不足以添加下一条 Record,则唤醒 
 
 sender
 
 线程发送数据。
    
    
    1.2 微观流程
   
    
    
     
   
    
    
    2. 元数据更新流程
   
    
    
    2.1 流程
   
     
   
     
   
    
    
    2.2 元数据变化动态感知
   
    
    
    2.2.1 强制更新
   
- 初始化SocketChannel失败时
- 发送消息时
- 
     
 NetworkClient#handleTimedOutRequests
 
 处理请求超时的时候
- 
     
 NetworkClient#handleDisconnections
 
 处理连接断开的时候
    强制更新的方式都是将
    
     Metadata
    
    对象的
    
     needUpdate
    
    属性设置为true,然后
    
     sender
    
    线程在poll轮询是检测到该值为true,就会进行强制更新。
   
Metadata
public final class Metadata {
	// 下面两个值用于计算下一次更新时间
    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    // 是否强制更新
    private boolean needUpdate;
}
    
    
    2.2.2 周期更新
   
    根据
    
     refreshBackoffMs
    
    和
    
     metadataExpireMs
    
    计算下一次更新的时间。
   
    
    
    3. Kafka客户端通信模型
   
根据元数据更新请求和发送数据的分析,我们总结出了Kafka客户端网络通信模型的通用步骤如下:
    
    
    3.1 连接
   
    调用NetworkClient#initiateConnect方法,针对每一个broker node建立一个socketChannel,并存入Selector#channel这个map缓存中,结构为
    
     Map<String, KafkaChannel> channels
    
    
    
    3.2 发送
   
- 
将request请求插入NetworkClient#inFlightRequests队列头部,inFlightRequests为每个broker维护了一个双端队列,表示已发送但未收到broker返回的request请求; 
- 
调用NetworkClient#doSend方法,最终将请求封装成 
 
 send
 
 对象赋值刚给
 
 KafkaChannel
 
 的
 
 send
 
 属性,同时在selector上注册channel的写事件;
    
    
    3.3 读写
   
首先,调用NetworkClient#poll方法,方法一开始会阻塞在Selector#select方法,select要返回需要满足以下条件之一:
- 有建SelectionKey准备就绪,at least one channel is selected;
- 调用selector的wakeup方法,this selector’s {@link #wakeup wakeup} method is invoked;
- 当前线程被中断,the current thread is interrupted;
- select(timeout)超时,the given timeout period expires.
然后,从channel读取来自broker的返回,将返回封装成NetworkReceive对象,放入selector#completeReceive队列。
最后,将channel持有的send属性对象,通过channel发送出去,发送后存入selector#completedSend队列。
    
    
    3.4 处理返回
   
调用NetworkClient#handleCompletedReceives方法处理来自于broker的返回。
遍历selector#completeReceive队列,并根据从NetworkClient#inFlightRequests尾部弹出NetworkReceive对应的request,根据request中的APIkey等信息,解析出NetworkReceive中的有效返回信息。
这里,对于某个channel而言,NetworkClient#inFlightRequests队列和selector#completeReceive队列是有线性关系的。inFlightRequests队列中最早发出的请求,也会优先被broker返回并存入completedReceive队列。
参考:
https://zhuanlan.zhihu.com/p/66192451
https://zhuanlan.zhihu.com/p/66190242
https://zhuanlan.zhihu.com/p/66193894
 
