RocketMQ消息队列——学习笔记

  • Post author:
  • Post category:其他
  • 消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

消息服务介绍和使用场景

什么是AMQP: 即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制。

什么是JMS:java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口

JMS是一种与厂商无关的API。

使用场景:

核心应用:

解耦:订单系统 >> 物流系统

异步:用户注册 >> 发送邮件,初始化信息

削峰:秒杀、日志处理

跨平台、多语言

分布式事务、最终一致性

RPC调用上下游对接、数据源变动》通知下属

主流消息中间件框架对比

  1. Apache ActiveMQ
    Apache出品,历史悠久,支持多种语言的客户端和协议Java,NET,c++等,基于JMS Provider实现。吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用。
  2. Kafka
    是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafaka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户行动),副本集机制,实现数据冗余,保持数据尽量不丢失,支持多个生产者和消费者。不支持批量和广播消息,运维难度大。
  3. RabbitMQ
    是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。。。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方便表现不错。使用Erlang开发,阅读和修改源码难度大
  4. RocketMQ
    阿里开源的消息中间件,Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,性能强劲(零拷贝技术),支持海量堆积,支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤,延迟消息等,在阿里内部大规模使用,适合在电商、互联网金融等领域使用。因为是阿里内部从实践到产品的产物,因此里面很多接口、API并不是很普遍适用。

RocketMQ介绍

是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  1. 能够保证严格的消息顺序
  2. 提供丰富的消息拉取模式
  3. 高效的订阅者水平扩展能力
  4. 实时的消息订阅机制
  5. 亿级消息堆积能力
  6. 支持分布式事务

RocketMQ核心概念

  1. 消息模型(Message Model)
    RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

消息生产者(Producer):
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
消息消费者(Consumer):
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
主题(Topic):
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
消息队列(Message Queue):
正真存放消息的地方,每一个主题下面会有多个队列
标签(Tag):
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
组(Group):
生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。

  1. 部署结构RocketMQ的服务端有两部分组成:
    1. 代理服务器(Broker Server)
      消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
      1. Broker面向Producer和Consumer接收和发送消息
      2. 向NameServer提交自己的消息
      3. 时消息中间件的消息存储/转发服务器
      4. 每个Broker节点,启动时,都会遍历NameServer列表,与每个都建立长连接,注册自己信息,定时上报
      5. Broker集群
        1. Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
        2. 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
        3. Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
    2. 名字服务(Name Server)
      名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。


学习笔记,整理不易。你的支持,我的动力!

更多学习资料,IT系列课程,请关注vx公众号:豆萌萌 网课大咖

vx:dmm_wkdc

为您提供全网最全的学习资料

更有面试题整理,金三银四冲刺,IT电子书籍等

你需要的,我恰好有,愿意推荐给你哦!


消息类型

  1. 普通消息
    消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
  2. 普通顺序消息
    普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
  3. 严格顺序消
    严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
  4. 延时消息
    是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
    · level == 0,消息为非延迟消息
    · 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
    · level > maxLevel,则level== maxLevel,例如level==20,延迟2h
    定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入 特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有 相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消 息写入真实的topic。
    需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
  5. 事务消息
    RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义 到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

顺序消费

MQ实现了顺序消费

顺序消费的原理
1、首先,需要顺序消费的消息,需要保证发消息发到同一个TOPIC中
2、MQ会将顺序消息,发送到同一个QUEUE中,因为队列本身就会保证先进先出(如果自己实现,需要,new MessageQueueSelector(消息选择器),通过逻辑代码保证这些消息选择到了同一个QUEUE进行发送)
3、发送的时候要保证是一个线程发出的消息
4、消费者端可以set一个MessageListener,其中MessageListenerOrderly对象,是顺序消费的消息监听器,他对每一个QUEUE开启一个线程。保证一个QUEUE中的消息,只被一个线程消费


需要保证的是,发送消息的顺序,需受到手动的控制才可以

MQ的事务消息

2PC(MQ使用此模式)

  • 第一阶段:尝试提交
  • 第二阶段:本地事务确认后,再确认一次提交
    • 或者回滚

