rocketmq&rabbitmq基本对比及接口封装

  • Post author:
  • Post category:其他


Rabbitmq学习以及基本接口封装

基于工程效率团队线上已有消息中间件在使用,为了统一技术方案,所以采用rabbitmq作为消息中间件。rocketmq和rabbitmq这两种消息中间件略有区别:

Rocketmq

Rabbitmq

Java开发,便于二次开发

Erlang语言开发

管理后台:cluster、topic、producer、consumer、nameserver。无登录账号密码

管理后台:connection、channel、exchange、queueus、admin。有登陆账号密码

Broker向Nameserver注册topic,producer从namesrver获取topic,进而向关联broker发送消息

通过exchange绑定不同queues进行消息派送

消费端支持集群消费和广播消费;广播消费下,同一个consumergroup下所有消费实例都会共享消费消息内容

同一个queue下所有消费实例会均分消息内容

支持事务消息

不支持事务消息

Broker支持通过tag过滤

Exchange绑定特定queues,每个queue为特定服务使用,两个不同服务如果采用同一个queue,那么在从这个队列进行消息消费时只能通过消息具体内容进行区分

通过将需要顺序的消息发送到同一个queue里保证顺序消息的功能

Exchange绑定固定的queue,实现顺序消息

接口封装步骤:

pom添加依赖

通用依赖:

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.9.RELEASE</version>

<relativePath/> <!– lookup parent from repository –>

</parent>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

<version>1.5.2.RELEASE</version>

</dependency>

添加rabbitmq服务配置信息

# rabbitmq配置

spring:

rabbitmq:

addresses:

172.16.4.201

username: admin

password: Pass@1234

Rabbitmq三种交换机模式:

a)         Direct Exchange

(直连):传递时需要一个Routekey,通过Routkey寻找对应队列;这种模式Exchange不需要绑定(binding)queue

例子:

BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);

b)         Fanout Exchange

(广播):任何发送到Fanout Exchange的消息都会转发到与该Exchange绑定(bingding)的所有queue上;这种模式不需要routkey

例子:BindingBuilder.bind(queueA).to(fanoutExchange);

c)         Topic Exchange

可以理解为direct exchange+fanout exchange;任何发送到Exchange的消息都会被转发所有关心Routekey中所指定的queue上;

需要指定routekey;exchange需要绑定(binding)queue;

模糊匹配:#表示0个或若干个关键字, “”表示一个关键字。如

log.

”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

例子:BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);

针对不同交换机模式具体rabbitmq配置文件如下:


public


class

