学前小故事
有一天,产品跑来说:“我们要做一个用户注册功能,需要在用户注册成功后给用户发一封成功邮件。”
小明(攻城狮):“好,需求很明确了。” 不就提供一个注册接口,保存用户信息,同时发起邮件调用,待邮件发送成功后,返回用户操作成功。没一会功夫,代码就写完了。验证功能没问题后,就发布上线了。
线上正常运行了一段时间,产品匆匆地跑来说:“你做的功能不行啊,运营反馈注册操作响应太慢,已经有好多用户流失了。”
小明听得一身冷汗,赶紧回去改。他发现,原先的以单线程同步阻塞的方式进行邮件发送,确实存在问题。这次,他利用了 JAVA 多线程的特性,另起线程进行邮件发送,主线程直接返回保存结果。测试通过后,赶紧发布上线。小明心想,这下总没问题了吧。
没过多久,产品又跑来了,他说:“现在,注册操作响应是快多了。但是又有新的问题了,有用户反应,邮件收不到。能否在发送邮件时,保存一下发送的结果,对于发送失败的,进行补发。”
小明一听,哎,又得熬夜加班了。产品看他一脸苦逼的样子,忙说:“邮件服务这块,别的团队都已经做好了,你不用再自己搞了,直接用他们的服务。”
小明赶紧去和邮件团队沟通,谁知他们的服务根本就不对外开放。这下小明可开始犯愁了,明知道有这么一个服务,可是偏偏又调用不了。
邮件团队的人说,“看你愁的,我给你提供了一个类似邮局信箱的东西,你往这信箱里写上你要发送的消息,以及我们约定的地址。之后你就不用再操心了,我们自然能从约定的地址中取得消息,进行邮件的相应操作。”
后来,小明才知道,这就是外界广为流传的消息队列。你不用知道具体的服务在哪,如何调用。你要做的只是将该发送的消息,向你们约定好的地址进行发送,你的任务就完成了。对应的服务自然能监听到你发送的消息,进行后续的操作。这就是消息队列最大的特点,将同步操作转为异步处理,将多服务共同操作转为职责单一的单服务操作,做到了服务间的解耦。
哈哈,这下能高枕无忧了。太年轻,哪有万无一失的技术啊~
不久的一天,你会发现所有业务都替换了邮件发送的方式,统一使用了消息队列来进行发送。这下仅仅一个邮件服务模块,难以承受业务方源源不断的消息,大量的消息堆积在了队列中。这就需要更多的消费者(邮件服务)来共同处理队列中的消息,即所谓的分布式消息处理。
rocketmq入门
-
消息队列
含义 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能, 高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。 应用场景 (同下方rocketmq应用场景)
示例图 a. 电商系统 (1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性) (2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。 (3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。 b. 日志收集系统 (消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。) 1.Zookeeper注册中心,提出负载均衡和地址查找服务; 2.日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列; 3.Kafka集群:接收,路由,存储,转发等消息处理;
-
rocketmq示例图
分析 a.消息队列是一种"先进先出"的数据结构 b.不使用队列的情况下,生产者与消费者之间是通过RPC交互的
-
rocketmq应用场景
-
应用解耦
问题描述 系统的耦合性越高,容错性就越低,以电商应用为例,用户创建订单后, 如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障 或者因为升级等原因暂时不可用,都会造成下单操作异常 解耦含义 使用消息队列解耦,系统的耦合性就会下降了,比如物流系统发生故障, 需要几分钟才能修复,在这段时间内,物流系统要处理的数据被缓存到消 息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在 消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故 障 场景 用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口 那么会存在以下缺点 1.假如库存系统无法访问,则订单减库存将失败,从而导致订单失败 2.订单系统与库存系统耦合 解决方案 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户下单成功 库存系统:订单下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行 库存操作 假如:在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息 队列就不再关心其他的后续操作了。实现了订单系统与库存系统的应用解耦
-
流量削峰
问题描述 应用系统如果遇到系统请求流量的瞬间猛增,有可能将系统压垮,有了消 息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大 大提高系统的稳定性 削峰含义 一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用 户请求,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知 用户下单完毕,这方法虽然会耗时,但出现系统不能下单的情况 场景描述 秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为了解决这个问题,一般需要 在应用前端加入消息队列。这样做的好处有 1.可以控制活动的人数 2.可以缓解短时间内高流量压垮应用 3.用户请求,服务器接收后,首先写入消息队列,假如消息队列长度超过最大数量,则直接 抛弃用户请求或跳转到错误页面 4.秒杀业务根据消息队列中的请求信息,再做后续处理
-
数据分发
数据分发含义 通过消息队列可以让数据在多个系统之间更加方便流通。只需要将数据发 送到消息队列,数据使用方直接在消息队列中获取数据即可 图解 A系统产生数据,发送到MQ BCD哪个系统需要,自己去MQ消费即可 如果某个系统不需要数据,取消对MQ消息的消费即可 新系统要数据,直接从MQ消费即可
-
异步处理
场景描述 用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行方式 2. 并行方式 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务 完成后,返回给客户端 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个 任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间 探究 假设三个业务节点每个使用50毫秒,不考虑网络等其他开销,则串行方式的时间是150毫秒 ,并行的时间可能是100毫秒 因为cpu在单位时间内处理的请求数是一定的,假设cpu1秒吞吐量是100次,则串行方式1 秒内cpu可处理的请求量是7次(1000/150),并行方式处理的请求量是10次(1000/100)。 小结 根据如上案例,传统的方式系统的性能,如并发量、吞吐量、响应量等会有瓶颈 解决(使用消息队列) 按照以上约定,用户的响应时间相当于是注册消息写入数据库时间,也就是50毫秒。 注册邮件,发送短信写入消息队列后,直接返回,因为写入消息队列的速度很快 基本可以忽略,因此用户响应时间可能是50毫秒。因此架构改变后,系统的吞吐量 提高到每秒20 QPS,比串行提高了3倍,比并行提高了2倍
-
日志处理
含义 日志处理是指将消息队列用在日志处理中,比如kafka的应用,解决大量日志传输的问题, 架构简化如下 日志采集客户端,负责日志数据采集,定时写入kafka队列 kafka消息队列,负责日志数据的接收,存储和转发 日志处理,订阅并消费kafka队列中的日志数据
-
搭建环境
环境安装——Linux
rocketmq安装
下载地址:
http://rocketmq.apache.org/
安装选择:
二进制包安装方式(含有bin的安装包)
安装环境:unzip 安装包名
安装环境:
linux64位系统、JDK1.8(64位)、源码安装需要安装maven3.2x
-
RocketMQ下载及安装
-
RocketMQ目录结构
bin 启动脚本,包括shell脚本和cmd脚本 conf 实例配置文件,包括broker配置文件、logback配置文件等 lib 依赖jar包,包括netty、commons-lang、FastJSON等
-
RocketMQ启动及测试
-
前提条件
内存修改 Rocketmq默认的虚拟机内存较大,启动broker如果因为内存不足失败,需要编辑如下两个配置文件,去修改JVM内存大小 vi runbroker.sh vi runserver.sh #编辑runbroker.sh和runserver.sh修改默认JVM大小 #参考设置 JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -xx:MetaspaceSize=128m xx:MaxMetaspaceSize=320m" 端口 开启端口:10911 10912 10909 9876 启动位置 bin目录
-
NameServer启动
位置 RocketMQ目录下的bin目录 本地部署(linux) nohup sh mqnamesrv & 启动NameServer tail -f ~/logs/rocketmqlogs/namesrv.log 查看日志 外网部署 nohup sh mqnamesrv -n "192.168.33.100:9876" & 启动NameServer tail -f ~/logs/rocketmqlogs/namesrv.log 查看日志
[注]此时NameServer启动成功
-
Broker启动
位置 RocketMQ目录下的bin目录 本地部署(linux) nohup sh mqbroker -n localhost:9876 & 启动Broker tail -f ~/logs/rocketmqlogs/broker.log 查看启动日志 外网部署 echo 'brokerIP1=192.168.33.100' > ../conf/broker.properties nohup sh mqbroker -n localhost:9876 -c ../conf/broker.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log 查看启动日志
-
发送与接受消息测试(linux端)
含义 在发送或接收消息之前,开发者需要通知客户端name servers 的位置。RocketMQ提供多种 实现方式。为了简单起见,下方展示环境变量NAMESRV_ADDR的用法 发送消息(bin目录下) 设置环境变量:export NAMESRV_ADDR=localhost:9876 使用安装包的Demo发送消息: sh tools.sh org.apache.rocketmq.example.quickstart.Producer 接受消息 (bin目录下) 设置环境变量:export NAMESRV_ADDR=localhost:9876 接受消息:sh tools.sh org.apache.rocketmq.example.quickstart.Consumer jps查看进程号
RocketMQ由如下几部分构成
Name Server
Broker
Producer
Consumer
-
RocketMQ关闭(linux端)
位置 RocketMQ的bin目录 命令 sh mqshutdown namesrv 关闭NameServer sh mqshutdown broker 关闭Broker
-
日志查看问题(windows端)
1.使用NotePad++工具 2.下载NPPTTP插件 3.使用插件远程连接Linux
nohup的作用
nohup命令:如果你正在运行一个进程,而且你觉得在退出帐户时或者关闭客户端该进程还不会结束,那么可以使用nohup命令。该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。在缺省情况下该作业的所有输出都被重定向到一个名为nohup.out的文件中。
简单理解:
nohup运行命令可以使命令永久的执行下去,和用户终端没有关系,例如我们断开SSH连接都不会影响他的运行,注意了nohup没有后台运行的意思;
&的作用
&是指在后台运行,但当用户退出(挂起)的时候,命令自动也跟着退出
注意
nohup COMMAND & 这样就能使命令永久的在后台执行
nohup可以使用Ctrl+C结束掉,而&使用Ctrl+C则结束不掉,nohup不受终端关闭,用户退出影响,而&则受终端关闭,用户退出影响
-
管理工具
-
mqadmin管理工具
使用方式 进入RocketMQ安装位置,在bin目录下执行./mqadmin {admin} {args} 命令介绍 updateTopic[创建Topic] 类路径[com.alibaba.rocketmq.tools.command.topic.UpdateTopicSubCommand] 参数 是否必填 说明 -b 如果-c为空,则必填 broker 地址,表示topic 建在该broker -c 如果-b为空,则必填 cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询) -h 否 打印帮助 -n 是 nameserve 服务地址列表,格式ip:port;ip:port;... -r 否 可读队列数(默认为8) -w 否 可写队列数(默认为8) 注意事项 几乎所有命令都需要配置-n 表示NameServer地址,格式为ip:port; 几乎所有命令都可以通过-h获取帮助; 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行 命令;如果不配置Broker地址,则对集群中所有主机执行命令
-
集群监控平台搭建
含义 RocketMQ有一个对其扩展的开源项目,incubator-rocketmq-externals, 这个项目中有一个子模块叫rocketmq-console。这个便是管理控制台项目。 步骤是先将incubator-rocketmq-externals从git拉到本地,然后对rocketmq-console 进行操作(编译打包运行) git地址 https://github.com/SummerUnfair/rocketmq-externals.git 使用步骤 1. 在rocketmq-console中配置namesrc集群地址 rocketmq.config.namesrvAddr=192.168.33.100:9876 2. 执行打包命令 clean package -Dmaven.test.skip=true 3.启动rokcetmq-console java -jar rocketmq-console-ng-1.0.0.jar
-
使用集群监控平台
rocketmq控制台(驾驶舱) 下图首页即为"驾驶舱"标签下的图标 -Broker TOP 10 :是指前10个Brokder处理消息的数量。 比如从下图可以看出来,我只有一个Brokder,并且此Brokder处理了100条消息. -Broker 5min trend: 此图标可以筛选出某个Topic下5分钟的消息数量,可以切换时间, 所以就相当于可以查看某个Topic下的消息数量趋势。 切换namesrvAdd(如图) 集群(如图) 主题(如图) -状态,TopicTest是rocketmq系统自带的Topic,默认配置有4个队列 -发送消息,设定:主体、tag、消息体后,即可在控制台发送消息 消息(如图)
rocketmq基础
RocketMQ架构设计图及说明
说明
RocketMQ 整体架构设计主要分为四大部分,分别是:Producer、Consumer、Broker、NameServer。
为了更贴合实际,图画的都是集群部署,像 Broker 还画了主从。
■ Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。
■ Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。
■ Broker:主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。
■ NameServer:是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。
再用一段话来概括它们之间的交互
先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo,NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。
这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。
Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。
简单的工作流程如上所述,相信大家对整体数据流转已经有点印象了,我们再来看看每个部分的详细情况。
RocketMQ优缺点
RocketMQ起到解耦、削峰、数据分发的作用,同时也存在着系统可用性降低、系统复杂度提高、一致性问题 这三个方面缺点。
系统可用性降低 :
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响.
系统复杂度提高 :
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证
消息没有被重复消费、怎么处理消息丢失情况、如何保证消息传递的顺序性。
一致性问题 :
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理完成、D处理失败、如何保证消
息数据处理的一致性。
常见的MQ产品宏观对比
产品 | 开发语言 | 单机吞吐量 | 时效性 | 可用性 | 特性 |
---|---|---|---|---|---|
ActiveMQ | java | 万级 | ms级 | 高(主从架构) | ActiveMQ 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 |
RabbitMQ | erlang | 万级 | us级 | 高(主从架构) | RabbitMQ 基于erlang开发,所以并发能力强,性能极其好,延时很低,管理界面较丰富 |
RocketMQ | java | 10万级 | ms级 | 非常高(分布式架构) | RocketMQ MQ功能比较完备,扩展性佳 |
Kafka | scala | 10万级 | ms级以内 | 非常高(分布式架构) | Kafka 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广 |
rocketmq消息管理
-
环境搭建
-
maven依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
-
consume messages(消费者)
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. consumer.setNamesrvAddr("192.168.33.100:9876"); // Subscribe one more more topics to consume. consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } }
-
-
生产者发送消息的三种方式
-
轮廓图示
-
可靠同步发送
含义 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 示例 (调用DefaultMQProducer的send方法) 应用 重要的通知消息、短信通知、短信营销系统 生产者代码(如下)
public class SyncProducer { public static void main(String[] args) throws Exception { // Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("192.168.33.100:9876"); // Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { // Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("ferao 帅").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } // Shut down once the producer instance is not longer in use. producer.shutdown(); } }
SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB59C0060, offsetMsgId=C0A8216400002A9F00000000000C0782, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=0], queueOffset=974] SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB59F0061, offsetMsgId=C0A8216400002A9F00000000000C0844, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=1], queueOffset=974] SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB5A00062, offsetMsgId=C0A8216400002A9F00000000000C0906, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=2], queueOffset=974] SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB5A20063, offsetMsgId=C0A8216400002A9F00000000000C09C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=3], queueOffset=974]
ConsumeMessageThread_19 Receive New Messages: [MessageExt [queueId=2, storeSize=194, queueOffset=973, sysFlag=0, bornTimestamp=1594302370200, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369250, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C05FE, commitLogOffset=787966, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=974, CONSUME_START_TIME=1594302370203, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB598005E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] ConsumeMessageThread_17 Receive New Messages: [MessageExt [queueId=3, storeSize=194, queueOffset=973, sysFlag=0, bornTimestamp=1594302370202, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369252, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C06C0, commitLogOffset=788160, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=974, CONSUME_START_TIME=1594302370206, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB59A005F, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=0, storeSize=194, queueOffset=974, sysFlag=0, bornTimestamp=1594302370204, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369254, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C0782, commitLogOffset=788354, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=975, CONSUME_START_TIME=1594302370207, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB59C0060, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] ConsumeMessageThread_12 Receive New Messages: [MessageExt [queueId=1, storeSize=194, queueOffset=974, sysFlag=0, bornTimestamp=1594302370207, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369257, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C0844, commitLogOffset=788548, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=975, CONSUME_START_TIME=1594302370209, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB59F0061, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=2, storeSize=194, queueOffset=974, sysFlag=0, bornTimestamp=1594302370208, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369258, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C0906, commitLogOffset=788742, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=975, CONSUME_START_TIME=1594302370211, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB5A00062, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]]
-
可靠异步发送
含义 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。 消息发送方在发送了一条消息后,不需要等待服务器响应即可返回, 进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。 应用 异步发送通常被用于对响应时间敏感的业务场景 示例
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
-
单向(Oneway)发送
含义 单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发, 即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。 应用 单向发送用于要求一定可靠性的场景,例如日志收集 示例 (调用DefaultMQProducer的sendOneway方法)
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
-
-
msgId生成算法
含义 当开发者用rocketmq发送消息的时候通常都会返回如下消息 SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000118B4AAC2088BEB070000, offsetMsgId=C0A8216400002A9F00000000001A3D34, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=1], queueOffset=2025] 对于客户端来说msgId是由客户端producer自己生成的,offsetMsgId是由broker生成的, 其中offsetMsgId就是我们在rocketMQ控制台直接输入查询的那个messageId。 概念 msgId 该ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一, 在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。 offsetMsgId 消息偏移ID,该 ID 记录了消息所在集群的物理地址, 主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。
两者ID的生成算法 msgId 生成步骤 1.初始化参数LEN,FIX_STRING,COUNTER 2.初始化buffer 3.设置开始时间 4.字节转string工具方法 5.最终生成msgId 细节 其中createUniqId就是最终生成msgId方法。除些之外的方法者是createUniqId调 用或者被间接调用的方法,这些方法实现也比较简单。 StringBuilder sb = new StringBuilder(LEN * 2); 由此可知msgId的长度是LEN * 2 = 16 * 2 = 32; 设time = 当前时间 - 本月开始时间(ms); 从代码得到 FIX_STRING = ip + 进程pid + MessageClientIDSetter.class.getClassLoader().hashCode(); createUniqIDBuffer 加入time 和 counter 因子。 最终得到msgId的生成因子是: ip + 进程pid + MessageClientIDSetter.class.getClassLoader().hashCode() + time + counter(AtomicInteger自增变量) 最后调用bytes2string进行十六进制的移位和编码就产生了我们的msgId。 分析算法 对于每个producer实例来说ip都是唯一的,所以不同producer生成的msgId是不会重复的。 对于producer单个实例来说的区分因子是:time + counter。 首先应用不重启的情况下msgId是保证唯一性的, 应用重启了只要系统的时钟不变msgId也是唯一的。 所以只要系统的时钟不回拨我们就可以保证msgId的全局唯一。
offsetMsgId 生成步骤 broker端生成的offsetMsgId就比较简单了,直接就是主机ip + 物理分区的offset, 再调用UtilAll.bytes2string进行移位转码就完成了
rocketmq之Java Class
DefaultMQProducer类
概述
DefaultMQProducer类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。
其可以通过无参构造方法快速创建一个生产者,通过getter/setter方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。
方法
属性 | 内容 |
---|---|
DefaultMQProducerImpl defaultMQProducerImpl; | 生产者内部默认实现类 |
String producerGroup; | Producer组名, 默认值为DEFAULT_PRODUCER。多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
String createTopicKey; | 自动创建测试的topic名称, 默认值为TBW102;在发送消息时,自动创建服务器不存在的topic,需要指定Key。broker必须开启isAutoCreateTopicEnable |
int defaultTopicQueueNums; | 创建默认topic的queue数量。默认4 |
int sendMsgTimeout; | 发送消息超时时间,默认值10000,单位毫秒 |
int compressMsgBodyOverHowmuch; | 消息体压缩阈值,默认为4k(Consumer收到消息会自动解压缩) |
int retryTimesWhenSendFailed; | 同步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2 |
int retryTimesWhenSendAsyncFailed; | 异步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2 |
boolean retryAnotherBrokerWhenNotStoreOK; | 声明发送失败时,下次是否投递给其他Broker,默认false |
int maxMessageSize; | 最大消息大小。默认4M; 客户端限制的消息大小,超过报错,同时服务端也会限制 |
TraceDispatcher traceDispatcher | 消息追踪器,定义了异步传输数据接口。使用rcpHook来追踪消息 |
DefaultMQPushConsumer类
概述
DefaultMQPushConsumer类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。
方法
字段 | 内容 |
---|---|
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; | 消费者实现类,所有的功能都委托给DefaultMQPushConsumerImpl来实现 |
String consumerGroup; | 消费者组名,必须设置,参数默认值是:DEFAULT_CONSUMER (需要注意的是,多个消费者如果具有同样的组名,那么这些消费者必须只消费同一个topic) |
MessageModel messageModel; | 消费的方式,支持以下两种 1、集群消费 2、广播消费。BROADCASTING 广播模式,即所有的消费者可以消费同样的消息CLUSTERING 集群模式,即所有的消费者平均来消费一组消息 |
ConsumeFromWhere consumeFromWhere; | 消费者从那个位置消费,分别为:CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 ;CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费(以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始) |
AllocateMessageQueueStrategy allocateMessageQueueStrategy; | 消息分配策略,用于集群模式下,消息平均分配给所有客户端;默认实现为AllocateMessageQueueAveragely |
Map<String, String> subscription; | topic对应的订阅tag |
MessageListener messageListener; | 消息监听器 ,处理消息的业务就在监听里面。目前支持的监听模式包括:MessageListenerConcurrently,对应的处理逻辑类是MessageListener messageListener ;ConsumeMessageConcurrentlyService MessageListenerOrderly 对应的处理逻辑类是ConsumeMessageOrderlyService;两者使用不同的ACK机制。RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。上面两个不同的监听模式使用的ACK机制是不一样的。 |
OffsetStore offsetStore; | offset存储实现,分为本地存储或远程存储 。集群消费:从远程Broker获取。广播消费:从本地文件获取。 |
-
DefaultMQPushConsumer类
重要字段 int consumeThreadMin = 20 线程池自动调整 int consumeThreadMax = 64 线程池自动调整 long adjustThreadPoolNumsThreshold = 100000 int consumeConcurrentlyMaxSpan = 2000 单队列并行消费最大跨度,用于流控 int pullThresholdForQueue = 1000 一个queue最大消费的消息个数,用于流控 long pullInterval = 0 检查拉取消息的间隔时间,由于是长轮询,所以为 0,但是如果应用为了流控, 也可以设置大于 0 的值,单位毫秒,取值范围: [0, 65535] consumeMessageBatchMaxSize = 1 并发消费时,一次消费消息的数量,默认为1, 假如修改为50,此时若有100条消息,那么会创建两个线程,每个线程分配50条消息。 换句话说,批量消费最大消息条数,取值范围: [1, 1024]。默认是1 pullBatchSize = 32 消费者去broker拉取消息时,一次拉取多少条。取值范围: [1, 1024]。默认是32 。可选配置 boolean postSubscriptionWhenPull = false boolean unitMode = false 重要方法 subscribe(String topic, String subExpression) 订阅某个topic,subExpression传*为订阅该topic所有消息 registerMessageListener(MessageListenerConcurrently messageListener) 注册消息回调,如果需要顺序消费,需要注册MessageListenerOrderly的实现 start 启动消息消费
-
Message类
含义 Producer发送的消息定义为Message类 位置 org.apache.rocketmq.common.message 字段定义 如图 字段详解 topic Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的 消息。 通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。 flag 网络通信层标记。 body Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。 transactionId RocketMQ 4.3.0引入的事务消息相关的事务编号。 properties 该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。 RocketMQ预定义了一组内置属性,除了内置属性之外, 还可以设置任意自定义属性。当然属性的数量也是有限的, 消息序列化之后的大小不能超过预设的最大消息大小。 系统内置属性定义于org.apache.rocketmq.common.message.MessageConst (如图) 对于一些关键属性,Message类提供了一组set接口来进行设置, class Message { public void setTags(String tags) {...} public void setKeys(Collection<String> keys) {...} public void setDelayTimeLevel(int level) {...} public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...} public void setBuyerId(String buyerId) {...} } 这几个set接口对应的作用分别为为, 属性 接口 用途 MessageConst.PROPERTY_TAGS setTags 在消费消息时可以通过tag进行消 息过滤判定 MessageConst.PROPERTY_KEYS setKeys 可以设置业务相关标识,用于消费 处理判定,或消息追踪查询 MessageConst.PROPERTY_DELAY_TIME_LEVEL setDelayTimeLevel 消息延迟处理级别,不同级别对应不同延迟时间 MessageConst.PROPERTY_WAIT_STORE_MSG_OK setWaitStoreMsgOK 在同步刷盘情况下是否需要等待数 据落地才认为消息发送成功 `MessageConst.PROPERTY_BUYER_ID setBuyerId 没有在代码中找到使用的地方,所 以暂不明白其用处 这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存 盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个 动态字段更为方便,自然兼容。
-
MessageExt类讲解
含义 对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说, 还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等 等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt, 字段 字段 用途 queueId 记录MessageQueue编号,消息会被发送到Topic下的MessageQueue storeSize 记录消息在Broker存盘大小 queueOffset 记录在ConsumeQueue中的偏移 sysFlag 记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识 bornTimestamp 消息创建时间,在Producer发送消息时设置 storeHost 记录存储该消息的Broker地址 msgId 消息Id commitLogOffset 记录在Broker中存储便宜 bodyCRC 消息内容CRC校验值 reconsumeTimes 消息重试消费次数 preparedTransactionOffset 事务详细相关字段 注意 Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性, 在消息发送时由Producer生成创建。 上面的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成 的,其构成为(如图) 这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能 获取到该值。RocketMQ也提供了相关命令, 命令 实现类 描述 queryMsgById QueryMsgByIdSubCommand 根据MsgId查询消息
rocketmq角色介绍
-
Producer
含义 消息生产者,负责创建消息发送给Broker,一般由业务系统负责产生消息 RocketMQ默认提供了DefaultMQProducer、TransactionMQProducer用于发送消息。 内含 a.Producer Group: 是一组Producer的名称。通常来说一个业务系统可能会奉陪一个Producer Group。 Producer Group后续可以用于消息发送相关的各项管理监控功能。 b.Message: RocketMQ中的消息。Message必须设置Topic以及消息体,除此之外还可以配 置一些自定义属性。只要不超过预定义的消息大小,自定义属性可以任意添加。 c.Message Model: 消息投递存在两种不同类别, d.Tag: Message可以设置Tag,Tag是系统预定义的属性。Message设置了Tag之后,在消费的时候 可以根据Tag进行过滤。RocketMQ提供了几种过滤方式。可以认为Tag是Message的二级类别
-
Consumer
含义 消息消费者,用于从消息队列获取消息。常用的Consumer类 内含 a.DefaultMQPushConsumer, 收到消息自动调用传入的处理方法来处理,实时性高。 b.DefaultMQPullConsumer 用户自主控制,灵活度更高。 c.Push Consumer: 服务端向消费端推送消息 d.Pull Consumer: 消费端向服务端定时拉取消息 e.Consumer Group : 是一组Consumer的名称.相同Group下的Consumer需要有同样的订阅关系,且消费逻辑一致 否则消息投递的时候可能会出现一些难以排查的问题。 Consumer Group同样用于分配给不同的业务系统。通过管理工具可以控制Group的消费范围
-
Broker
含义 Broker是具体提供业务的服务器, 解释一:RocketMQ的核心逻辑是Broker。Broker是实际用于手法消息的功能单元。从RocketMQ 使用者的角度来看,生产者通过接口将消息投递到Broker,消费者从Broker获取消息进行消费。 RocketMQ提供了推拉结合的方式用于获取消息。 解释二:单个Broker节点与所有的NameServer节点保持长连接及心跳, 并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。 Broker中分master和slave两种角色,每个master可以对应多个slave, 但一个slave只能对应一个master,master和slave通过指定相同的Brokername, 不同的BrokerId (master为0)成为一个组。 master和slave之间的同步方式分为同步双写和异步复制, 异步复制方式master和slave之间虽然会存在少量的延迟, 但性能较同步双写方式要高出10%左右 举例:邮局 。它是RocketMQ的核心,用于接收Producer发过来的 消息、以及处理Consumer的消费消息请求、消息的持久化存储、服务端过滤功能等 另外,Broker中还存在一些非常重要的名词需要说明: 内含 a.Topic: 区分消息的种类,一个发送者可以发送消息给一个或者多个Topic,一个消息的接受者可以 订阅一个或者多个Topic消息。对于RokectMQ而言,Topic只是一个逻辑上的概念, 真正的消息存储其实是在Topic中的Queue中。这要设计是为了消息的顺序消费, b.Message Queue: 相当于是Topic的分区,用于并发发送和接受消息
-
NameServer
含义 解释一:RocketMQ没有引入第三方服务依赖,消息队列内部的服务发现以及配置更新等,都 借由Name Server来完成。从功能上来说,Name Server相当于一个轻量级简化版 的Zookeeper,或者说提供了类似ZK的功能。 Name Server的定位是维护RocketMQ全局相关配置,提供消息路由信息,除此之外 并不包含过多复杂逻辑。因为其相对轻量级,一般一组Name Server集群可以服务多 组Broker集群。 Name Server Cluster是多个Name Server实例的统称,Name Server之间并无关 联,互相也不同步信息。多个Name Server的存在是为了提供高可用服务,不同实例之 间的数据信息同步则实际是在数据写入的时候保证的。一份配置或消息路由信息会写入 所有Name Server实例中。 解释二:相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与 队列信息等。NameServer彼此之间不通信,每个Broker与集群内所有NameServer 保持长连接 通信机制 1.Broker启动后需要完成一次将自己注册到NameServer的操作;随后每隔30秒时间定时向 NameServer更新Topic路由信息 2.Producer发送消息时,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则 更新路由信息,会从NameServer重新拉取,同时Producer会默认每隔30秒向NameServer 拉取一次路由信息 3.Consumer消费消息时,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听 指定消息队列获取消息并进行消费
集群
-
集群模式
单Master模式 这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上 环境使用,可以用于本地测试 多Master模式 一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点是 优点:配置简单,单个Master宕机或重启维护对应无影响,在磁盘配置为PAID10时,即时 机器宕机不可恢复情况下,由于PAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量 消息,同步刷盘一条不丢),性能最高。 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时 性会受到影响。 多Master多Slave模式(异步) 每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主 备都写成功,才向应用返回成功,这种模式优缺点如下: 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据 可用性都非常高 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本 在主节点宕机后,备份不能自动切换为主机
-
集群特点
NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。 Broker 部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个slave,但是一个 Slave 只能对应一个Master,Master与Slave的对应关系通过指定相同BrokerName,不同的 BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个 Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定时从NameServer 取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。 Producer完全无状态,可集群部署。 Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer 取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、 Slave发送心跳。Consumer既可以从Master订阅信息,也可以从Slave订阅消息,订阅规则 由Broker配置决定。
-
各集群间关系
-
Producer集群
与nameserver的关系 单个Producer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉, 生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。 与nameserver之间没有心跳。 与broker的关系 单个Producer和与其关联的所有broker保持长连接,并维持心跳。 默认情况下消息发送采用轮询方式,会均匀发到对应Topic的所有queue中。 最佳实践 1.一个应用尽可能只使用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。 只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。 2.每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。 服务器会为每个消息创建索引(哈希索引),应用可以通过 Topic,key 来查询返条消息内容, 以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。 3.消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。 4.对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定 时程序尝试重发或者人工触发重发。 5.某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。
-
Producer集群
与nameserver的关系 单个Consumer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉, 消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。与nameserver之间没有心跳。 与broker的关系 单个Consumer和与其关联的所有broker保持长连接,并维持心跳,失去心跳后, 则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。 最佳实践 1.Consumer 数量要小于等于queue的总数量,由于Topic下的queue会被相对均匀的分配给 Consumer,如果 Consumer 超过queue的数量,那多余的 Consumer 将没有queue可以消费消息。 2.消费过程要做到幂等(即消费端去重),RocketMQ为了保证性能并不支持严格的消息去重。 3.尽量使用批量方式消费,RocketMQ消费端采用pull方式拉取消息,通过 consumeMessageBatchMaxSize参数可以增加单次拉取的消息数量,可以很大程度上提高消费吞吐量。 另外,提高消费并行度也可以通过增加Consumer处理线程的方式,对应参数 consumeThreadMin和consumeThreadMax。 4.消息发送成功或者失败,要打印消息日志。
-
-
补充
-
线上建议关闭autoCreateTopicEnable配置
该配置用于在Topic不存在时自动创建,会造成的问题是自动新建的Topic只会存在于一台broker上, 后续所有对该Topic的请求都会局限在单台broker上,造成单点压力。
-
broker master宕机情况是否会丢消息
broker master宕机,虽然理论上来说不能向该broker写入但slave仍然能支持消费, 但受限于rocketmq的网络连接机制,默认情况下最多需要30秒,消费者才会发现该情况, 这个时间可通过修改参数pollNameServerInteval来缩短。这个时间段内,发往该broker的请求都是失败的, 而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。 直到消费者得到master宕机通知后,才会转向slave进行消费,但是slave不能保证master的消息100%都同步过来了, 因此会有少量的消息丢失。但是消息最终不会丢,一旦master恢复,未同步过去的消息仍然会被消费掉。
-
总结
-
掌握rocketmq流程
基础部分 mq的介绍 作用 注意事项 各mq产品比较 rocketMQ环境搭建 安装rocketmq 启动rocketmq 测试rocketmq 关闭rocketmq rocketmq高可用集群搭建 集群各个角色介绍 集群搭建方式 双主双从集群搭建 集群监控平台 各个消息发送样例 同步消息 异步消息 单向消息 顺序消息 批量消息 过滤消息 事务消息 项目实战 项目背景 功能分析 项目环境搭建 下单功能,保证各服务的数据一致性 确认订单功能,通过消息进行数据分发 整体联调 高级功能和源码分析 高级功能 消息的存储和发送 消息存储结构 刷盘机制 消息的同步复制和异步复制 负载均衡 Producer负载均衡 Consumer负载均衡 源码分析 路由中心NameServer NameServer架构设计 NameServer启动流程 NameServer路由注册和故障剔除 消息生产者Producer 生产者启动流程 生产者发送消息流程 批量发送 消息存储 消息存储流程 存储文件与内存映射 存储文件 实时更新消息消费队列和存储文件 消息队列与索引文件恢复 刷盘机制 过期文件删除机制 消息消费Consumer 消费者启动流程 消息拉取 消息队列负载均衡和重新分布机制 消息消费过程 定时消息机制 顺序消费