TCC三阶段提交

  • try:尝试着发送消息,如果发送成功,则将要修改的数据lock锁住,成功锁住,即Try成功
  • 执行本地事务
  • 当本地事务成功时,执行confirm,当本地事务失败时,执行cancel
1、发送端开启事务
2、发送half半消息给Broker
3、接到半消息后,写入half消息专属的消息队列,并刷盘
4、回应发送方,半消息发送成功
5、发送方开始执行本地事务
6、执行成功,则撤销half消息队列中的消息,并发送到真正的消息队列中去
7、执行失败,则撤销半消息
8、当未收到本地事务执行结果时,Broker会开启定时任务,去查询事务执行结果
9、发送方有一个回调方法,当Broker查询事务执行结果时,进入回调方法。
10、超过最大查询次数,则丢弃消息,回滚
1、事务消息,使用的类:TransactionMQProduer
2、需要设置一个事务监听器,
    (1)重写执行本地事务的逻辑,return事务执行的状态
    (2)重写检查事务的回调方法,return事务执行状态
3、启动Producer,并发送消息

消息传播模式

  1. 集群模式(Clustering)
    集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  2. 广播模式(Broadcasting)
    广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

消息消费模式

  1. 拉取式消费(Pull Consumer)
    Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消 息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
  2. 推动式消费(Push Consumer)
    Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式 一般实时性较高。

消息刷盘

  1. 同步刷盘:
    只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回Producer 端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障, 但是性能上 会有较大影响,一般适用于金融业务应用该模式较多。
  2. 异步刷盘:
    能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给 Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的 性能和吞吐量。

消息重试

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息 失败通常可以认为有以下几种情况:

由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消 息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而 这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10秒后再重试。

由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错 误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s, 再消费下一条消息,这样可以减轻Broker重试消息的压力。

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列 (这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的), 用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要 一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时, 重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为 “SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重 新保存至“%RETRY%+consumerGroup”的重试队列中。

消息从投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息 重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中 是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重 复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下 方法可以设置消息重试策略:

retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最 大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException和部分MQBrokerException时会重投。

retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅 在同一个broker上做重试,不保证消息不丢。

retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非 SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存 储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使 用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

RocketMQ安装与启动

  1. 服务器安装下载:https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip解压:unzip rocketmq-all-4.6.0-bin-release.zip根据机器内存情况设置运行内存:
    1. 修改runserver.sh
      JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”
    2. 修改runbroker.sh
      JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g”
    3. 修改tools.sh
      JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m”
      运行 name server
      nohup sh bin/mqnamesrv &
      查看启动日志
      tail -f ~/logs/rocketmqlogs/namesrv.log
      运行broker
      nohup sh bin/mqbroker -n localhost:9876 &
      查看启动日志
      tail -f ~/logs/rocketmqlogs/broker.log
      关闭服务
      sh bin/mqshutdown broker
      sh bin/mqshutdown namesrv
      要想完全清空数据,删除文件夹~/store,然后重启
  2. 控制台安装
    下载: git clone https://github.com/apache/rocketmq-externals.git
    找到rocketmq-console/src/main/resources/application.properties 根据需求,修改配置
    server.port=8081
    name server地址
    也可以不修改,在启动完console后,在控制台导航栏 – 运维 – NameSvrAddrList一栏设置
    修改 pom.xml ,修改RocketMQ相关依赖的版本
    4.7.1
    切换到控制台目录 cd rocketmq-console
    mvn clean package -DskipTests

Springboot集成RocketMQ

  1. 导包
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>
  1. 发送消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
