SpringBoot 整合 rocketMQ 实战教学及RocketMQ讲解

  • Post author:
  • Post category:其他


版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/weixin_44100514/article/details/96996988

————————————————

Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

Producer

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。

消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。

整合无非就是添加坐标,连接mq,发送消息,消费消息

导入pom相关坐标:

org.apache.rocketmq

rocketmq-client

4.2.0

在application.properties里配置(当然yml也可以,注意空格):



消费者的组名

apache.rocketmq.consumer.PushConsumer=PushConsumer



生产者的组名

apache.rocketmq.producer.producerGroup=Producer



NameServer地址

apache.rocketmq.namesrvAddr=localhost:9876

创建生产者:

package com.mt.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import org.springframework.util.StopWatch;

import javax.annotation.PostConstruct;

/**

  • Created by Administrator on 2019/7/23.

    */

    @Component

    public class RocketMQClient {

    /**

    • 生产者的组名

      */

      @Value(“${apache.rocketmq.producer.producerGroup}”)

      private String producerGroup;

    /**

    • NameServer 地址

      */

      @Value(“${apache.rocketmq.namesrvAddr}”)

      private String namesrvAddr;

    /**

    • @Author zhangxiaofeng

    • @Description //TODO 服务器加载Servlet的时候运行,并且只会被服务器调用一次

    • @Description //TODO 被@PostConstruct修饰的方法会在构造函数之后,init()方法之前运行

    • @Date 2019/7/23 14:47

    • @Param []

    • @return void


      /

      @PostConstruct

      public void defaultMQProducer() {


      //生产者的组名

      DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

      //指定NameServer地址,多个地址以 ; 隔开

      producer.setNamesrvAddr(namesrvAddr);

      producer.setVipChannelEnabled(false);

      try {


      /


      * Producer对象在使用之前必须要调用start初始化,初始化一次即可

      * 注意:切记不可以在每次发送消息时,都调用start方法

      */

      producer.start();

       //创建一个消息实例,包含 topic、tag 和 消息体
       //如下:topic 为 "TopicTest",tag 为 "push"
       Message message = new Message("TopicTest", "push", "发送消息----zhangxiaofeng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
      
       StopWatch stop = new StopWatch();
       stop.start();
      
       for (int i = 0; i < 1; i++) {
           SendResult result = producer.send(message);
           System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
       }
       stop.stop();
       System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());
      

      } catch (Exception e) {


      e.printStackTrace();

      } finally {


      producer.shutdown();

      }

      }

}

创建对应的消费者:

package com.mt.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import org.apache.rocketmq.remoting.common.RemotingHelper;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

  • Created by Administrator on 2019/7/23.

    */

    @Component

    public class RocketMQServer {

    /**

    • 消费者的组名

      */

      @Value(“${apache.rocketmq.consumer.PushConsumer}”)

      private String consumerGroup;

    /**

    • NameServer 地址

      */

      @Value(“${apache.rocketmq.namesrvAddr}”)

      private String namesrvAddr;

    @PostConstruct

    public void defaultMQPushConsumer() {


    //消费者的组名

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

     //指定NameServer地址,多个地址以 ; 隔开
     consumer.setNamesrvAddr(namesrvAddr);
     consumer.setVipChannelEnabled(false);
     try {
         //订阅PushTopic下Tag为push的消息
         consumer.subscribe("TopicTest", "push");
    
         //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
         //如果非第一次启动,那么按照上次消费的位置继续消费
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
             try {
                 for (MessageExt messageExt : list) {
    
                     System.out.println("messageExt: " + messageExt);//输出消息内容
    
                     String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
    
                     System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                 }
             } catch (Exception e) {
                 e.printStackTrace();
                 return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
             }
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
         });
         consumer.start();
     } catch (Exception e) {
         e.printStackTrace();
     }
    

    }

}

详细说明一下@PostConstruct这个注解

这个注解在方法上是来修饰非静态的 void() 方法,而且这个方法是不可以抛出异常声明的

被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次,类似于Serclet的inti()方法。被@PostConstruct修饰的方法会在构造函数之后,init()方法之前运行。

消费成功后,如图:

我这里消费了两条,是因为我之前就已经发送过一条