消息消费的过程:
Name Server是Broker的管理者,Broker会自动将自身的信息主动发送给Name Server。
生产消息:首先Producer首先向Name Server询问Broker的地址,然后将消息传递给Broker。
消费消息:Consumer消费消息则也需要指定需要消费的信息,而这个信息则存储在Broker中,所以Consumer消费之前要先向Name Server获取存储消息的Broker的地址,从而进行消费。
MQ的两种消费方式:
(1)Pull方式
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;
(2)Push方式
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。
集群的工作流程:
1.启动NameServer,NameServer启动后监听端口,等待Broker、Producer、Consumer连接,形成一个路由控制中心。
2.Broker启动,跟所有的NameServer保持长连接,定时发送心跳包;心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息;注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
3.收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在那些Broker上,也可以在发送消息时自动创建Topic。
4.Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发送消息。
5.Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在那些Broker,然后直接跟Broker建立连接通道,开始消费消息。
集群搭建部署(双主双从):
1.服务器环境
序号 | IP | 角色 | 架构模式 |
1 | 192.168.1.10 | nameserver、brokerserver | Master1、Slave1 |
2 | 192.168.1.12 | nameserver、brokerserver | Master2、Slave2 |
2.配置环境变量
vim /etc/profile
#rocketmq
ROCKETMQ_HOME=/data/rocketmq/
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
export NAMESRV_ADDR=192.168.1.10:9876
source /etc/profile
3.创建消息存储路径
mkdir -p /data/rocketmq/store/
mkdir /data/rocketmq/store/{commitlog,consumequeue,index}
4.修改broker配置文件
如上图所示:
2m-2s-async:双主双从异步复制
2m-2s-sync:双主双从同步复制
2m-noslave:双主模式
192.168.1.10:
1)master
vim /data/rocketmq/conf/2m-2s-sync/broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分隔
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/data/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/data/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
2)Slave
vim /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10920
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/data/rocketmq/store/broker-b-s
#commitLog 存储路径
storePathCommitLog=/data/rocketmq/store/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/rocketmq/store/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=/data/rocketmq/store/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=/data/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/data/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
192.168.1.12:
1)master
vim /root/rocketmq/conf/2m-2s-sync/broker-b.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分隔
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/root/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/root/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#flushDiskType=ASYNC_FLUSH
2)Slave
vim /root/rocketmq/conf/2m-2s-syncbroker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=local10:9876;local12:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10920
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketmq/store/broker-b-s
#commitLog 存储路径
storePathCommitLog=/root/rocketmq/store/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketmq/store/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketmq/store/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/root/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
5.修改启动脚本文件
1)runserver.sh
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2)runbroker.sh
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
3)tools.sh
vim tools.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
6.服务启动
1)启动NameServer集群
分别在1.10和1.12上启动NameServer
nohup sh mqnamesrv &
2)启动Broker集群
- 在1.10上启动master1和slave2
master1:
cd /data/rocketmq/bin
启动master1:
nohup sh mqbroker -c /data/rocketmq/conf/2m-2s-sync/broker-a.properties &
启动slave2:
nohup sh mqbroker -c /data/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
验证:
[root@local10 bin]# jps
1281 NamesrvStartup
1316 BrokerStartup
1430 BrokerStartup
1503 Jps
- 在1.12上启动master2和slave1
master2:
启动master2:
nohup sh mqbroker -c /root/rocketmq/conf/2m-2s-sync/broker-b.properties &
启动slave1:
nohup sh mqbroker -c /root/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
验证:
[root@local12 bin]# jps
5492 Jps
5366 NamesrvStartup
5481 BrokerStartup
5403 BrokerStartup
7.查看日志
#查看nameserver日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
#查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
8.mqadmin管理工具
8.1 使用方式
进入RocketMQ安装位置,在bin目录下执行“ ./mqadmin {command} {args}”
8.2 命令介绍
参考网址:
https://www.cnblogs.com/zyguo/p/4962425.html
9.集群监控平台搭建
RocketMQ有一个对其扩展的开源项目
incubator-rocketmq-externals
,这个项目中有一个子模块叫rocketmq-console,这个便是管理控制台项目了,先将
incubator-rocketmq-externals
拉到本地,因为我们需要自己对rocketmq-console进行编译打包运行。
github地址: https://github.com/apache/rocketmq-externals
1)下载此项目到本地并解压
unzip rocketmq-externals-master.zip
2)配置namesrv集群地址:
vim /data/rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties
rocketmq.config.namesrvAddr=192.168.1.10:9876;192.168.1.12:9876
3)进入到rocketmq-console文件夹,打包
mvn clean package -Dmaven.test.skip=true
打包完成后会生成一个target的目录,其中会有一个名叫rocketmq-console-ng-1.0.0.jar的可执行jar文件。
4)上传jar文件到1.10上,运行
cp rocketmq-console-ng-1.0.0.jar /data/package
jar -jar rocketmq-console-ng-1.0.0.jar
10.启动成功后,我们就可以通过浏览器访问
http://localhost:8080
进入控制台界面