//1,发送同步消息
@GetMapping("/send/{msg}")
public String sendMessage(@PathVariable("msg") String msg) {
    //携带tag与key
    Message<String> message = MessageBuilder.withPayload(msg)
            .setHeader("KEYS", UUID.randomUUID().toString())
            .build();
    rocketMQTemplate.send("msgtest:order_pay", message);
    return "同步发送成功";
}
//2,发送异步消息
@GetMapping("/send2/{msg}")
public String sendMessage2(@PathVariable("msg") String msg) {
    Map<String, Object> maps = new HashMap<>();
    aps.put("KEYS", UUID.randomUUID().toString());
    rocketMQTemplate.asyncSend("msgtest:order_pay", 
    MessageBuilder.withPayload(new MessageDto(msg)).setHeader("KEYS",                                                                          UUID.randomUUID().toString()).build(),
    new SendCallback() {    
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功: " + sendResult);
        }
        @Override
        public void onException(Throwable e) {
            System.out.println("发送失败: " + e.getMessage());
        }
    });
    return "异步发送成功!";
}
//3,发送单项消息
@GetMapping("/send3/{msg}")
public String sendMessage3(@PathVariable("msg") String msg) {
    Map<String, Object> maps = new HashMap<>();
    maps.put("KEYS", UUID.randomUUID().toString());
    rocketMQTemplate.sendOneWay("msgtest:order_pay",MessageBuilder.withPayload(
        new MessageDto(msg)).setHeader("KEYS", UUID.randomUUID().toString()).build());
    return "发送单项消息成功!";
}
//4,发送顺序消息
@GetMapping("/send5")
public String sendMessage5() {
    Map<String, Object> maps = new HashMap<>();
    maps.put("KEYS", UUID.randomUUID().toString());
    for(int i = 0; i<10;i++) {
        String msg = "order: " + i + " : " + System.currentTimeMillis();
        rocketMQTemplate.syncSendOrderly(
                "msgtest:order_pay", 
                MessageBuilder.withPayload(new MessageDto(msg)).setHeader("KEYS", 
                        UUID.randomUUID().toString()).build(), 
                //hash值匹配一个消息队列的,所有消息的hash值现在一样,会放入一个队列中
                "message"
        );
    }
    return "发送顺序消息成功!";
}
//5,发送延时消息
@GetMapping("/send4/{msg}")
public String sendMessage4(@PathVariable("msg") String msg) {
    Map<String, Object> maps = new HashMap<>();
    maps.put("KEYS", UUID.randomUUID().toString());
    rocketMQTemplate.syncSend("msgtest:order_pay", 
            MessageBuilder.withPayload(new  MessageDto(msg)).setHeader("KEYS", 
                        UUID.randomUUID().toString()).build(), 
            3000, 
            //延迟等级
            2);
    return "发送延迟消息成功!";
}
  1. 消费消息
@Component
@RocketMQMessageListener(consumerGroup = "order-group",topic = "msgtest",
       selectorExpression = "order_pay"
       //设置顺序(单线程)消费,默认是多线程异步消费
       //,consumeMode = ConsumeMode.ORDERLY
       //广播模式
       //,messageModel = MessageModel.CLUSTERING
)
public class ConsumerListener implements RocketMQListener<MessageDto> {
    @Override
    public void onMessage(MessageDto message) {
        System.out.println("消费消息");
        System.out.println(message);
    }
}
  1. 处理事务消息
//发送事务消息
@GetMapping("/send6")
public String sendMessage6() {
    Map<String, Object> maps = new HashMap<>();
    aps.put("KEYS", UUID.randomUUID().toString());
    //自定义该消息的事务ID
    String txId = UUID.randomUUID().toString();
     //处理事务业务逻辑需要用到的对象
    User user = new User(2000, "张三丰", 100);
    //发送半消息
    rocketMQTemplate.sendMessageInTransaction("msgtest:order_pay", 
            MessageBuilder.withPayload(new MessageDto(2000+""))
                   .setHeader("KEYS", UUID.randomUUID().toString())
                   .setHeader("txId", txId).build(), 
            user);
    return "发送半消息成功!";
}
//2,本地事务执行以及回查程序
@Component
@RocketMQTransactionListener
public class LocalTransactionMessageListener implements RocketMQLocalTransactionListener {
    //逻辑当中需要的业务处理对象
    @Autowired
    private UserService us;
    //用来保存事务执行成功的日志mapper对象
    @Autowired
    private RocketTransactionLogMapper rocketTransactionLogMapper;
    //执行本地事务逻辑
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务");
        User user = (User)arg;
        String txId = msg.getHeaders().get("txId", String.class);
        try {
            us.addUser(txId, user);
            System.out.println("本地事务执行成功");
            return RocketMQLocalTransactionState.COMMIT;
        }catch(Exception e) {
            e.printStackTrace();
            System.out.println("本地事务执行失败");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /*
    事务回查的时候需要调用的回查方法,通过日志 里面保存的事务日志去检查事务执行情况
    根据事务日志id去查询日志
    */
    @Override
    public RocketMQLocalTransactionState checkLocalTransactionMessage msg) {
        System.out.println("broker没拿到回执消息,执行本地事务检查");
        String txId = msg.getHeaders().get("txId", String.class);
        RocketTransactionLog rs = rocketTransactionLogMapper.selectOne(
                new QueryWrapper<RocketTransactionLog>().eq("r_txid", txId));
        if(rs == null) return RocketMQLocalTransactionState.ROLLBACK;
        return RocketMQLocalTransactionState.COMMIT;
    }
}