RabbitConfig {


private


final

Logger

logger

= LoggerFactory.

getLogger

(

this

.getClass());

@Value(“${spring.rabbitmq.host}”)


private

String host;

@Value(“${spring.rabbitmq.port}”)


private


int

port;

@Value(“${spring.rabbitmq.username}”)


private

String username;

@Value(“${spring.rabbitmq.password}”)


private

String password;

//rabbitmq连接池

@Bean


public

ConnectionFactory connectionFactory() {

CachingConnectionFactory connectionFactory =

new

CachingConnectionFactory(host, port);

connectionFactory.setUsername(username);

connectionFactory.setPassword(password);

connectionFactory.setVirtualHost(“/”);

connectionFactory.setPublisherConfirms(

true

);


return

connectionFactory;

}

//rabbitTemplate用以简化rabbitmq发送和接收消息

@Bean

@Scope(ConfigurableBeanFactory.


SCOPE_PROTOTYPE


)

// 必须是prototype类型


public

RabbitTemplate rabbitTemplate() {

RabbitTemplate template =

new

RabbitTemplate(connectionFactory());


return

template;

}

/*

* 1、Direct Exchange(直连):传递时需要一个Routekey,通过Routkey寻找对应queue

*/

//获取DirectExchange类型交换机

//交换机-exchange


public


static


final

String


Direct_Exchange


= “DIERCT_EXCHANGE”;

//队列-queue


public


static


final

String


Direct_Queue_A


= “Direct_QUEUE_A”;


public


static


final

String


Direct_Queue_B


= “Direct_QUEUE_B”;

//路由关键字-routingkey


public


static


final

String


Direct_RoutingKey_A


= “DIERCT_ROUTINGKEY_A”;


public


static


final

String


Direct_RoutingKey_B


= “DIERCT_ROUTINGKEY_B”;

@Bean


public

DirectExchange directExchange() {


return


new

DirectExchange(


Direct_Exchange


);

}

//获取queue

@Bean


public

Queue queueA() {


return


new

Queue(


Direct_Queue_A


);

}

@Bean


public

Queue queueB() {


return


new

Queue(


Direct_Queue_B


);

}

//传递routkey给exchange,将queue绑定到exchange上;可以将多个队列绑定到同一个exchange上;

//在生产者发送时需要routkey,格式:RabbitTemplate.convertAndSend(EXCHANGE, ROUTINGKEY, content);

@Bean


public

Binding directBindingA() {


return

BindingBuilder.

bind

(queueA()).to(directExchange()).with(


Direct_RoutingKey_A


);

}

@Bean


public

Binding directBindingB() {


return

BindingBuilder.

bind

(queueB()).to(directExchange()).with(


Direct_RoutingKey_B


);

}

/*

* 2、Fanout Exchange(广播):任何发送到Fanout Exchange的消息都会转发到与该Exchange绑定(bingding)的所有queue上;这种模式不需要routkey

*/


static


final

String


Fanout_Exchange


=”FANOUT_EXCHANGE”;


static


final

String


Fanout_Queue_A


=”FANOUT_QUEUE_A”;


static


final

String


Fanout_Queue_B


=”FANOUT_QUEUE_B”;

@Bean


public

FanoutExchange fanoutExchange() {


return


new

FanoutExchange(


Fanout_Exchange


);

}

@Bean


public

Queue fanoutQueueA() {


return


new

Queue(


Fanout_Queue_A


);

}

@Bean


public

Queue fanoutQueueB() {


return


new

Queue(


Fanout_Queue_B


);

}

//广播方式交换机与queue绑定无需routekey

//生产者发送时routkey为空,格式rabbitTemplate.convertAndSend(Exchange,””, content);这样可以将消息广播到在RabbitConfig类中所有绑定的queues上

@Bean


public

Binding fanoutBindingA() {


return

BindingBuilder.

bind

(fanoutQueueA()).to(fanoutExchange());

}

@Bean


public

Binding fanoutBingB() {


return

BindingBuilder.

bind

(fanoutQueueB()).to(fanoutExchange());

}

/*

* 3、Topic Exchange:任何发送到Exchange的消息都会被转发所有关心Routekey中所指定的queue上

*/


static


final

String


Topic_Exchange


=”TOPIC_EXCHANGE”;


static


final

String


Topic_Queue_A


=”TOPIC_QUEUE_A”;


static


final

String


Topic_Queue_B


=”TOPIC_QUEUE_B”;


static


final

String


Topic_Routing_KeyA


=”TOPIC_#”;


static


final

String


Topic_Routing_KeyB


=”TOPIC_*”;

@Bean


public

TopicExchange topicExchange() {


return


new

TopicExchange(


Topic_Exchange


);

}

@Bean


public

Queue topicQueueA() {


return


new

Queue(


Topic_Queue_A


);

}

@Bean


public

Queue topicQueueB() {


return


new

Queue(


Topic_Queue_B


);

}

@Bean


public

Binding topicBingA() {


return

BindingBuilder.

bind

(topicQueueA()).to(topicExchange()).with(


Topic_Routing_KeyA


);

}

@Bean


public

Binding topicBingB() {


return

BindingBuilder.

bind

(topicQueueB()).to(topicExchange()).with(


Topic_Routing_KeyB


);

}

}

消息发送接口:针对不同交换机类型,发送方法参数略有不同

@Component

