Kafka入门之基础概念和环境搭建

  • Post author:
  • Post category:其他




概念

对kafka概念有所了解的可以直接跳过去看环境搭建。



kafka基本结构

消息生产者Producer、消费者Consumer、Kafka集群。

在这里插入图片描述



kafka基本概念


1.主题


主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅这个主题或主题的某些分区进行消费。


2.消息(Message/Record)


消息是kafka通信基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。


3.分区和副本



分区

:将一组消息归纳为一个主题,而每个主题又分为一个或多个分区(Partition)。每个分区是一个有序队列。每个分区在物理上对应一个文件夹。每条消息追加到相应的分区中,是顺序写磁盘(速度超快)。


副本

:副本分布在集群的不同代理上,提高可用性。分区的副本与日志对象一一对应。


4.Leader副本与Follower副本


为了确保分区的多个副本之间数据的一致性,Kafka会选择该分区的一个副本作为Leader副本,其他副本为Follower副本。这样做简化了数据一致性。


5.偏移量


任何发布的消息都会被直接追加到日志文件尾部(文件后缀为.log)。每条消息在日志文件的位置都会对应一个按序递增的偏移量。偏移量是分区下严格有序的逻辑值,并不表示消息在磁盘上的物理位置。消费者可以通过控制消息偏移量来对消息进行消费。为保证消息被顺序消费,旧版消费者将消费偏移量保存在Zookeeper中,新盘消费者将消息偏移量保存在Kafka内部一个主题当中。当然消费者可以在自己系统中保存偏移量。


6.日志段


日志段是Kafka日志对象分片的最小单位。一个日志段对应磁盘上一个具体日志文件和两个索引文件。两个索引文件分别以”.index”和”.timeindex”作为文件名后缀,分别表示消息偏移量索引文件和消息时间戳索引文件。


7.代理


Kafka集群就是有1个多多个Kafka实例组成的,每个Kafka实例称为代理。每个代理都有唯一的标识id。Kakka代理是无状态的,所以通过ZK来维护集群状态。


8.生产者


负责将消息发送给代理,即就是向Kafka代理发送消息的客户端。


9.消费者和消费组


消费者以拉取方式拉取数据。每个消费者都属于一个特定的消费组。如果不指定消费组,则消费者默认属于默认消费组tet-consumer-group。同一条消息只能被同一个消费组中某个消费者消费,但不同消费组的消费者可同时消费该消息。如果想要广播,每个消费者属于不同消费组就行。单播,让所有消费组在同一个消费组。

1

0.ISR


ISR保存同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的代理节点id。


11.Zookeeper


Kafka利用Zookeeper保存相应元数据信息(代理节点信息、主题信息、分区状态信息等)

加粗样式



环境搭建(我的是linux centos6.x)


1.确保自己安装了jdk

yum install java-1.8.0-openjdk.x86_64

接着配置JAVA环境变量

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

通过

java -version

查看是否安装完成。


2.安装jps

yum install java-1.8.0-openjdk-devel.x86_64

这是用来查看有哪些java进程在运行。 zk启动后可以检查是否启动。


3.下载zookeeper

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz

将该压缩包解压后进入zk目录。

先创建一个data目录,存日志用。

在这里插入图片描述

进入zk目录下的conf目录,里面有zoo_sample.cfg,备份后,修改该文件名称为zoo.cfg。

接着编辑

vim zoo.cfg

文件。dataDir的路径是

刚才你创建的dada全路径,别照抄!


在这里插入图片描述

然后返回上一级目录进入bin目录,启动ZK。

在这里插入图片描述


4.下载kafka

wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

启动kafka,后台运行

bin/kafka-server-start.sh -daemon config/server.properties

你可能想这就完了?发现起不来,或者测试生产者消费者无法通信?接着往下看!



遇到的问题及解决


1.kafka无法申请过大内存


错误信息:

kafka启动:Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memor

解决:


vim /bin/kafka-server-start.sh

更改最大堆内存xmx,堆初始内存xms。

-Xmx256M -Xms128M


2.kafka运行过程中localhost.localdomain: 未知的名称或服务


错误信息:

ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)

java.net.UnknownHostException: localhost.localdomain: localhost.localdomain: 未知的名称或服务

        at java.net.InetAddress.getLocalHost(InetAddress.java:1475)

        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:122)

        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:146)

        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)

解决:

我们需要编辑

config/server.properties




host.name=localhost

改为ip地址也就是

host.name=xxxx.xxxx.xxxx.xxxx



如果生产者创建不了 那么需要查看下面链接

错误信息:

WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

解决:

编辑

config/server.properties


在这里插入图片描述

listeners需要配置,然后根据配置连接topic。

原来如果是:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

改为

bin/kafka-console-producer.sh --broker-list master:9092 --topic test



参考文章


linux启动zk



生产者问题



kafka运行过程中localhost.localdomain: 未知的名称或服务



kafka启动:Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memor



版权声明:本文为qq_27602093原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。