学会 Kafka 分区 其实很简单

  • Post author:
  • Post category:其他


一、副本机制

Kafka在⼀定数量的服务器上对主题分区进⾏复制。

当集群中的⼀个broker宕机后系统可以⾃动故障转移到其他可⽤的副本上,不会造成数据丢失。

创建主题:

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_02 --partitions 2 --replication-factor 3

上面创建主题中的 –replication-factor 3 表示有3个副本,1个Leader + 2个 Follower

  1. 将复制因⼦为1的未复制主题称为复制主题。
  2. 主题的分区是复制的最⼩单元。
  3. 在⾮故障情况下,Kafka中的每个分区都有⼀个Leader副本和零个或多个Follower副本。
  4. 包括Leader副本在内的副本总数构成复制因⼦。
  5. 所有读取和写⼊都由Leader副本负责。
  6. 通常,分区⽐broker多,并且Leader分区在broker之间平均分配


Follower分区像普通的Kafka消费者⼀样,消费来⾃Leader分区的消息,并将其持久化到⾃⼰的⽇志中

允许Follower对⽇志条⽬拉取进⾏

批处理

同步节点定义:

  1. 节点必须能够维持与ZooKeeper的会话(通过ZooKeeper的⼼跳机制)
  2. 对于Follower副本分区,它复制在Leader分区上的写⼊,并且不要延迟太多

Kafka提供的保证是,只要有⾄少⼀个同步副本处于活动状态,提交的消息就不会丢失。


宕机如何恢复

  1. 少部分副本宕机
  2. 当leader宕机了,会从follower选择⼀个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader⾥pull数据。
  3. 全部副本宕机
  4. 当全部副本宕机了有两种恢复⽅式
  5. 等待ISR中的⼀个恢复后,并选它作为leader。(等待时间较⻓,降低可⽤性)
  6. 选择第⼀个恢复的副本作为新的leader,⽆论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)

二、Leader 选举

先看一张图片,在这张图片中:

  • 分区P1的Leader是0,ISR是0和1
  • 分区P2的Leader是2,ISR是1和2
  • 分区P3的Leader是1,ISR是0,1,2。

⽣产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和Leader保持同步。

对于P1,如果0宕机会发⽣什么?

Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发⽣故障的时候,就需要进⾏分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。


如何选举

如果某个分区所在的服务器除了问题,不可⽤,kafka会从该分区的其他的副本中选择⼀个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。

只有那些跟Leader保持同步的Follower才应该被选作新的Leader。

Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是⼀些分区的副本。

只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。

如果这个集合有增减,kafka会更新zookeeper上的记录。

如果某个分区的Leader不可⽤,Kafka就会从ISR集合中选择⼀个副本作为新的Leader。

显然通过ISR,kafka需要的

冗余度较低

,可以容忍的失败数⽐较⾼。

假设某个topic有N+1个副本,kafka可以容忍N个服务器不可⽤。


为什么不⽤少数服从多数的⽅法

少数服从多数是⼀种⽐较常⻅的⼀致性算发和Leader选举法。

它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;

选择Leader时也是从超过半数的同步的副本中选择。

这种算法需要较⾼的冗余度,跟Kafka⽐起来,浪费资源。

譬如只允许⼀台机器失败,需要有三个副本;⽽如果只容忍两台机器失败,则需要五个副本。

⽽kafka的ISR集合⽅法,分别只需要两个和三个副本。


如果所有的ISR副本都失败了怎么办

此时有两种⽅法可选:

  1. 等待ISR集合中的副本复活,
  2. 选择任何⼀个⽴即可⽤的副本,⽽这个副本不⼀定是在ISR集合中。需要设置 unclean.leader.election.enable=true

这两种⽅法各有利弊,实际⽣产中按需选择。

如果要等待ISR副本复活,虽然可以保证⼀致性,但可能需要很⻓时间。⽽如果选择⽴即可⽤的副本,则很可能该副本并不⼀致。


总结

Kafka中Leader分区选举,通过维护⼀个动态变化的ISR集合来实现,⼀旦Leader分区丢掉,则从ISR中随机挑选⼀个副本做新的Leader分区。

如果ISR中的副本都丢失了,则:

  1. 可以等待ISR中的副本任何⼀个恢复,接着对外提供服务,需要时间等待。
  2. 从OSR中选出⼀个副本做Leader副本,此时会造成数据丢失

三、分区重新分配

向已经部署好的Kafka集群⾥⾯添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置⽂件,然后把⾥⾯的broker id修改成全局唯⼀的,最后启动这个节点即可将它加⼊到现有Kafka集群中。

问题:新添加的Kafka节点并不会⾃动地分配数据,⽆法分担集群的负载,除⾮我们新建⼀个topic。

需要⼿动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的⼯具来重新分布某个topic的分区。

在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:


创建主题

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_01 --partitions 5 --replication-factor 1


查看主题信息

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_01Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 4 Leader: 0 Replicas: 0 Isr: 0


在node11搭建Kafka

安装 JDK、Kafka,这里不需要安装Zookeeper

修改 Kafka 的配置config/server.properties

broker.id=1 zookeeper.connect=node1:2181/myKafka

启动 kafka

[root@node11 ~]# kafka-server-start.sh /usr/src/kafka_2.12-1.0.2/config/server.properties

注意观察node11上节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否⼀致,如果是,证明node11和node1在同⼀个集群中。

node11启动的Cluster ID:

zookeeper节点上的Cluster ID:

在node1上查看zookeeper的节点信息(node11的节点已经加⼊集群了):


现在我们在现有集群的基础上再添加⼀个Kafka节点,然后使⽤Kafka⾃带的

kafka-reassign-partitions.sh ⼯具来重新分布分区。该⼯具有三种使⽤模式:

  1. generate模式,给定需要重新分配的Topic,⾃动⽣成reassign plan&



版权声明:本文为Firstlucky77原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。