ASC

MQ可以在配置文件中,设置账号密码,开设白名单,限制对TOPIC以及GROUP的操作限制


学习笔记,整理不易。你的支持,我的动力!

更多学习资料,IT系列课程,请关注vx公众号:豆萌萌 网课大咖

vx:dmm_wkdc

为您提供全网最全的学习资料

更有面试题整理,金三银四冲刺,IT电子书籍等

你需要的,我恰好有,愿意推荐给你哦!


RocketMQ高性能原理

读队列与写队列

读队列

  • 读队列用于维护Offset

写队列

  • 写队列会创建对应的存储文件

1.消息实际上是写入写队列中的,然后写队列会同步消息到对应的读队列中

2.消费者拉取消息,实际是从读队列获取的


3.一般写队列要与读队列进行对应,默认也是对应的
	如果写队列多与读队列,则会造成消息丢失
	如果读队列多与写队列,则会造成消费者空转,极大浪费性能

4.对队列进行缩容时,可以短暂存在读写队列不对应的情况:
	可以先移除写队列,等待对应读队列的消息消费完毕后,再移除读队列

消息持久化

1.默认的消息持久化,目录是在用户的home目录下的store文件夹中

2.消息存储路径可配置,生产环境下建议单独配开

存储目录Stroe解析

1.about 大小0,无内容
	用于判断服务是否正常关闭。
	在服务启动时,会创建该文件,服务正常停止时,会删除该文件
	根据是否存在about文件判断上次是否正常关闭,从而做出相应补救措施。

2.checkpoint 存盘点,用于判断存盘进度

3.config 配置文件目录
包含topic、offset、group、consumerFilter等配置

4.lock 表示该文件夹已经被某个启动的服务占用
	当启动了多个MQ服务时,通过是否有lock文件,判断该目录是否已被占用,从而防止不同的服务数据相互影响

CommitLog文件夹

  • CommitLog文件,实际存放消息,文件固定大小,最大1g,第一个文件备注是000000000000000,后续文件按偏移量备注,如第二个文件存10001开始的消息,则为00000000010001
1.文件大小:固定1G

2.文件命名:用消息偏移量命名
	000000000000000000000
	实际上代表的是当前文件所存储消息的偏移量
	若第一个文件共存储了10000条消息,那么第二个文件的命名,则以解下来的偏移量来命名:
	000000000000000010001

3.存储消息的格式:二进制

4.由于消息大小不确定,那么每次在写入消息时,需要判断当前文件是否装的下,装不下则新建

对比Kafka:
	kafka存储数据,是根据每个topic下的partition,来建立一个独立的文件
	这也造成了kafka需要对每个topic来维护数据,影响是当topic增多时,影响性能

Rocket MQ:
	所有数据存到CommitLog文件夹下,不存在topic增多导致性能下降的问题

ConsumeQueue

  • 用来给Consumer端过滤消息的文件
1.按TOPIC来维护,每个TOPIC中的QUEUE,对应一个文件

2.保存的固定格式的数据:
偏移量
消息大小
tag

3.每个文件,由30W个20byte的数据块组成的。相当于每个文件存储30w个消息

4.由于tag属性被写到了ConsumeQueue文件里面,这也是为什么MQ建议使用tag检索消息的原因

