Kafka Learning Notes: Broker (Commissioning and Decommissioning Nodes, Kafka Replicas, File Storage)





Kafka Learning Notes: Broker



Broker Workflow



Kafka Information Stored in ZooKeeper


  1. Start ZooKeeper

    [root@hadoop103 bin] ./zkCli.sh 
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] 
    
    # view info
    [zk: localhost:2181(CONNECTED) 2] ls /
    [kafka, zookeeper]
    

    Browsing with PrettyZoo:

    1. /kafka/brokers/ids [0,1,2]: records which brokers are registered
    2. /kafka/brokers/topics/first/partitions/0/state: records which replica is the leader and which replicas are available
    3. /kafka/cluster/id
    4. /kafka/consumers: offsets are now stored in a Kafka internal topic instead
    5. /kafka/controller: assists in electing which broker acts as controller and thus who becomes leader
    6. /kafka/config
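    The same znodes can also be inspected directly from zkCli.sh; a few examples (paths assume the /kafka chroot shown above):

      # list the registered broker ids
      ls /kafka/brokers/ids
      # inspect one broker's registration (host, port, endpoints)
      get /kafka/brokers/ids/0
      # see which broker currently holds the controller role
      get /kafka/controller
      # leader/ISR state for partition 0 of topic "first"
      get /kafka/brokers/topics/first/partitions/0/state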

  2. Overall broker workflow

    The ZooKeeper cluster sits on top, the Kafka cluster below. Every Kafka broker registers itself in ZooKeeper on startup, and the brokers then elect a controller. The controller elects each partition's leader (precondition: the replica is alive and in the ISR; among candidates, the one listed earliest in the AR (the set of all replicas of a partition) wins) and uploads the leader/ISR information to ZooKeeper. Producers send data to the leader broker, and followers actively sync from the leader.
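    For example, the per-partition state the controller writes to ZooKeeper looks roughly like this (field values are illustrative):

      # get /kafka/brokers/topics/first/partitions/0/state
      {"controller_epoch":7,"leader":1,"version":1,"leader_epoch":3,"isr":[1,0,2]}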



Production Experience: Commissioning and Decommissioning Nodes


  1. Commission a new node

    [Shortcut]: clone hadoop103 directly. After configuring the hostname and IP and connecting with Xshell, [Important!!] delete the datas and logs directories under the Kafka installation directory (they contain hadoop103's data, including its broker identity, so without deleting them the two nodes cannot be online at the same time).
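    A sketch of the remaining steps on the cloned machine (the new broker id 3 is an assumption for illustration):

      # give the clone a unique broker id
      vim config/server.properties
      broker.id=3
      # remove the data copied from hadoop103
      rm -rf datas/ logs/
      # start the new broker; it registers itself in ZooKeeper and joins the cluster
      bin/kafka-server-start.sh -daemon config/server.properties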


  2. Perform the load-balancing operation

    • Create the list of topics to balance

      [root@hadoop103 kafka] vim topics-to-move.json
      
      {
         "topics":[
             {"topic":"first"}
         ],
         "version":1
      }
      
    • Generate a rebalance plan

      [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
      

      Error: The input string is not a valid JSON

      kafka.admin.AdminOperationException: The input string is not a valid JSON
          at kafka.admin.ReassignPartitionsCommand$.parseTopicsData(ReassignPartitionsCommand.scala:1272)
          at kafka.admin.ReassignPartitionsCommand$.parseGenerateAssignmentArgs(ReassignPartitionsCommand.scala:722)
          at kafka.admin.ReassignPartitionsCommand$.generateAssignment(ReassignPartitionsCommand.scala:583)
          at kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:236)
          at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:206)
          at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)


      Fix:

      After commissioning the new node, generating the rebalance plan failed with "The input string is not a valid JSON". According to the references, the input file must contain only the list of topics whose partitions are to be planned, in the format shown above; after correcting the file, the plan generated successfully.

    Create the replica reassignment plan

    [root@hadoop103 kafka] vim increase-replication-factor.json
    # paste the proposed plan generated above into this file
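    For reference, the --generate step prints both the current assignment and a proposed one; the proposed-plan JSON that goes into the file looks roughly like this (broker ids and partition count illustrative, for topic first across brokers 0-3):

      {"version":1,"partitions":[
        {"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},
        {"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},
        {"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}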
    

    Execute the reassignment plan

    [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute
    

    Verify the plan

    [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --verify
    

  3. Decommission an old node

    Perform the same load-balancing operation, this time generating a plan whose --broker-list excludes the retiring broker, so that its data is moved to the remaining brokers first.

    Then stop hadoop104 (see the sketch below).
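    A minimal sketch, assuming broker 3 on hadoop104 is the one being retired:

      # generate a plan that excludes broker 3
      bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
      # save the proposed plan into increase-replication-factor.json, then execute and verify as above
      bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute
      bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --verify
      # once no partitions remain on broker 3, shut it down
      bin/kafka-server-stop.sh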



Kafka Replicas


  1. Replica basics

    • Purpose: improve data reliability.
    • Default is 1 replica; production environments usually configure 2 (more replicas reduce efficiency and add disk and network pressure).
    • Replicas are divided into a leader and followers. Unlike Hadoop, where the three replicas are equivalent, all reads and writes target the leader only.
    • AR (Assigned Replicas) = ISR + OSR.
    • ISR: the set of followers that stay in sync with the leader; a follower that has not sent a fetch request for too long (30 s by default) is evicted. OSR: replicas that have fallen too far behind. The broker configs behind these defaults are sketched below.
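    A sketch of those settings in server.properties (the values shown are the defaults):

      default.replication.factor=1     # replicas for newly created topics; raise to 2-3 in production
      replica.lag.time.max.ms=30000    # a follower lagging longer than this is removed from the ISR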

  2. Leader election process

    The controller watches for broker membership changes (when a Kafka node dies, its id disappears from ZooKeeper) and elects a new leader for the affected partitions.

    # create a topic with 4 partitions and 4 replicas
    [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic qiyou01 --partitions 4 --replication-factor 4
    
    # describe it
    [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic qiyou01
    Topic: qiyou01	TopicId: n6ufjP7GRSKngztIX-aLog	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
    	Topic: qiyou01	Partition: 0	Leader: 3	Replicas: 3,1,0,2	Isr: 3,1,0,2
    	Topic: qiyou01	Partition: 1	Leader: 1	Replicas: 1,0,2,3	Isr: 1,0,2,3
    	Topic: qiyou01	Partition: 2	Leader: 0	Replicas: 0,2,3,1	Isr: 0,2,3,1
    	Topic: qiyou01	Partition: 3	Leader: 2	Replicas: 2,3,1,0	Isr: 2,3,1,0
    

    Stop hadoop104

    [root@hadoop104 kafka] bin/kafka-server-stop.sh 
    [root@hadoop104 kafka] jps
    11105 Jps
    3506 Kafka
    [root@hadoop104 kafka] jps
    11136 Jps
    

    Describe again from hadoop101

    [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic qiyou01
    Topic: qiyou01	TopicId: n6ufjP7GRSKngztIX-aLog	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
    	Topic: qiyou01	Partition: 0	Leader: 1	Replicas: 3,1,0,2	Isr: 1,0,2
    	Topic: qiyou01	Partition: 1	Leader: 1	Replicas: 1,0,2,3	Isr: 1,0,2
    	Topic: qiyou01	Partition: 2	Leader: 0	Replicas: 0,2,3,1	Isr: 0,2,1
    	Topic: qiyou01	Partition: 3	Leader: 2	Replicas: 2,3,1,0	Isr: 2,1,0
    

    Observation: whichever broker is stopped, leadership shifts to the next surviving replica in AR order, as shown above. After restoring hadoop104, it rejoins the ISR at the end of the list.


  3. Follower failure handling

    • LEO (Log End Offset): the last offset of each replica plus one, i.e. the next offset to be written.


    • HW (High Watermark): the minimum LEO among all replicas; consumers can only see messages below the HW.


    • When a follower fails, it is evicted from the ISR while the leader and the remaining followers keep accepting data. After recovering, the follower reads the HW it last recorded, truncates the part of its log file above that HW, and syncs from the leader starting there. Once its LEO has caught up to the partition's HW, it has caught up with the leader and rejoins the ISR. A worked example with concrete numbers follows this list.

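    A concrete reading of these definitions (offsets are illustrative):

      leader    log holds offsets 0..7  ->  LEO = 8
      follower1 log holds offsets 0..5  ->  LEO = 6
      follower2 log holds offsets 0..4  ->  LEO = 5
      HW = min(8, 6, 5) = 5             ->  consumers may read offsets 0..4 only
      # if follower2 fails and recovers, it truncates to offset 5 (its saved HW)
      # and re-fetches from the leader until its LEO reaches the partition HW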


  4. Leader failure

    A new leader is elected from the ISR. To keep the replicas consistent, the remaining followers truncate the part of their log files above the HW and then sync from the new leader. [Important!!]: this guarantees consistency between replicas, not that no data is lost.


  5. Partition replica assignment

    Replicas are distributed across brokers as evenly as possible.


  6. Automatic leader partition balancing

    Automatic balancing is enabled by default (true).

    The imbalance threshold defaults to 10%; when a broker exceeds it, the controller triggers leader rebalancing.

    The check interval defaults to 300 s; this is how often leader balance is examined. The corresponding broker configs are sketched below.
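    These map to the following server.properties settings (defaults shown):

      auto.leader.rebalance.enable=true
      leader.imbalance.per.broker.percentage=10
      leader.imbalance.check.interval.seconds=300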


  7. Increasing the replication factor

    If a topic's importance grows, consider adding replicas. The replication factor cannot be raised with kafka-topics.sh --alter; it has to be done through a manual reassignment plan:

    • Create the topic

    • Hand-write a replica reassignment plan

    • Execute the plan (a sketch follows this list)
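    A minimal sketch, assuming topic first has 3 partitions and the factor is raised to 3 on brokers 0/1/2 (ids illustrative):

      vim increase-replication-factor.json
      {"version":1,"partitions":[
        {"topic":"first","partition":0,"replicas":[0,1,2]},
        {"topic":"first","partition":1,"replicas":[1,2,0]},
        {"topic":"first","partition":2,"replicas":[2,0,1]}]}

      bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute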



File Storage


  1. File storage mechanism



    A topic is divided into multiple partitions, and a partition into multiple segments. Each segment consists of a .log data file, an .index offset-index file, a .timeindex timestamp-index file, and a few auxiliary files.

    Partition directory naming: topic name + partition number, e.g. first-0.

  2. [root@hadoop101 kafka] cd datas/
    [root@hadoop101 datas] ll
    total 16
    -rw-r--r--. 1 root root   0 Apr 22 15:29 cleaner-offset-checkpoint
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-1
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-10
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-13
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-16
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-19
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-22
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-25
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-28
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-31
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-34
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-37
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-4
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-40
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-43
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-46
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-49
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-7
    drwxr-xr-x. 2 root root 204 Apr 25 16:23 first-0
    -rw-r--r--. 1 root root   4 Apr 25 20:01 log-start-offset-checkpoint
    -rw-r--r--. 1 root root  88 Apr 25 16:23 meta.properties
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-0
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-1
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-2
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-3
    -rw-r--r--. 1 root root 468 Apr 25 20:01 recovery-point-offset-checkpoint
    -rw-r--r--. 1 root root 468 Apr 25 20:02 replication-offset-checkpoint
    [root@hadoop101 datas] cd first-0/
    [root@hadoop101 first-0] ll
    total 20
    -rw-r--r--. 1 root root 10485760 Apr 25 16:23 00000000000000000000.index
    -rw-r--r--. 1 root root      438 Apr 22 16:15 00000000000000000000.log
    -rw-r--r--. 1 root root 10485756 Apr 25 16:23 00000000000000000000.timeindex
    -rw-r--r--. 1 root root       10 Apr 23 00:06 00000000000000000006.snapshot
    -rw-r--r--. 1 root root        8 Apr 25 16:23 leader-epoch-checkpoint
    -rw-r--r--. 1 root root       43 Apr 22 15:57 partition.metadata
    

    Viewing the .log file directly shows unreadable binary; use Kafka's dump tool:

    [root@hadoop101 first-0] kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
    Dumping ./00000000000000000000.index
    offset: 0 position: 0
    
    # dump 000....log
    [root@hadoop101 first-0] kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
    Dumping ./00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1650614951198 size: 73 magic: 2 compresscodec: none crc: 1028480518 isvalid: true
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 73 CreateTime: 1650615093214 size: 75 magic: 2 compresscodec: none crc: 1935213717 isvalid: true
    baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 148 CreateTime: 1650615105104 size: 73 magic: 2 compresscodec: none crc: 2561039741 isvalid: true
    baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 221 CreateTime: 1650615108653 size: 73 magic: 2 compresscodec: none crc: 2173917041 isvalid: true
    baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 294 CreateTime: 1650615341320 size: 72 magic: 2 compresscodec: none crc: 1877835761 isvalid: true
    baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 366 CreateTime: 1650615344197 size: 72 magic: 2 compresscodec: none crc: 3585889592 isvalid: true
    

    [Important!!]: how it works: .index is a sparse index; for every 4 KB of data written to the .log file, one entry is appended to .index. The index file stores relative offsets (relative to the segment's base offset), which keeps its footprint small.
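    The 4 KB interval is a broker setting, and offset lookup then works as sketched below (segment names and offsets are illustrative):

      # server.properties (default)
      log.index.interval.bytes=4096

      # locating offset 600 in partition first-0:
      # 1. pick the segment with the largest base offset <= 600, e.g. 00000000000000000522.log
      # 2. binary-search 00000000000000000522.index for the largest relative offset <= 600-522=78
      # 3. scan 00000000000000000522.log forward from the recorded position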


    File cleanup policy
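    The broker settings that govern segment cleanup, with their defaults, for reference:

      log.cleanup.policy=delete              # "delete" expires old segments; "compact" keeps the latest value per key
      log.retention.hours=168                # keep data for 7 days
      log.retention.check.interval.ms=300000 # how often expired segments are checked for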



Copyright notice: this is an original article by qq_53320067, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.