RocketMQ集群搭建

  • Post author:
  • Post category:其他



消息消费的过程:


Name Server是Broker的管理者,Broker会自动将自身的信息主动发送给Name Server。


生产消息:首先Producer首先向Name Server询问Broker的地址,然后将消息传递给Broker。


消费消息:Consumer消费消息则也需要指定需要消费的信息,而这个信息则存储在Broker中,所以Consumer消费之前要先向Name Server获取存储消息的Broker的地址,从而进行消费。


MQ的两种消费方式:


(1)Pull方式


由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;


(2)Push方式


由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。

集群的工作流程:

1.启动NameServer,NameServer启动后监听端口,等待Broker、Producer、Consumer连接,形成一个路由控制中心。

2.Broker启动,跟所有的NameServer保持长连接,定时发送心跳包;心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息;注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

3.收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在那些Broker上,也可以在发送消息时自动创建Topic。

4.Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发送消息。

5.Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在那些Broker,然后直接跟Broker建立连接通道,开始消费消息。

集群搭建部署(双主双从):


1.服务器环境

序号 IP 角色 架构模式
1 192.168.1.10 nameserver、brokerserver Master1、Slave1
2 192.168.1.12 nameserver、brokerserver Master2、Slave2


2.配置环境变量

vim /etc/profile

#rocketmq
ROCKETMQ_HOME=/data/rocketmq/
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
export NAMESRV_ADDR=192.168.1.10:9876

source /etc/profile


3.创建消息存储路径

mkdir -p /data/rocketmq/store/

mkdir /data/rocketmq/store/{commitlog,consumequeue,index}


4.修改broker配置文件

如上图所示:


2m-2s-async:双主双从异步复制


2m-2s-sync:双主双从同步复制


2m-noslave:双主模式


192.168.1.10:

1)master

vim /data/rocketmq/conf/2m-2s-sync/broker-a.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分隔
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/data/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/data/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2)Slave

vim /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10920
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store/broker-b-s
#commitLog 存储路径
storePathCommitLog=/data/rocketmq/store/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=/data/rocketmq/store/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

192.168.1.12:

1)master

vim /root/rocketmq/conf/2m-2s-sync/broker-b.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分隔
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/root/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/root/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#flushDiskType=ASYNC_FLUSH

2)Slave

vim /root/rocketmq/conf/2m-2s-syncbroker-a-s.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10920
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketmq/store/broker-b-s
#commitLog 存储路径
storePathCommitLog=/root/rocketmq/store/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketmq/store/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketmq/store/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/root/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128


5.修改启动脚本文件

1)runserver.sh

vim runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

2)runbroker.sh

vim runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

3)tools.sh

vim tools.sh 

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"


6.服务启动

1)启动NameServer集群

分别在1.10和1.12上启动NameServer

nohup sh mqnamesrv &

2)启动Broker集群

  • 在1.10上启动master1和slave2

master1:

cd /data/rocketmq/bin

启动master1:
nohup sh mqbroker -c /data/rocketmq/conf/2m-2s-sync/broker-a.properties &

启动slave2:
nohup sh mqbroker -c /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties &

验证:
[root@local10 bin]# jps
1281 NamesrvStartup
1316 BrokerStartup
1430 BrokerStartup
1503 Jps
  • 在1.12上启动master2和slave1

master2:

启动master2:
nohup sh mqbroker -c /root/rocketmq/conf/2m-2s-sync/broker-b.properties &

启动slave1:
nohup sh mqbroker -c /root/rocketmq/conf/2m-2s-sync/broker-a-s.properties &

验证:
[root@local12 bin]# jps
5492 Jps
5366 NamesrvStartup
5481 BrokerStartup
5403 BrokerStartup


7.查看日志

#查看nameserver日志
tail -500f ~/logs/rocketmqlogs/namesrv.log

#查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log


8.mqadmin管理工具

8.1 使用方式

进入RocketMQ安装位置,在bin目录下执行“ ./mqadmin {command} {args}”

8.2 命令介绍

参考网址:

https://www.cnblogs.com/zyguo/p/4962425.html


9.集群监控平台搭建

RocketMQ有一个对其扩展的开源项目

incubator-rocketmq-externals

,这个项目中有一个子模块叫rocketmq-console,这个便是管理控制台项目了,先将

incubator-rocketmq-externals

拉到本地,因为我们需要自己对rocketmq-console进行编译打包运行。


github地址:   https://github.com/apache/rocketmq-externals

1)下载此项目到本地并解压

unzip rocketmq-externals-master.zip

2)配置namesrv集群地址:

vim /data/rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties

rocketmq.config.namesrvAddr=192.168.1.10:9876;192.168.1.12:9876

3)进入到rocketmq-console文件夹,打包

mvn clean package -Dmaven.test.skip=true

打包完成后会生成一个target的目录,其中会有一个名叫rocketmq-console-ng-1.0.0.jar的可执行jar文件。

4)上传jar文件到1.10上,运行

cp rocketmq-console-ng-1.0.0.jar /data/package
jar -jar rocketmq-console-ng-1.0.0.jar


10.启动成功后,我们就可以通过浏览器访问

http://localhost:8080

进入控制台界面



版权声明:本文为weixin_45203131原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。