5.由于有了此文件,让consumer进行拉取消息时,可以选择以下两个条件拉取(详情见下图):
	CONSUME_FROM_LAST_OFFSET
	CONSUME_FROM_FIRST_OFFSET

6.通过ConsumeQueue文件中的偏移量,可以定位到消息体在CommitLog中的位置(CommitLog文件以偏移量命名)

Index

  • 索引文件
1.这种索引文件,主要是为了让consumer拉取消息时,可以使用按时间戳的方式拉取(见上图):
	CONSUME_FROM_TIMESTAMP

2.命名方式:时间戳方式命名

3.文件格式:二进制

4.文件内容:
	indexHeader:40字节
	solt:500W个*20字节
	index:500W个*4字节

IndexFIle,索引文件,基于时间戳的方式维护

过期文件删除

定时任务删除

  • 可以通过配置指定其触发时间、频率来删除过期文件

磁盘容量阈值删除

  • 磁盘容量达到阈值,则进行过期文件删除
1.首先判断过期文件
	可以通过偏移量大小来判断,对于非当前写文件,如果超过一定保留时间,那么文件都会被认为是过期文件,随时可以删除

2.如何删除-定时任务删除
	内部有定时任务,对文件扫描,扫描出需删除的过期文件在指定时间来进行删除,在配置文件中可进行性配置

3.如何删除-根据 根据磁盘剩余容量阈值
	默认磁盘阈值88%,可进行配置,如果磁盘空间不够了,则会触发删除,而这也是MQ可能存在消息丢失的原因之一

RocketMQ文件读写

零拷贝

  • 运用了零拷贝技术是RocketMQ高性能的原因之一,它从操作系统层面提供了一种加速文件读写的机制
零拷贝,本质上就是减少了操作系统层面获取数据时CPU的复制IO操作,从而提升效率

1.一般文件读写:
	用户空间向内核申请操作————由内核向磁盘进行文件读写操作(第一次IO:磁盘复制文件到内核)————内核拿到文件后交给用户空间(第二次IO:从内核复制文件到用户空间)————用户空间拿到数据后,发送给网卡(第三次IO:用户空间复制文件到网卡)

2.而0拷贝减少了实际的文件IO操作次数
	原本复制文件,现在只是传递文件在磁盘上的索引,如此则省去了大量的复制IO,提升了效率

DMA

  • DMA是一套指令集,存在的意义是帮助CPU执行文件读写操作
1.可以把DMA想象成CPU的小弟,以前文件IO操作都需要自己完成,现在有了DMA,每次DMA要进行IO操作时,向CPU申请权限就行了,解放了CPU

2.DMA操作数据需要占用数据总线,所以IO操作还是会造成数据总线的紧张,在后续的优化中引入了Channel,Channel有自己独立的消息总线

从而提升了文件读写效率

顺序写

  • 普通的写文件,采用随机写操作,会在内存中产生很多的内存碎片,同时在写文件时,会存在大量的内存寻址(寻找足够放置文件大小的空间)等操作,影响性能
1. 预先分配一块连续的空间,仅供MQ使用,写入磁盘的时候,不需要再进行磁盘寻址,从而节省了时间,提升了性能

刷盘

异步刷盘

  • 效率高,吞吐量高,但可能存在消息丢失

同步刷盘

  • 保证消息不丢失,但效率低,影响吞吐量
1.异步刷盘:在内存往磁盘刷盘的操作,是异步执行的

2.通过定时任务执行,有延迟

3.异步刷盘的效率高,但是也存在消息丢失的可能

1.同步刷盘:每个消息都进行同步刷盘

2.刷盘成功后返回消息

3.保证了消息不丢失,但是占用系统资源,性能也会有影响

消息主从复制

1.如果Broker以集群的方式部署,会有一个master节点与多个slave节点

2.消息要从master复制到slave上

3.消息也分为同步复制与异步复制

异步复制

  • 异步复制:只要Master写入成功,就返回客户端写入成功,然后再异步的将消息复制给Slave节点

同步复制

  • 等Master与Slave都写入成功后,才反馈给客户端写入成功的消息
