一、简介
1.概述:
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:
日志收集系统和消息系统。
Kafka主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展
2.消息系统介绍:
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。
有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式,Kafka就是一种发布-订阅模式。
3.点对点消息传递模式介绍:
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
生产者发送一条消息到queue,只有一个消费者能收到。
4.发布-订阅消息传递模式介绍:
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
二、Kafka的优点
1.解耦:
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2.冗余(副本):
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3.扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
4.灵活性&峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5.可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6.顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
7.缓冲:
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
8.异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
三、Kafka中的术语解释
1.概述:
在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系:
上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
2.broker:
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
3.Topic:
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)类似于数据库的表名。
4.Partition:
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
5.Producer:
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
6.Consumer:
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
7.Consumer Group:
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
8.Leader:
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
9.Follower:
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
四、安装与使用
安装环境:
kafka集群:
10.0.3.32
10.0.3.33
10.0.3.34
10.0.3.35
10.0.3.36
10.0.3.37
zookeeper集群:
10.0.3.32
10.0.3.33
10.0.3.34
1.下载kafka安装包:
http://kafka.apache.org/downloads
2.zookeeper集群:
https://blog.csdn.net/weixin_42789427/article/details/110853868
3.安装kafka:
1.解压安装包
[yundiao@nmhs-pp-nms003037 ~]$ tar zxf /data/packages/kafka_2.13-2.7.0.tgz -C /data/middleware/
2.编写配置文件(除了修改broker.id、listeners其他配置都一样)
[yundiao@nmhs-pp-nms003037 ~]$ cat /data/middleware/kafka/config/server.properties
---------
#需要修改的配置
#每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
broker.id=32
###listeners配置解析https://www.codercto.com/a/68756.html、https://www.cnblogs.com/ElEGenT/p/12891114.html
#broker服务器要监听的地址及端口
#EXTERNAL外部通信地址
#INTERNAL内部通信地址
listeners=EXTERNAL://192.168.106.160:9192,INTERNAL://10.0.3.32:9193
#配置监听者的安全协议
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
###下面两个参数一个用于接收并处理网络请求的线程数,默认为3。其内部实现是采用Selector模型。启动一个线程作为Acceptor来负责建立连接,再配合启动num.network.threads个线程来轮流负责从Sockets里读取请求,一般无需改动,除非上下游并发请求量过大。一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.
#broker 处理消息的最大线程数,配置成CPU核数
num.network.threads=32
#配置成CPU核数的2倍,一般不超过3倍
num.io.threads=64
#kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2
log.dirs=/data/middleware/kafka/logs
##############################################
#一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4
#等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
queued.max.requests =500
#broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
#host.name
#socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400
#socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=8388608
#socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=1048576000
# A comma separated list of directories under which to store log files
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=18
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
#是否允许自动创建topic,若是false,就需要通过命令创建topic
default.replication.factor=3
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
#日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy=delete
#数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除
log.retention.hours=72
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
#topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824
#文件大小检查的周期时间,是否使用log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper集群的地址,可以是多个,多个之间用逗号分割
zookeeper.connect=10.0.3.32:2181,10.0.3.33:2181,10.0.3.34:2181
#ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms=20000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=3000
---------
3.挂在磁盘提供kafka数据使用
[root@nmhs-pp-nms003032 ~]# mkdir /data/middleware/kafka/logs
[root@nmhs-pp-nms003032 ~]# blkid /dev/sdb1
/dev/sdb1: UUID="7c9266f9-92a7-457d-8d50-d94a31a5a652" TYPE="xfs" PARTLABEL="primary" PARTUUID="7e32217e-2628-4240-b8eb-ef2b88811f66"
[root@nmhs-pp-nms003032 ~]# echo "UUID=7c9266f9-92a7-457d-8d50-d94a31a5a652 /data/middleware/kafka/logs xfs defaults 0 0" >> /etc/fstab
[root@nmhs-pp-nms003032 ~]# mount /dev/sdb1 /data/middleware/kafka/logs
[root@nmhs-pp-nms003032 ~]# chown -R yundiao.yundiao /data/middleware/middleware/kafka
4.启动kafka
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-server-start.sh -daemon /data/middleware/kafka/config/server.properties
-daemon 后台启动
5.停止kafka
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-server-stop.sh
6.zookeeper查看kafka集群:
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/apache-zookeeper-3.6.2-bin/bin/zkCli.sh -server 10.0.3.32:2181
[zk: 10.0.3.32:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: 10.0.3.32:2181(CONNECTED) 2] ls /brokers/ids
[32, 33, 34, 35, 36, 37]
[zk: 10.0.3.32:2181(CONNECTED) 3]
4.测试kafka:
1.创建Topic
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.3.32:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
--zookeeper:为zk服务器地址,已逗号分割配置多个
--replication-factor:分区leader副本数,1代表没有副本即分区本身,建议为2
--partitions:分区数
--topic:topic名称
2.查看topic
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.3.32:2181
test
3.查看test topic消息
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.3.32:2181 --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 36 Replicas: 36 Isr: 36
#leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
#Replicas:列出了所有的副本节点,不管节点是否在服务中.
#Lsr:是正在服务中的节点.
4.发布消息
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-console-producer.sh --broker-list 10.0.3.32:9193 --topic test
>1
>2
>3
>4
5.消费消息
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.32:9193 --topic test --from-beginning
1
2
3
4
#from-beginning:每次从头开始消费
6.删除topic
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.3.32:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[yundiao@nmhs-pp-nms003032 ~]$ /data/middleware/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.3.32:2181
__consumer_offsets
转载:kafka web安装
https://www.cnblogs.com/frankdeng/p/9584870.html