public class RabbitSender {

@Autowired

private AmqpTemplate rabbitTemplate;

/*

* directExchange类型  需指定routingkey

*/

public void directSend(String sendMessage) {

// 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey 我们不用管空着就可以,第三个是你要发送的消息

this.rabbitTemplate.convertAndSend(RabbitConfig.Direct_Exchange, RabbitConfig.Direct_RoutingKey_A, sendMessage); // exchange,routkey,message

}

/*

* fanoutExchange类型 无需指定routingkey

*/

public void fanoutSend(String sendMessage) {

for(int i=0;i<1;i++) {

rabbitTemplate.convertAndSend(RabbitConfig.Fanout_Exchange, “”, sendMessage+i);

}

}

/*

* topicExchange类型 需指定routingkey

*/

public void topicSend(String sendMessage) {

rabbitTemplate.convertAndSend(RabbitConfig.Topic_Exchange, RabbitConfig.Topic_Routing_KeyA, sendMessage);

}

}

监听器  可以监听某个或者某些queue

@Component

@RabbitListener(queues = {RabbitConfig.Direct_Queue_A})

public class Consumer1 {

/**

* 消息消费

* @RabbitHandler 代表此方法为接受到消息后的处理方法

*/

@RabbitHandler

public void recieved(String message) {

System.out.println(“——–:”+JSON.toJSONString(message));

}

}

基于docker rocketmq安装&rocketmq基本接口封装

Rocketmq单机现已部署到qa环境:

http://30.16.80.9:8080/#/

Rocketmq接口封装代码以提交到cap/backed/develop-v2.0分支

具体rocketmq部署以及接口总结文档如下:

一、基于docker rocketmq安装:

1、  拉取rocketmq镜像

docker pull rocketmqinc/rocketmq

2、  拉取rocketmq-console镜像

docker pull styletang/rocketmq-console-ng

3、  启动nameserver

docker run -d -p 9876:9876 -v `pwd`/data/namesrv/

logs:/root/logs

-v `pwd`/data/namesrv/

store:/root/store

–name rmqnamesrv rocketmqinc/rocketmq sh mqnamesrv

4、 启动broker

docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/

logs:/root/logs

-v `pwd`/data/broker/

store:/root/store

–name rmqbroker –link rmqnamesrv:namesrv -e “NAMESRV_ADDR=namesrv:9876” rocketmqinc/rocketmq sh mqbroker -c ../conf/broker.conf

由于启动broker时rocketmq默认指定为内网地址,会导致外网无法连接到broker,报出如下错误信息: connect to xxx.xx.xx.xx:10911 failed

解决方案:①docker exec -it xxxxx bash    –xxx是指broker对应的containerid

②cd ../conf

③vi broker.conf

④增加brokerIP1 = xxx.xxx.xxx.xxx  –这里的ip地址指定为外网地址

⑤重启broker容器

5、 启动rocketmq-console

docker run -e “JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false” -p 8080:8080 -t styletang/rocketmq-console-ng

— xxx.xxx.xxx.xxx 为服务器地址

二、rocketmq接口封装

1、  rocketmq结构

2、  rocketmq实例

a)         消费端

i.              普通发送消息

public SendResult send(String topic, String tag, String sendMsg) {


log.info

(“SendMessage_topic:” + topic + “SendMessage_tag:” + tag + “,sendMsg:” + sendMsg);

SendResult sendResult = null;

Message msgMessage = null;

if (StringUtils.isBlank(topic)) {

throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, “topics is null !!!”, false);

}

try {

if (StringUtils.isBlank(tag)) {

msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));

} else {

msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));

}

sendResult = producer.send(msgMessage);

} catch (Exception e) {

log.error(e.toString());

}

return sendResult;

}

ii.              顺序消息发送

RocketMQ中同一个队列不能被并行消费,但可以并行消费多个队列。基于此,Rocket可以保证将需要排序的内容放在同一个队列中便可以保证消费的顺序进行

public SendResult sendOrderly(String orderId, String topic, String tag, String sendMsg) {


log.info

(“Orderly SendMessage_topic:” + topic + “SendMessage_tag:” + tag + “,sendMsg:” + sendMsg);

SendResult sendResult = null;

Message msgMessage = null;

if (StringUtils.isBlank(topic)) {

throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, “topics is null !!!”, false);

}

try {

if (StringUtils.isBlank(tag)) {

msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));

} else {

msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));

}