1.异步复制:
	系统拥有较低延迟和较高的吞吐量
	但如果master节点故障,slave节点未完成复制操作,那么存在消息丢失的可能

2.同步复制:
	保证了如果Master节点故障,Slave上也有全部的数据备份
	但是会增大数据写入的延迟,降低吞吐量

学习笔记,整理不易。你的支持,我的动力!

更多学习资料,IT系列课程,请关注vx公众号:豆萌萌 网课大咖

vx:dmm_wkdc

为您提供全网最全的学习资料

更有面试题整理,金三银四冲刺,IT电子书籍等

你需要的,我恰好有,愿意推荐给你哦!


负载均衡

Producer负载均衡

  • Producer采用轮询的方式进行负载均衡操作

1.目的:将消息均衡的发送到Broker中去

2.采用轮询的方式

3.有判断机制,如果上次轮询时,有Broker发送失败,那么再发送时尽量会跳过这个Broker

Consumer端负载均衡

广播模式

  • 所有的Consumer都发,所以不存在负载均衡

集群模式

  • 集群模式下,需要使用到负载均衡
集群模式下的负载均衡

首先,保证每一条消息只会被同一个Group下的一个Consumer所消费的前提下

1.平均分配
2.轮询
3.同机房优先分配
4.Hash环:consumer计算hash值,连成一个环(最后一个节点连着第一个,感觉像是一个环),再计算messageQueue的hash值落在哪个consumer哈希的范围内,进行分配

每次Consumer实例数量发生变化时,都会重新负载均衡一次

消息重试机制

  • 当Broker往Consumer发送消息失败时,会重新往其他Consumer推送
  • %RETRY%开头的队列,是系统创建的重试队列

1.重试队列,是针对Group的,每个Group组有一个重试队列

2.因为消息是按组划分的,别的组是否推送和消费,和自身的组无关

3.Group组内的所有的TOPIC的消息都会放在一个队列中

4.重试次数:最大16次,系统对于重试的频率间隔时间,是按照延迟消息的级别对应的

5.延迟消息共有18个级别,重试次数采用了后16个级别

6.可以设置时间,但是一般不建议,因为源码中有些定时任务执行时间也采用了这些级别规定的时间

  • 并发消息
    • 最大次数,16次
  • 顺序消息
    • 默认的重试次数是int的最大值(因为要保证顺序)
  • 发送超时时间,默认3000毫秒
  • 可以通过方法设置发送次数
// 同步消息重试次数
producer.setRetryTimeswhenSendFailed(int 次数)
// 异步消息重试次数
producer.setRetryTimeswhenSendAsynFailed(int 次数)

死信队列

  • 消息超过重试最大次数,Rocket MQ认为该消息有问题,则会将其发送到死信队列
1.死信队列命名为:
	%DLQ%+ConsumGroup

2.permission默认为2:
	2是不可读不可写
	MQ认为进入死信队列的消息,是有问题需要人工干预的消息,所以需要人工将permission设置为4或6,再进行处理

消费幂等性

MQ系统中,对于消息幂等又三种实现语义

  • at most once 最多一次: 每条消息最多只会被消费一次。
  • at least once (Rocket MQ保证的):至少一次: 每条消息至少会被消费一次
  • exactly once 刚刚好一次: 每条消息都只会确定的消费一次
1.这三种语义都有他适用的业务场景

2.其中,at most once是最好保证的:RocketMQ中可以直接用异步发送sendOneWay单路消息的方式就可以保证


3.at least once,至少会消费一次,但是这种情况,有可能出现重复消费
1.消息体MessageExt中,msgId属性,是消息的唯一ID

2.但是由于消息重投的原因,同一个消息可能会被投递多次,所以MQ无法保证是唯一出现,有可能有两个消息的msgId是一样的

3.所以官方建议,不要使用msgId作为幂等性的判断

4.send消息时,有一个参数keys,通过setKeys()方法可以传一个业务上唯一id,手动保证消息的幂等

  • RocketMQ的商业版,可以保证exactly Once,但是没有开源

Dledger集群

基于Raft算法进行构建

