1、Kafka特点与定位
1.1、Kafka简介
- kafka的设计定位是“分布式事件流平台”,适用于所有产生事件或者基于事件触发运转的业务场景,而消息的发布订阅可以理解为事件流的一种场景,所以kafka在设计上更多的定位于底层能力、吞吐量、稳定性的保证
- kafka是一个分布式流式处理平台,它以高吞吐、可持久化、可水平拓展、支持流数据处理等多种特性而被广泛使用,这与它所扮演的三个角色密不可分:
消息系统。Kafka 和传统的消息系统都具备消息解耦、冗余存储、流量削峰、缓冲、异步通信、拓展性、可恢复性等功能,与此同时,kafka还提供了回溯消费等拓展功能。
存储系统。kafka把消息持久化到磁盘,相比于内存存储的系统,能更有效地防止消息丢失。而且基于kafka的多副本和磁盘持久化机制,kafka具备较强的容错性。
流式处理平台。kafka不仅可以作为有效的消息和存储系统,还提供了完整的流式处理类库,如窗口、连接、变换、聚合等操作,且已经与多个平台(如spark)整合。
1.2、典型应用场景
-
异步通信
- 将业务中属于非核心或不重要的流程部分,使用消息异步通知的方式发给目标系统,这样主业务流程无需同步等待其他系统的处理结果,从而达到系统快速响应的目的。如事件中心。
-
错峰流控与流量削峰
- 在大型系统中,上下游系统处理能力存在差异,处理能力高的上游系统突发流量可能会对处理能力低的某些下游系统造成冲击,需要提高系统的可用性的同时降低系统实现的复杂性。消息队列提供亿级消息堆积能力,当流量洪峰突然来袭时,可以通过队列服务堆积信息,在下游系统有能力处理消息的时候再处理,避免下游订阅系统因突发流量崩溃。
-
日志同步。Kafka设计初衷就是为了应对大量日志传输场景
- 日志大数据,日志计算及检索支持(全文检索与全链路追踪)
- binlog同步
- 事件回溯
- 事件流处理
1.3、几种MQ横向对比
2、Kafka整体架构
一个典型的 kafka 集群如上图所示,包括zookeeper、broker、producer和consumer
- zookeeper,负责管理集群的元数据;
- broker,负责接收producer发出的消息并保存,并提供给consumer 消费;
- producer,消息生产者,发送消息到 broker;
- consumer,消息消费者,从 broker 拿消息进行消费。
2.1、broker & zookeeper
broker 是 kafka 集群中的枢纽,消息的流转和保存等操作就是在这里发生的,也是前述分布式和横向拓展的基础。broker 可以理解为一个物理概念,大多数情况下可以将每一台 kafka 服务器都可以看成是一个 broker,无论是 producer还是consumer,消息的最终交互就是在 broker 上进行的,且每一个 broker 都有自己唯一的 id,通过这个 id 来相互区分。不同的 broker 之间组成一个分布式集群,zookeeper负责维护集群的元数据信息。
2.2、 topic & partition & replica
对于消息系统来说,真正与生产者和消费者打交道的是消息队列,而无关具体的broker,为此 kafka 引入了 topic/partition/replica 这三个概念。
topic(主题)
- 可以理解为消息队列的名称,是一个逻辑概念。producer和consumer直接的交互对象就是topic,producer往 topic 中发送消息,consumer 从 topic 中消费消息。
partition(分区)
- topic 的物理分区,存在于 broker 上。单个 topic 可以对应多个 partition,这也就实现了分布式消息系统的上层基础,producer 往 topic 中发送消息的时候是给多个 partition 中的一个发送消息,同一个 topic 的不同 partition 上存储的往往是不同的消息,这样当发送消息量增加的时候就可以分布到不同的 partition 上,由此实现 producer 端的高吞吐量。另一方面,consumer 消费消息的时候也可以从不同的 partition 上获取消息,多个消费者还会进行负载均衡,实现消费端的高吞吐。
replica(副本)
- 为了防止 partition 单点故障而存在。通常一个 partition 可以对应多个 replica,其中包括一个 leader 和若干 follower,leader 负责读写请求,follower 负责同步 leader 上数据。前述说消息发到 topic 的某个 partition 上,其实不够准确,应该说发送到 topic 的某个 partition 的 leader replica 上(消费者也同理),当 leader replica 宕机的时候 follower replica 就会提升为 leader replica ,保障整个 kafka 集群继续有效运转。这些信息也会保存在 zookeeper 中。
至此,kafka集群 server 部分就算分析完了,简单总结一下。
-
zookeeper
- 维护整个集群的元数据信息
-
broker
- kafka 集群的物理基础
-
topic
- 消息队列的抽象,kafka server 与 client(生产者和消费者)交互的逻辑概念
-
partition
- topic 的物理分区,存在于 broker 上,且单个 topic 可以对应多个 partition。broker 保证了物理分布式的基础,partition 则保证了逻辑分布式的基础,进而实现高吞吐。
-
replica
- 防止 partition 单点故障而存在的多个 partition 副本,以一主多从的形式存在。leader 副本负责而处理客户端的请求,follower 副本同步 leader 副本的数据。
3、生产者-Producer
从组成结构上来看,Producer 由主线程和 Sender 线程协调完成消息发送,且主线程发出的消息并不会立刻经由 Sender 线程发出,而会暂存于消息累加器 RecordAccumulator 中,Sender 线程从RecordAccumulator 中获取消息并组装成请求 Request 通过 Selector 发送至 kafka 集群,以下逐一分析。
为了确保消息发送成功,Producer 也需要等待kafka服务端的响应,目前存在3个等级的响应确认:
-
acks=0
- 发送后不论成功与否都立即返回,可靠性较低;
-
acks=1
- 发送后等待 leader 副本写入即返回,可靠性较高,在leader写入消息之后还没来得及同步到follower 时leader宕机,此时会发生消息丢失;
-
ack=-1/(ALL)
- 发送后需要等待全部副本写入才返回,可靠性极高,但是对应的吞吐量会严重下降。而且如果此时 follower 宕机了或者网络阻塞导致迟迟拿不到响应,会进一步影响性能。
为了解决这个问题,Kafka 引入 ISR(In-Sync Replicas)的概念。ISR 是一个副本集合,集合中包括所有与 leader 副本满足一定程度同步的副本(也包括leader副本自身),处于 ISR 集合中的副本如果落后leader副本太多或者宕机就会被剔除 ISR 集合,同样集合外的副本如果跟 leader 副本同步了也会加入 ISR 集合,当前同步程度是通过副本上一次与 leader 副本同步的时间与当前时间差值来决定的。
当ack=-1时并不要求所有 follower 副本都完成同步,只需要 ISR 集合中的副本完成同步即可,因为 ISR 集合中的副本可以认为与 leader 副本保持较好的同步,而且 ISR 会根据 follower 副本状态进行更新,因此整个同步过程并不会花费很长时间。此外,为了防止 ISR 变成只有 leader 副本的极端情况,可以设置 ISR 集合最少元素个数来进一步保证可靠性。
- 除了用于ack确认,leader副本宕机之后也会默认选择 ISR 集合中的副本来作为新的leader,以此保障消息尽可能少的丢失,在后续 consumer 消费的过程中也会起到相关作用。
4、消费者-comsumer
4.1、Push or Pull
首先强调一点,Push 和 Pull 没有绝对的优劣之分(如果有,那就应该不存在这个分歧了),这里只是客观的分析 Pull 方式的优劣。
- Pull 优势
- 缓解消息系统服务器端压力
首先 Pull 是客户端主动发起的一个动作,相比于 kafka server 去 Push,Consumer 主动 Pull 能缓解 server 的压力,尤其是当有大量客户端的时候,如果采取 Push 的方式,那么 server 需要维护每一个 Consumer 对应订阅的 topic 消息及其进度,并按需给对应的 Consumer 推送相应的消息,这明显不利于高吞吐。
2. Consumer 按需消费
Consumer 主动 Pull 也可以结合自身的性能决定消费快慢,如果 Consumer 性能强大,完全可以拉去更多的消息进行消费,反过来当 Consumer 性能受限的时候也可以少量的拉取消息。而如果采取 server Push 的方式,那么无论 Consumer 性能如何都必须被动接收(当然这一点并不绝对,RabbitMQ 可以控制每次最多给 Consumer 发送一条消息,待 Consumer 消费完成再发下一条消息)。尤其是当 Consumer 性能足够强的时候, server 端无法推送足够多的消息,此时也就难以实现高吞吐量。而采取 Pull 的方式,除了 server 带宽等因素,Consumer 完全可以通过横向拓展实现高吞吐量。
- Pull 劣势
- 时效性
既然是 Consumer 端主动 Pull 的方式来获取消息,那么消费到达的时刻并不一定跟 Consumer Pull 的时刻吻合,此时消息到达 kafka server 端之后必须要等待 Consumer 的下一次 Pull 才能被消费。举一个极端点的例子,Consumer 每1秒 Pull 一次消息,那么即便是这 1s 的前 100ms 到达的消息也只能等到这一秒到达之后才会被消费,时效性大打折扣(当然,实际上 Consumer 并不会这么僵硬)。与此相反的是 RabbitMQ,RabbitMQ 采取的是 Push 的方式,而且消息的延时是在 10^-6s 级别(参考:
Kafka专题讲解(一):Kafka的特点与定位
)。
从上面的分析可以看出,Pull 方式是 Kafka 实现高吞吐的一个重要基础,但 Pull 方式并非有绝对的优势,时效性就是一个劣势。kafka 的设计初衷是为了实现高吞吐,为此选择 Pull 的方式也是可以理解的。
4.2、Consumer & Consumer Group
kafka,由于消息被持久化了,因此理论上可以反复消费而无需重复发送,而 kafka 也正是这么做的。对于 kafka 中的每一个 topic,最基本的消费单元是消费者组,而不是单个的消费者。消费者组可以理解为一组消费者集合,且拥有相同的 group.id,不同消费者组之间对于 topic 的消费互不影响。如果需要重新消费,以一个全新的 consumer group 身份消费即可。前面所说的 RabbitMQ 案例可以理解为 kafka 的一个特例,即所有的 Consumer 都在同一个默认的全局 group 里面,所以当他们消费的时候是共同消费一个队列上的所有消息。
4.3、分配策略&再平衡
前面说到 kafka 中消费者组是最基本的消费单元,且一个消费者组中可以包含多个消费者。当他们去消费某个 topic 时,其实是去消费该 topic 对应的多个 partition 上面的消息,那么多个 Consumer 与多个 partition 该如何对应呢?目前 kafka 提供了以下几种分配策略。
(1)RangeAssignor
RangeAssignor也是默认的分配策略,其中最核心就是 assign 方法,其代码如下所示
- 第一个方框创建分配结果,是一个 map 结构,key 表征 Consumer,value 就是该 Consumer 分配的 partition。
- 第二个方框中提取 topic 及其对应的 Consumer,通常为了提升消费能力,一个 topic 允许对应多个 Consumer,并对该 topic 对应的 Consumer 进行了排序。
- 第三个方框中就是分配的算法核心了。partition 总数除以 Consumer 总数表示每个 Consumer 分配到的 partition 数量,但是不一定会整除,所以接下来用 partition 总数对 Consumer 总数取模,余数就表示对应的前面若干个 Consumer 会额外分配 1 个partition。为了更直观的说明分配过程,假设总共有 8 个partition,5个Consumer,那么每个Consumer会分配 8/5=1 个partition,另外 8%5=3 表示前面的3个Consumer会额外分配1个partition,所以最终分配结果是前面3个Consumer各分配2个partition,后面2个Consumer各分配1个partition。
-
第四个方框中表示具体的partition分配过程。根据第三步计算出每个Consumer 需要分配的 partition 数量,分配的时候将相邻的若干个 partition 分配给对应的 Consumer。仍然沿用第三步中的例子,分配结果如下:
- Consumer01 对应 partition01 & partition02
- Consumer02 对应 partition03 & partition04
- Consumer03 对应 partition05 & partition06
- Consumer04 对应 partition07
- Consumer05 对应 partition08
(2)RoundRobinAssignor
- 第一个方框同样是创建分配结果,不再赘述。
- 第二个方框遍历所有 topic 的所有分区,为了更直观的表述,假设 consumer01 订阅了 topic01 和 topic02,consumer02 订阅了 topic03,那么这里遍历的对象就是 topic01、topic02、topic03 对应的所有 partition。
- 第三个方框进行具体 partition 的分配,并且分配原则很简单,只要当前 partition 对应的 topic 被当前正在遍历到的 Consumer 订阅了,那么该 partition 就归属到该 Consumer 对应的消费范畴,否则尝试下一个 Consumer。
(3)StickyAssingor
该分配策略的主要目的是在分配尽可能平均的基础上,让分区的分配还尽可能与上次分配的结果相同,这样就减少了消费者与partition之间建立消费连接等过程的消耗.
4.4、位移(offset)提交 & 指定位移消费
a. 为什么需要位移(offset)
消费者组的概念结合kafka 持久化可以实现重复消费;另一方面,分配策略也给出了消费者组中消费者与 partition 之间的对应关系,现在每一个消费者就可以开始消费对应 partition 上面的消息了。
回到之前的问题,给一个 partition 分配多个 Consumer,虽然对于 kafka 来说理论可行,但是仍然选择单一 partition 对应单个 Consumer,即便如此,这个 Consumer 本身仍然需要记录自身的消费进度,否则下一次拉取消息或者再均衡后将该 partition 分配给消费者组中其余的 Consumer 时就不知从何处开始消费了。
为此,kafka 中的每条消息都有一个
offset(位移)
信息,用于表示消息在 partition 中的位置(该信息是相对于 partition 而言的,不同 partition 可以存在相同 offset 的消息)。
消费者在消费消息的过程中会获取尚未消费的消息,这就要求记录上一次消费的 offset,并持久化保存;与此同时,消费者消费完消息后需要更新该消费 offset。在旧版本(0.9之前)消费者客户端中,offset 信息是保存在 zookeeper 中,但是由于读写性能以及羊群效应等原因,在新版本(0.9及其之后的版本)中 offset 信息保存在内部主题__consumer_offsets 中,充分借助了 kafka 自身的高吞吐和持久化等特性。offset 信息不仅跟 topic/partition 有关,还跟 consumer_group 有关,这也跟之前的消费者组是消费的基本单元是保持一致的。
b. offset 提交方式
consumer 提交 offset 的方式分为自动提交和手动提交。自动提交完全依赖 kafka 客户端来完成,按照指定的时间间隔自动提交消费者拉取到的最大消息的 offset,简单易用,但是可能会造成消费丢失和重复消费的问题,如消息拉取之后尚未消费或者消费失败,但是 offset 已经提交,那么就会出现消费丢失的情况,另一方面消息已经消费了但是在提交 offset 之前消费者宕机,此时就会出现重复消费的问题。
4.5、LEO & HW
消费者消费消息的时候虽然是从 partition 中获取消息,但是根据最初 kafka 的结构图可知 partition 又包括 leader replica 和若干 follower replica,其中 leader replica 负责接收用户请求,而 follower replica 则负责同步日志以及在 leader replica 宕机时升级为 leader replica 继续对外提供服务,而且为了防止 leader replica 和 follower replica 之间同步相差太多还引入了 ISR 的概念。那么可能会出现这样一个情景:
当生产者发送消息给 leader replica 时,假设该消息前一条消息的 offset 记为x,那么该消息的 offset=x+1,consumer 消费了该条消息,并提交 offset=x+1,此时 follower replica 尚未完成同步。如果此时 leader replica 宕机,当 consumer 继续消费的时候就只能从 x+2 开始消费,那么无论发送到之前为 follower replica(现在升级为 leader replica)的下一条消息是什么都无法消费,因为这条消息的 offset=x+1,而消费者只能从offset=x+2 的地方开始消费。
为了解决这个问题,kafka 引入了 LEO 和 HW 两个概念。
- LEO,log end offset,partition replica 中最后一条消息的偏移量;
- HW,high watermark,ISR 中最小的 LEO,如下图所示。
消费者只能消费 HW 及其之前的消息,HW 之后的消息无法消费,这样就保证了消费者消费的消息已经被 ISR 中的所有副本同步了,也就避免了前述消费到尚未同步的消息并导致后续消费丢失的问题。
如果 leader replica 宕机,系统默认会从 ISR 集合中的其他 replica 中选择一个提升为 leader replica,此时其余的 follower replica 会截取 HW 之后的log,并根据最新的 leader replica 复制 HW 之后的日志,这样保证了 HW 之前的日志一定是同步的,即便再次消费也仍然是相同的消息,但是对于 HW 之后的消息则可能会丢失,因为新的 leader replica 可能并不具备全部的消息。