sendResult = producer.send(msgMessage, new MessageQueueSelector() {

@Override

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

Integer id = (Integer) arg;

int index = id % mqs.size();

return mqs.get(index);

}

}, orderId);

} catch (Exception e) {

log.error(e.toString());

}

return sendResult;

}

b)         消息监听器

i.顺序消费 实现MessageListenerOrderly接口

public class MQConsumeMsgOrderlyListener implements MessageListenerOrderly {

@Value(“${rocketmq.consumer.topic}”)

private String topic;

@Value(“${rocketmq.consumer.tag}”)

private String tag;

@Override

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

ConsumeOrderlyStatus result=ConsumeOrderlyStatus.SUCCESS;

if (StringUtils.isBlank(topic)) {

throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, “topics is null !!!”, false);

}

for(MessageExt msg:msgs) {

try {

if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) {  //TODO 这个需要么,consumer订阅会指定topic

String Message = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);


log.info

(“MQConsumeMsgListenerProcessor consumeMessage body:” + Message);

// TODO 根据接收到mq消息内容进行其他操作

return result;

}

}catch(Exception e) {

result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

}

}

return null;

}

ii. 并发消费 实现MessageListenerConcurrently接口


public


class

MQConsumeMsgConcurrentListener

implements

MessageListenerConcurrently {

@Value(“${rocketmq.consumer.topic}”)


private

String topic;

@Value(“${rocketmq.consumer.tag}”)


private

String tag;

@Override


public

ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.


CONSUME_SUCCESS


;


if

(StringUtils.

isBlank

(topic)) {


throw


new

RocketMQException(RocketMQErrorEnum.


PARAMM_NULL


, “topics is null !!!”,

false

);

}


for

(MessageExt msg : msgs) {


try

{


if

(StringUtils.

isNotBlank

(msg.getTopic()) && msg.getTopic().equals(topic)) { //

TODO

String Message =

new

String(msg.getBody(), RemotingHelper.


DEFAULT_CHARSET


);



log


.info(“MQConsumeMsgListenerProcessor consumeMessage body:” + Message);

//

TODO

根据接收到mq消息内容进行其他操作


return

result;

}

}

catch

(Exception e) {

result = ConsumeConcurrentlyStatus.


RECONSUME_LATER


;

}

}


return

result;

}

}

c)         消费端

i.)                   集群消费

一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息;

在初始化消费者时将consumer设置为集群消费模式:consumer.setMessageModel(MessageModel.CLUSTERING);

ii.)                 广播消费

一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次;

在初始化消费者时将consumer设置为广播消费模式:

consumer.setMessageModel(MessageModel.BROADCASTING);

基本代码
Application :


package com.leolztang.sb.aop.rocketmq2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;

@SpringBootApplication
@EnableJpaAuditing 
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}


View Code

InitialProducer


package com.leolztang.sb.aop.rocketmq2;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class InitialProducer {
    private static final Logger log = LoggerFactory.getLogger(InitialProducer.class);
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String nameserAddr;
    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    private DefaultMQProducer producer;
    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        if (StringUtils.isEmpty(groupName)) {
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is blank",false);
        }
        if (StringUtils.isEmpty(nameserAddr)) {
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"nameServerAddr is blank",false);
        }
        producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameserAddr);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeout);
        producer.setVipChannelEnabled(false);

        try {
            producer.start();
            log.info("rocketMQ is start !!groupName : {},nameserAddr:{}",groupName,nameserAddr);
        } catch (MQClientException e) {
            log.error(String.format("rocketMQ start error,{}",e.getMessage()));
            e.printStackTrace();
        }
        return producer;
    }

}


View Code

ProducerDemo :


package com.leolztang.sb.aop.rocketmq2;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/dmservice/rocket")
public class ProducerDemo {
    @Autowired
    private SendMessageService sendMessageService;
    
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    @RequestMapping(value="/sendmq2")
    public void sendMQDemo() {
        String body="rocketMQ test body";
        for(int i=0;i<10;i++) {
            sendMessageService.send(topic, tag,  body+i+"");
        }
    }
    
}


View Code

SendMessageService