RocketMQ中的Dledger集群主要包含两个功能:

1、从集群中选举产生master节点。2、优化master节点往slave节点的消息同步机制

Consumer启动流程

进入DefaultMQPushConsumer,的start()方法
1、this.checkConfig(); # 检查指定的配置是否有误
2、this.copySubscription(); # 订阅列表复制一份
3、this.mQClientFactory # 初始化了连接工厂
4、this.rebalanceImpl # 设置消息重新负载策略
5、new PullAPIWrapper # 初始化pull模型的请求包装器
6、this.offsetStore # 得到消息消费的offset,根据集群和广播模式不同来新建offset存储器
7、this.consumeMessageService # 初始化消费服务,顺序消费服务或并行消费服务
8、registerOK = mQClientFactory.registerConsumer # 向Broker注册自己(consumer)的信息

PullRequest

  • 从Broker拉取到的消息,被封装在PullRequest中,是Consumer向Broker拉取消息的返回对象
PullRequest内部的属性

1、consumerGroup  # 消费组
2、MessageQueue # 消息拉取回来的元数据信息
3、ProcessQueue # 消息体
4、nextOffset # 下一个偏移量

ProcessQueue 消息体

属性
1、msgAccCnt #服务器端还有多少条消息积压
2、consuming #是否在消费中
3、queueOffsetMax #当前队列偏移量最大值(当前队列中消息被读取进度)


方法简介
1、cleanExpiredMsg() # 并发消费情况下,找出超过15分钟没有被消费成功的消息(消费超时消息),移除发回给服务端

RocketMQ源码阅读

消费者端

DefaultMQPushConsumer

start启动流程
1、checkConfig() 参数检查:检查set的参数是否合法(格式/长度/是否和默认group重名等等)
2、copySubscription() 复制Subscription
3、如果是集群消费模式,则配置一个PID
4、mqClientFactory:初始化一个mqClientFactory,初始化连接工厂
5、rebalanceImpl:初始化消息重新负载策略
6、pullAPIWrapper:初始化pull模型的请求包装器
7、offsetStore:初始化offsetStore,确定由哪一方来维护Offset(广播模式,Consumer自己维护 集群模式,Broker维护)
8、初始化consumerListenerSevice:初始化具体的消费服务,是根据Lisenter决定的
9、执行consumerListenerSevice的start()方法,启动消费消息的服务
10、registerConsumer:向Broker注册自信Consumer的信息
11、Consumer状态改变成RUNNING

RocketMQ拉取消息:长轮询

  • 长轮询介于:轮询和长连接之间
  • 轮询:Client每隔一段时间,向Server端发起请求,查询是否有新的消息,每次请求都需要建立连接
    • 这样做效率很低,并且消息有延迟
  • 长连接:Client和Server端建立双向的长连接,有新消息是Server端推给Client端
    • 服务器要保持连接,并且维护客户端状态,性能开销相比轮询更高
    • 服务器也不知道Consumer的消费能力,消息是否会有堆积

长轮询

1、Client向Server端建立连接
2、Server端判断是否有新的消息存在
3、当没有消息时:不断开连接,但进入连接挂起阶段
4、当有消息时,则推送消息给Client
5、Client进行消息消费,消费完成后,再向服务端发起推送消息的请求

消息主动权在Client端,根据Client端自己的消费能力去拉取消息
服务端也不需要花费更多资源去维护Consumer的相关信息

源码:pullMessage();

所在位置:DefaultMQPushConsumerImpl
此方法,是在Consumer端将消息拉取到之后,被调起

此方法中接收了一个入参:PullRequest

PullRequest

  • 拉回来的报文包装类
ConsumerGroup:消费者组
MessageQueue:消息元数据
ProcessQueue:消息体(真正的消息)

topic				
brokerName		
queueId

ProcessQueue

  • 一次性拉取回来之后,实际的消息体
ReadWriteLock lockTreeMap 读写锁,读写操作的时候用到的锁
TreeMap msgTreeMap 存放待处理消息的map,TreeMap可以保证顺序

清理过期消息方法:cleanExpireMsg()

  • 只在并发消费时会执行此方法,顺序消费消息时,不会进入该方法清理消息

