Setting Up a Distributed Kafka Cluster



I. Versions

CentOS 7.5

zookeeper-3.4.12

kafka_2.12-1.1.0

II. Installing ZooKeeper

1. Download and extract the ZooKeeper archive

tar -zvxf zookeeper-3.4.12.tar.gz

2. Create the data and log directories

mkdir /usr/local/zookeeper-3.4.12/data
mkdir /usr/local/zookeeper-3.4.12/logs

3. Copy the sample configuration

Enter the conf directory and copy zoo_sample.cfg:

cp zoo_sample.cfg zoo.cfg

4. Enter the data directory and run:

echo 1 > myid

This creates a myid file containing the value 1. Repeat the same step on the other two machines, writing 2 and 3 respectively; every server in the ensemble must have a unique myid.
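As a sketch, the per-node numbering can be simulated locally (the node names and scratch directory below are illustrative; on real deployments each myid is written on its own server):

```shell
# Simulate the three servers' data directories in a scratch location.
# On the real hosts you would instead run `echo N > .../data/myid` on server N.
base=$(mktemp -d)
i=1
for node in server1 server2 server3; do
  mkdir -p "$base/$node/data"
  echo "$i" > "$base/$node/data/myid"   # each node gets a distinct id
  i=$((i + 1))
done
cat "$base/server3/data/myid"   # prints 3
```

The id in each myid must match the `server.N=` entry for that host in zoo.cfg, or the ensemble will not form.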

5. Edit the configuration file

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper-3.4.12/data
dataLogDir=/usr/local/zookeeper-3.4.12/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# ensemble members (replace IP1-IP3 with your hosts);
# 2888 is the follower-to-leader port, 3888 the leader-election port
server.1=IP1:2888:3888
server.2=IP2:2888:3888
server.3=IP3:2888:3888
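For intuition, with tickTime=2000 the two limits above translate into the following wall-clock timeouts (a quick sketch):

```shell
# Effective timeouts implied by tickTime / initLimit / syncLimit in zoo.cfg.
tick_ms=2000
init_limit=10    # ticks a follower may take for its initial sync with the leader
sync_limit=5     # ticks allowed between a request and its acknowledgement
echo "initLimit timeout: $(( tick_ms * init_limit )) ms"   # 20000 ms = 20 s
echo "syncLimit timeout: $(( tick_ms * sync_limit )) ms"   # 10000 ms = 10 s
```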

6. Start ZooKeeper

bin/zkServer.sh start

III. Installing Kafka

1. Download and extract the Kafka archive

tar -zvxf kafka_2.12-1.1.0.tgz

2. Edit the configuration file

Open the Kafka configuration file:

vim config/server.properties

Adjust the settings below:

# Broker id; must be a unique number. Set to 1, 2, 3 on the three servers respectively.
broker.id=1

# listener address advertised to clients; replace with this server's IP
advertised.listeners=PLAINTEXT://<server-IP>:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# number of threads handling network requests
num.network.threads=3

# number of threads performing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


# data storage path (the /tmp default is only suitable for testing)
log.dirs=/tmp/kafka-logs

# default number of partitions per topic
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# In a cluster these must be greater than 1 for availability; set to 3 here
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

############################# Log Flush Policy #############################


# how long to retain log data (hours)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=IP1:2181,IP2:2181,IP3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
#group.initial.rebalance.delay.ms=0

# whether to create topics automatically: false = no, true = yes
auto.create.topics.enable=false

# allow topic deletion (default: false)
delete.topic.enable=true
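As a quick sanity check, the size- and time-related values in the configuration above convert to everyday units like so:

```shell
# Convert the retention and segment settings from server.properties.
retention_hours=168
check_interval_ms=300000
segment_bytes=1073741824
echo "retention:      $(( retention_hours / 24 )) days"        # 7 days
echo "check interval: $(( check_interval_ms / 60000 )) min"    # 5 min
echo "segment size:   $(( segment_bytes / 1024 / 1024 )) MB"   # 1024 MB = 1 GB
```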

3. Start Kafka

bin/kafka-server-start.sh -daemon config/server.properties

IV. Key Parameters

Broker (server-side) configuration


message.max.bytes (default: ~1 MB)

– The largest message the broker will accept. It should be greater than or equal to the producer's max.request.size and less than or equal to the consumer's fetch.message.max.bytes; otherwise the broker can accept messages that consumers are unable to fetch, and consumption stalls.


log.segment.bytes (default: 1 GB)

– The size of each Kafka segment file. Make sure this exceeds the length of any single message. The default is usually fine: a single message rarely approaches 1 GB, since Kafka is a messaging system, not a file system.


replica.fetch.max.bytes (default: 1 MB)

– The largest message the broker can replicate. It must be larger than message.max.bytes; otherwise the broker may accept a message that it cannot replicate, which can lead to data loss.
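Putting the broker-side limits together, a mutually consistent sizing could look like the fragment below (the values are purely illustrative, not recommendations):

```properties
# server.properties -- illustrative values only
message.max.bytes=2097152         # 2 MB: largest message the broker accepts
replica.fetch.max.bytes=3145728   # 3 MB: kept >= message.max.bytes so every accepted message can replicate
log.segment.bytes=1073741824      # 1 GB: far larger than any single message
```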

Consumer configuration


fetch.message.max.bytes (default: 1 MB)

– The largest message a consumer will read. It must be greater than or equal to message.max.bytes. So if you do choose Kafka to carry large messages, account for their impact on the cluster and topics at design time, rather than working around problems after they appear.
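A consumer-side value sized to match a broker that accepts 2 MB messages might look like this (illustrative only):

```properties
# consumer configuration -- illustrative value
# must be >= the broker's message.max.bytes, or large messages cannot be fetched
fetch.message.max.bytes=2097152
```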

Producer configuration


buffer.memory (default: 32 MB)

– The size of the producer's record buffer. As long as the buffer has room, the producer can keep writing, but a buffered record has not necessarily been sent yet.


batch.size (default: 16384 bytes)

– The target size of each batch; a batch becomes eligible for sending once it reaches this size, and the buffer can hold many batches at once.


linger.ms

– How long to wait before sending a batch that has not yet reached batch.size.


max.request.size (default: 1 MB)

– The maximum size of a single produce request; it should be larger than batch.size.
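The four producer settings interact; one illustrative combination (example values, not recommendations) is:

```properties
# producer configuration -- illustrative values
buffer.memory=33554432     # 32 MB of records awaiting transmission
batch.size=16384           # a batch becomes sendable at 16 KB ...
linger.ms=5                # ... or after at most 5 ms of waiting
max.request.size=2097152   # 2 MB: upper bound on one produce request; keep it > batch.size
```

A larger linger.ms trades latency for better batching; buffer.memory only bounds what is queued locally and says nothing about delivery.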

V. Notes

1. To keep all partitions available, configure offsets.topic.replication.factor to at least 3.

2. Disable automatic topic creation, and where possible start client consumers only after every broker in the cluster is up; otherwise partitions and their replicas may not be distributed evenly, which hurts high availability.

3. Once the cluster is running, you can inspect partition and replica placement with:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets



Reposted from: https://www.cnblogs.com/dafanjoy/p/11394474.html