package com.leolztang.sb.aop.rocketmq2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class SendMessageService {
    private static final Logger log = LoggerFactory.getLogger(SendMessageService.class);
    @Autowired
    private DefaultMQProducer producer;

    // 1、topic
    public SendResult send(String topic, String sendMsg) {
        log.info("SendMessage_topic:" + topic + ",sendMsg:" + sendMsg);
        SendResult sendResult = null;
        try {
            Message msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = producer.send(msgMessage);
            log.info("SendResult topic:" + topic + "sendResult.getSendStatus:" + sendResult.getSendStatus());
        } catch (Exception e) {
            log.error(e.toString());
        }
        return sendResult;
    }

    // 2、topci+tags
    public SendResult send(String topic, String tag, String sendMsg) {
        log.info("SendMessage_topic:" + topic + ",tag:" + tag + ",sendMsg:" + sendMsg);
        SendResult sendResult = null;
        try {
            Message msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = producer.send(msgMessage);
            log.info("SendResult topic:" + topic + ",tag:" + tag +"sendResult.getSendStatus:" + sendResult.getSendStatus());
        } catch (Exception e) {
            log.error(e.toString());
        }
        return sendResult;
    }
}


View Code

InitialConsumer1 :


package com.leolztang.sb.aop.rocketmq2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class InitialConsumer1 {
    private static final Logger log = LoggerFactory.getLogger(InitialConsumer1.class);
    
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    @Autowired
    private MQConsumeMsgListenerProcessor2 mqMessageListenerProcessor;  //TODO MQConsumeMsgListenerProcessor改为接口,可以不同consuemr不同listener
    
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException{
        if (StringUtils.isEmpty(groupName)){
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is null !!!",false);
        }
        if (StringUtils.isEmpty(namesrvAddr)){
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"namesrvAddr is null !!!",false);
        }
        if(StringUtils.isEmpty(topic)){
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"topics is null !!!",false);
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.setMessageModel(MessageModel.CLUSTERING); //集群消费
//        consumer.setMessageModel(MessageModel.BROADCASTING);//广播消费  对应MessageListenerConcurrently监听器
        try {
            consumer.subscribe(topic, tag); //TODO 这里是否可以注册topic和tag
            consumer.start();
        }catch(Exception e) {
            log.error("groupName:{},topic:{},tag:{}, getRocketMQConsumer error",groupName,topic,tag,e);
        }
        /**
         * 设置一次消费消息的条数,默认为1条
         */
        return consumer;
    }
    
}


View Code

MQConsumeMsgListenerProcessor


package com.leolztang.sb.aop.rocketmq2;

import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;


@Service
public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
    private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
    @Value("${rocketmq.consumer.topic}")
    private String topics;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        ConsumeOrderlyStatus result = ConsumeOrderlyStatus.SUCCESS;
        for(MessageExt msg:msgs) {
            try {
                if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topics)) {   //TODO 这个需要么,consumer订阅会指定topic
                    if(StringUtils.isNotBlank(msg.getTags()) && msg.getTags().equals(tag)) {
                        String Message=new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET);
                        log.info("======================:"+Message);
                        //TODO 根据接收到mq消息内容进行其他操作
                        return result;
                    }
                }
            }catch(Exception e) {
                result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return result;
    }

}


View Code

MQConsumeMsgListenerProcessor2


package com.leolztang.sb.aop.rocketmq2;

import java.util.HashMap;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MQConsumeMsgListenerProcessor2 implements MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
    @Value("${rocketmq.consumer.topic}")
    private String topics;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        HashMap<K, V>
        ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        for(MessageExt msg:msgs) {
            try {
                if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topics)) {   //TODO 这个需要么,consumer订阅会指定topic
                    if(StringUtils.isNotBlank(msg.getTags()) && msg.getTags().equals(tag)) {
                        String Message=new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET);
                        log.info("==================~~~~~~~~jjjjjjjjj============:"+Message);
                        //TODO 根据接收到mq消息内容进行其他操作
                        return result;
                    }
                }
            }catch(Exception e) {
                result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return result;
    }

}


View Code

RocketMQErrorEnum


package com.leolztang.sb.aop.rocketmq2;

