Kafka源码剖析:Kafka客户端网络通信模型

  • Post author:
  • Post category:其他




0. 概述

本文旨在借助对

ProduceRequest



MetadataRequest

两种请求的请求链路的分析,得出kafka客户端网络模型的通用步骤。



1. KafkaProducer数据发送流程



1.1 宏观流程

在这里插入图片描述

  1. 确认数据要发送到的 topic 的 metadata 是否存在,没有则需要获取相应的 metadata;

    metadata持有一些重要信息,如topic有哪些分区,分区的leader副本在哪个节点等等。

  2. 序列化 record 的 key 和 value;

  3. 获取该 record 要发送到的 partition,partition有以下几种选择策略:

    • 指明partition时,直接将指明的值直接作为 partiton 值;
    • 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
    • 既没有 partition 值又没有 key 值的情况下,使用round-robin算法,生成随机数与 topic 可用的 partition 总数取余得到 partition 值
  4. 向 accumulator 中追加 record 数据,accumulator的数据结构如下:

    ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches



    这里写入record,其实是向

    ProducerBatch

    持有的

    MemoryRecordsBuilder

    所持有的

    DataOutputStream

    对象写入key和value的字节数组

    在这里插入图片描述

  5. 如果追加完数据后,对应的RecordBatch已经达到了 batch.size 的大小,换句话说,batch的剩余空间不足以添加下一条 Record,则唤醒

    sender

    线程发送数据。



1.2 微观流程

在这里插入图片描述

在这里插入图片描述



2. 元数据更新流程



2.1 流程

在这里插入图片描述

在这里插入图片描述



2.2 元数据变化动态感知



2.2.1 强制更新
  1. 初始化SocketChannel失败时
  2. 发送消息时

  3. NetworkClient#handleTimedOutRequests

    处理请求超时的时候

  4. NetworkClient#handleDisconnections

    处理连接断开的时候

强制更新的方式都是将

Metadata

对象的

needUpdate

属性设置为true,然后

sender

线程在poll轮询是检测到该值为true,就会进行强制更新。

Metadata

public final class Metadata {
	// 下面两个值用于计算下一次更新时间
    private final long refreshBackoffMs;
    private final long metadataExpireMs;
    // 是否强制更新
    private boolean needUpdate;
}


2.2.2 周期更新

根据

refreshBackoffMs



metadataExpireMs

计算下一次更新的时间。



3. Kafka客户端通信模型

根据元数据更新请求和发送数据的分析,我们总结出了Kafka客户端网络通信模型的通用步骤如下:



3.1 连接

调用NetworkClient#initiateConnect方法,针对每一个broker node建立一个socketChannel,并存入Selector#channel这个map缓存中,结构为

Map<String, KafkaChannel> channels



3.2 发送

  • 将request请求插入NetworkClient#inFlightRequests队列头部,inFlightRequests为每个broker维护了一个双端队列,表示已发送但未收到broker返回的request请求;

  • 调用NetworkClient#doSend方法,最终将请求封装成

    send

    对象赋值刚给

    KafkaChannel



    send

    属性,同时在selector上注册channel的写事件;



3.3 读写

首先,调用NetworkClient#poll方法,方法一开始会阻塞在Selector#select方法,select要返回需要满足以下条件之一:

  • 有建SelectionKey准备就绪,at least one channel is selected;
  • 调用selector的wakeup方法,this selector’s {@link #wakeup wakeup} method is invoked;
  • 当前线程被中断,the current thread is interrupted;
  • select(timeout)超时,the given timeout period expires.

然后,从channel读取来自broker的返回,将返回封装成NetworkReceive对象,放入selector#completeReceive队列。

最后,将channel持有的send属性对象,通过channel发送出去,发送后存入selector#completedSend队列。



3.4 处理返回

调用NetworkClient#handleCompletedReceives方法处理来自于broker的返回。

遍历selector#completeReceive队列,并根据从NetworkClient#inFlightRequests尾部弹出NetworkReceive对应的request,根据request中的APIkey等信息,解析出NetworkReceive中的有效返回信息。

这里,对于某个channel而言,NetworkClient#inFlightRequests队列和selector#completeReceive队列是有线性关系的。inFlightRequests队列中最早发出的请求,也会优先被broker返回并存入completedReceive队列。

参考:

https://zhuanlan.zhihu.com/p/66192451

https://zhuanlan.zhihu.com/p/66190242

https://zhuanlan.zhihu.com/p/66193894



版权声明:本文为a1240466196原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。