1、一次只处理最多16条消息
2、从待处理消息msgTreeMap中,获取map中第一条消息,查看是否处理超时(默认15分钟),如果超时,则取出
3、将取出的过期消息,则发送回Broker
4、删除过期消息

初始化ProcessQueue消息的属性:putMessage(msgs)

  • 从Broker拉回消息后,进入putMessage方法,设置一些属性
  • 该方法返回的布尔值,是供顺序消费使用的
1、从客户端中接收到的消息msgs,循环put进msgTreeMap待处理消息列表中(key:offset,value:消息)
2、每put一个,则该Queue当前处理到的offset更新为该消息的offset
3、取出msgs(服务器发来的)最后一条消息,拿到offset(相当于拿到本次处理的消息中最大的offset)
4、服务器设置的最后一条偏移量-当前列表最后一条消息的偏移量
5、如果>0,则设置在msgAccCnt属性中

计算两个偏移量的差值:getMaxSpan()

计算待处理消息msgTreeMap
最大偏移量-最小偏移量

删除消息方法:removeMessage()

  • 再处理过期消息和已消费成功的消息后,会调起该方法
1、主要操作就是for循环依次处理要删除的消息,并做好数量记录等工作

顺序消费消息时使用的:rollback()

顺序消息时,会将msgTreeMap中的消息复制一份到consumingMsgOrderlyTreeMap中一份,处理时用后者来处理

当消息消费失败后
consumingMsgOrderlyTreeMap中的消息,重新推入msgTreeMap中
并清空consumingMsgOrderlyTreeMap

顺序消费消息时使用的:commit()

顺序消息时,会将msgTreeMap中的消息复制一份到consumingMsgOrderlyTreeMap中一份,处理时用后者来处理

当消息消费成功,调用commit方法
将保存消息的map,做相应的整理(数量的增减)

源码:订阅:subscribe()

所在位置:DefaultMQPushConsumerImpl
做了两件事
1、构建该Consumer关注的数据(TOPIC),在Consumer自身设置一下负载均衡
2、会设置定时向Broker端发送心跳包

发送心跳包方法:sendHeartbeatToAllBroker()

1、构建HeartbeatData 对象,包含了发送心跳包的内容
	ClintID,当前客户端ID
	producerEnpty 是否存在Producer
	producerEnpty 是否存在Consumer
2、判断producerEnpty和producerEnpty是否都为空,为空则不需要发心跳,return
	第一次进入,会有Consumer,但Producer是空的,因为第一次发心跳,还没有从Broker中拉取过Producer信息
3、包装好要发送的信息(连接超时时间,是什么语言发出的请求,请求类型设置为心跳连接等)
4、使用Netty封装了一层后,实际发送心跳连接请求

在订阅方法中,发送心跳只有一次
可以理解为,第一次发送心跳请求,是在订阅时发送的

源码:start()

1.检查连接nameServer地址
2.创建Netty客户端,执行两个定时任务	mQClientAPIImpl.start()
	删除过期请求
	将没有连接相应的nameServer断开
3.开启定时任务调度器,里面执行了多个定时任务 startScheduledTask()
	2分钟更新一次nameServer的地址
	30秒更新一次topic的路由信息,可配置
	30秒对Broker发送一次心跳检测,并将下线的broker删除
  5秒持久化一次consumer的offset(两个实现:本地存文件一份  上传至Broker一份)
	1分钟调整一次线程池,这个定时任务其实什么都没有执行
4.实际拉取消息,通过Netty向服务端拉取消息后进入回调方法
5.对消费者进行重新负载均衡,分发消息并进行消费
	得到订阅的map,for循环每个TOPIC
	从TOPIC中取出所有的MessageQueue

学习笔记,整理不易。你的支持,我的动力!

更多学习资料,IT系列课程,请关注vx公众号:豆萌萌 网课大咖

vx:dmm_wkdc

为您提供全网最全的学习资料

更有面试题整理,金三银四冲刺,IT电子书籍等

你需要的,我恰好有,愿意推荐给你哦!


版权声明:本文为xrq19950928原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。