public enum RocketMQErrorEnum implements ErrorCode {
    /********公共********/
    PARAMM_NULL("MQ_001","参数为空"),
    
    /********生产者*******/
    
    
    
    /********消费者*******/
    NOT_FOUND_CONSUMESERVICE("MQ_100","根据topic和tag没有找到对应的消费服务"),
    HANDLE_RESULT_NULL("MQ_101","消费方法返回值为空"),
    CONSUME_FAIL("MQ_102","消费失败")
    
    ;

    private String code;
    private String msg;

    private RocketMQErrorEnum(String code, String msg) {
        this.code = code;
        this.msg = msg;
    }
    public String getCode() {
        return this.code;
    }

    public String getMsg() {
        return this.msg;
    }
}


View Code

RocketMQException


package com.leolztang.sb.aop.rocketmq2;

public class RocketMQException extends AppException {

    /**
     * 无参构造函数
     */
    public RocketMQException() {
        super();
    }

    public RocketMQException(Throwable e) {
        super(e);
    }

    public RocketMQException(ErrorCode errorType) {
        super(errorType);
    }

    public RocketMQException(ErrorCode errorCode, String... errMsg) {
        super(errorCode, errMsg);
    }

    /**
     * 封装异常
     * 
     * @param errorCode
     * @param errMsg
     * @param isTransfer
     *            是否转换异常信息,如果为false,则直接使用errMsg信息
     */
    public RocketMQException(ErrorCode errorCode, String errMsg, Boolean isTransfer) {
        super(errorCode, errMsg, isTransfer);
    }

    public RocketMQException(ErrorCode errCode, Throwable cause, String... errMsg) {
        super(errCode, cause, errMsg);
    }
}


View Code

ErrorCode


package com.leolztang.sb.aop.rocketmq2;

import java.io.Serializable;

public interface ErrorCode extends Serializable {
    /* 
    * 错误码
     * @return
     */
    String getCode();
    /**
     * 错误信息
     * @return
     */
    String getMsg();
}


View Code

AppException


package com.leolztang.sb.aop.rocketmq2;

public class AppException extends RuntimeException{
     private static final long serialVersionUID = 1L;
        /**
         * 错误编码
         */
        protected ErrorCode errCode;

        /**
         * 错误信息
         */
        protected String errMsg;

        /**
         * 无参构造函数
         */
        public AppException() {
            super();
        }
        public AppException(Throwable e) {
            super(e);
        }
        
        public AppException(ErrorCode errCode, String... errMsg) {
            super(errCode.getMsg());
            this.errCode = errCode;
            setErrMsg(errMsg,true);
        }
        
        public AppException(ErrorCode errCode, String errMsg,Boolean isTransfer) {
            super(errMsg);
            this.errCode = errCode;
            setErrMsg(new String[]{errMsg},isTransfer);
        }
        
        /**
         * 构造函数
         *
         * @param cause 异常
         */
        public AppException(ErrorCode errCode, Throwable cause, String... errMsg) {
            super(errCode.getCode() + errCode.getMsg(), cause);
            this.errCode = errCode;
            setErrMsg(errMsg,true);
        }

        public ErrorCode getErrCode() {
            return errCode;
        }

        public void setErrCode(ErrorCode errCode) {
            this.errCode = errCode;
        }

        public String getErrMsg() {
            return this.errMsg;
        }

        public void setErrMsg(String[] errMsg,Boolean isTransfer) {

            if (null != errMsg &&errMsg.length>0) {
                if(errCode.getMsg().contains("%s") && isTransfer){
                    this.errMsg = String.format(errCode.getMsg(), errMsg);
                }else{
                    StringBuffer sf = new StringBuffer();
                    for (String msg : errMsg) {
                        sf.append(msg+";");
                    }
                    this.errMsg = sf.toString();
                }
            }else{
                this.errMsg = errCode.getMsg();
            }

        }

        public static void main(String[] args) {
            String str = "ERRCode:1004--对象不存在:[%s]";
            if (str.contains("%s")){
             System.out.println("包含");
            }
        }
}


View Code

转载于:https://www.cnblogs.com/enhance/p/11227841.html