Kafka
一、官网
https://kafka.apache.org/intro
一、概述
Apache Kafka是一个分布式流处理平台
具备以下三种特性:
- 发布和订阅流式记录。类似于消息队列或者企业消息系统
- 存储流式数据,并且有较好的容错
- 流式数据处理
应用场景:
- 构造实时流数据管道,可以在系统或应用之间可靠的获取数据。(相当于MQ)
- 构建实时流式应用程序,对这些流数据进行转换或者影响。(流处理)
Kafka有四个核心API
- Producer API —Producer 消息的生产者
- Consumer API —- —Producer 消息的消费者
- Stream API
- Connector API
Consumer Group
包含一个或者多个Consumer,同一个消息只会被同一个消费组中一个消费者实例处理
不同的消费组可以同时消费同一条数据
Kafka架构图
-
topic:
主题表示某一种类别的消息(订阅的一个服务)Kafka流中记录都是以Topic作为一个分类去管理。 -
partition:
分区, 一种类别的消息会根据自定义的分片数量进行hash分片,没一个分片中会按照时间戳的顺序维持 一个队列。 -
Record:
Kafka流中的一则消息一般包含key、value、time stamp
Kafka是通过Topic对Record做分类管理,通常一个Topic会有0~n个订阅者订阅。一旦在订阅的Topic下产生新的Record,所有的订阅者都将消费该消息。对于每一个Topic底层Kafka集群都会以分区的形式存储
每一个分区会为每一条Record维护一份时序该时序是不可变更。每当使用API向Topic发送/写入一则记录的时候分区会会为该记录记录一次offset偏移量,该偏移量可以理解成为该Record在当前分区的唯一标示。
Kafka将按照’log.retention.hours’配置保留所有发布过来的Record无论该Record是否被消费。Record记录在规定的时间内 ”log.retention.hours”配置时间(默认1周168小时)是可以被消费者消费的,一旦操过规定时间内没有消费者消费Kafka中的消息,该消息会被系统自动清除。由于kafka的分布式磁盘存储,将数据长时间保留在磁盘中是没有太大问题的。
事实上kafka会记录每一个消费者消费的消息的offset偏移量,该偏移量是由客户端所控制的。正常情况下Consumer消费一则消息后对应该消费的者的偏移量会自动加1,但事实上消费者可以自由的控制offset偏移量来实现消息的重复消费,各个消费者之间相互不影响。
所有消息在Kafka集群中按照分区存储目的是为了针对海量数据的存储,一个Topic表示用户所关心的一类消息,Topic可以有很多的分区,因此无需担心海量日志的存储,其次每个分区运行在独立的机器上(broker),因此也会从某种意义上提升系统的并行度。
-
**leader:**Leader负责分区所有读取和写入的节点。每个节点随机选择的分区中的领导者。
Kafka消息的日志数据是通过分区的形式分布在各个server之上,每个服务器负责接收和处理用户请求。每一个分区都会在集群节点上设置副本,每个副本都有一个Leader和0~n个follower。Leader负责接收读写请求,所有的Follower负责备份主机的数据。如果leader宕机了,其中的Follower会自动的做选举,选举出Leader。每一台服务器在集群中都会扮演着其中某一个Partition数据的Leader也会扮演着其他Partition数据的Follower的角色。
生产者
消息的生产者负责生产消息到他们选择的Topic中去,由生产者决定消息存储到哪一个Topic中的哪一个Partition当中,可以在生产者那边定义一个负载均衡用于实现消息发送的负载均衡策略,如何实现负载均衡后续章节再进行讨论。
消费者
消费者是通过group的概念对自己做标记。因为不同的Group消费者之间相互独立。如果隶属于一个Group中有多个消费者,则消息会在多个消费者之间负载均衡的形式被发送给组内的消费者;如果不是在一个组内的消费者之间是广播形式发送。
-
Consumer Group 消费组
同一组内的消费者之间是通过对分区日志数据做负载均衡,kafka通过均分原则(“fair share”)将分区的数据分配给每一组内的消费者(一般消费者的数目不应该操过分区的数目)。如果一个组内的有消费者退出了,kafka会自动的原本属于该消费者的分区在动态分分配给组内其他消费者。
二、环境搭建
准备工作
- 安装JDK,并配置JAVA_HOME
- 同步时钟
- 安装ZooKeeper
- 安装HDFS集群
-
下载环境压缩包
-
安装配置
[root@nodeX ~]# tar -zxvf kafka_2.11-0.11.0.0.tgz -C /usr
-
修改配置文件
[root@nodeX ~]# vi /usr/kafka_2.11-0.11.0.0/config/server.properties ############################# Server Basics ############################# broker.id=0|1|2 delete.topic.enable=true ############################# Socket Server Settings ############################# listeners=PLAINTEXT://node1|node2|node3:9092 ############################# Log Basics ############################# log.dirs=/usr/kafka-logs ############################# Log Retention Policy ############################# log.retention.hours=168 ############################# Zookeeper ############################# zookeeper.connect=node1:2181,node2:2181,node3:2181
三、常用指令
服务相关
启动Kafka服务
[root@nodeX kafka_2.11-0.11.0.0]# bin/kafka-server-start.sh -daemon config/server.properties
停止Kafka服务
[root@nodeX kafka_2.11-0.11.0.0]# bin/kafka-server-stop.sh
Topic操作
创建Topic
[root@node1 kafka_2.11-0.11.0.0]# bin/kafka-topics.sh --create --zookeeper
node1:2181,node2:2181,node3:2181 --topic baizhi --partitions 3 --replication-factor 3
Created topic "baizhi".
展示所有Topic
[root@node2 kafka_2.11-0.11.0.0]# bin/kafka-topics.sh --list --zookeeper
node1:2181,node2:2181,node3:2181
baizhi
删除Topic
[root@node2 kafka_2.11-0.11.0.0]# bin/kafka-topics.sh --delete --topic zpark --zookeeper
node1:2181
Topic zpark is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
查看Topic详情
[root@node2 kafka_2.11-0.11.0.0]# bin/kafka-topics.sh --describe --zookeeper
node1:2181,node2:2181,node3:2181 --topic baizhi
Topic:baizhi PartitionCount:3 ReplicationFactor:3 Configs:
Topic: baizhi Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: baizhi Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: baizhi Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Leader负责指定分区所有读取和写入的节点。每个节点将是一部分随机选择的分区中的领导者。
Replicas是此分区日志的节点列表集合,不管这些节点是否是领导者或者只是还活着(不在in-sync状态)。
ISR是一组”in-sync” 节点列表的集合。这个列表包括目前活着并跟leader保持同步的replicas,Isr 是Repli