1)pom配置
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>2)yml配置
apache:
  rocketmq:
    name-server: 127.0.0.1:9876
    producer:
      topic: testTopic1
  #消息最大长度 默认1024*4(4M)
      maxMessageSize: 4096
  #发送消息超时时间,默认3000
      sendMsgTimeout: 3000
  #发送消息失败重试次数,默认2
      retryTimesWhenSendFailed: 2
      algorithm:
        producerGroup: algorithm-api
    #跟接受的集群组有关,同一个集群组所采用的comsumerGroup是一致的
        comsumerGroup: algorithm-api1
    consumer:
    #跟生产者的topic对应
      topic: testTopic1
      tag: testTag
      consumeMessageBatchMaxSize: 3
      consumeThreadMin: 20
      consumeThreadMax: 643)生产者配置类:MQProducerConfiguration和生产者调用类:MqProducerServiceImpl
@Configuration
@Order(0)
public class MQProducerConfiguration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
    /**
     * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
     */
    @Value("${apache.rocketmq.producer.algorithm.producerGroup}")
    private String groupName;
    @Value("${apache.rocketmq.name-server}")
    private String namesrvAddr;
    /**
     * 消息最大大小,默认4M
     */
    @Value("${apache.rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize ;
    /**
     * 消息发送超时时间,默认3秒
     */
    @Value("${apache.rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    /**
     * 消息发送失败重试次数,默认2次
     */
    @Value("${apache.rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;
    @Bean
    public DefaultMQProducer getRocketMQProducer() throws RocketMQException {
        if (StringUtils.isEmpty(this.groupName)) {
            throw new RocketMQException("groupName is blank");
        }
        if (StringUtils.isEmpty(this.namesrvAddr)) {
            throw new RocketMQException("nameServerAddr is blank");
        }
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
        //producer.setInstanceName(instanceName);
//        if(this.maxMessageSize!=null){
//            producer.setMaxMessageSize(this.maxMessageSize);
//        }
        if(this.sendMsgTimeout!=null){
            producer.setSendMsgTimeout(this.sendMsgTimeout);
        }
        //如果发送消息失败,设置重试次数,默认为2次
        if(this.retryTimesWhenSendFailed!=null){
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        }
        try {
            producer.start();
            LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
                    , this.groupName, this.namesrvAddr));
        } catch (MQClientException e) {
            LOGGER.error(String.format("producer is error {}"
                    , e.getMessage(),e));
            throw new RocketMQException(e.getMessage());
        }
        return producer;
    }
}
@Component(value = "mqProducerService")
@Log4j2
//实现类
public class MqProducerServiceImpl implements MqProducerService {
    @Autowired
    private DefaultMQProducer producer;
    @Value("${apache.rocketmq.producer.topic:broadcastSocketMsg}")
    private String topic;
    private static final String TAG_SOCKET_MSG = "testTag";
    public void sendSocketMsg(BaseMsg obj) throws Exception {
        try {
            Message msg;
            msg = new Message(topic ,
                    TAG_SOCKET_MSG,
                    JsonUtils.beanToJson(obj).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            msg.putUserProperty("request-id", UUID.randomUUID().toString().replace("-", ""));
            log.info("MQ请求参数:"+JsonUtils.beanToJson(obj));
            SendResult sendResult= producer.send(msg);
            log.info("MQ结果返回:"+JsonUtils.beanToJson(sendResult));
        } catch (RemotingException | MQBrokerException | InterruptedException | UnsupportedEncodingException |MQClientException e) {
            log.error(e.getMessage());
            Thread.currentThread().interrupt();
        }
    }
}//接口类
public interface MqProducerService {
    public void sendSocketMsg(BaseMsg obj) throws Exception;
}4)配置调用的Controller类:MQController
@RestController
@RequestMapping("/mq")
@Api(value="/mq", tags="消息模块")
public class MQController {
    @Autowired
    private MqProducerService mqProducerService;
    @PostMapping(value = "/msg")
    @Produces("application/json;charset=UTF-8")
    @ApiOperation(value = "新增消息", notes = "新增消息")
    public Object addUser(@RequestBody   @ApiParam(value = "传入的user", required = true) BaseMsg baseMsg) throws Exception {
        mqProducerService.sendSocketMsg(baseMsg);
        return Result.buildSuccessResult();
    }
}5)消费者配置类:MQConsumerConfiguration和消费者处理类:MQConsumeMsgListenerProcessor
@Configuration
public class MQConsumerConfiguration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
    @Value("${apache.rocketmq.name-server}")
    private String namesrvAddr;
    @Value("${apache.rocketmq.producer.algorithm.comsumerGroup}")
    private String groupName;
    @Value("${apache.rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${apache.rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${apache.rocketmq.consumer.topic}")
    private String topic;
    @Value("${apache.rocketmq.consumer.tag}")
    private String tag;
    @Value("${apache.rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    @Autowired
    private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException {
        if (StringUtils.isEmpty(groupName)) {
            throw new RocketMQException("groupName is blank!");
        }
        if (StringUtils.isEmpty(namesrvAddr)) {
            throw new RocketMQException("namesrvAddr is null!");
        }
        if (StringUtils.isEmpty(topic)) {
            throw new RocketMQException("topics is null !!!");
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        /**
         * 设置消费模型,集群还是广播,默认为集群
         */
        // consumer.setMessageModel(MessageModel.CLUSTERING);
        /**
         * 设置一次消费消息的条数,默认为1条
         */
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            consumer.subscribe(topic, tag);
            consumer.start();
            LOGGER.info("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", groupName, topic,tag, namesrvAddr);
        } catch (MQClientException e) {
            LOGGER.error("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", groupName, topic,tag, namesrvAddr,
                    e);
            throw new RocketMQException(e.getMessage());
        }
        return consumer;
    }
}@Component
@Log4j2
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    @Value("${apache.rocketmq.consumer.topic}")
    private String topic;
    private static String TAG_SOCKET_MSG = "testTag";
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        MessageExt messageExt = msgs.get(0);
        log.info("接受到的消息为:" + messageExt.toString());
        if (messageExt.getTopic().equals(topic)) {
            if (messageExt.getTags().equals(TAG_SOCKET_MSG)) {
                int reconsume = messageExt.getReconsumeTimes();
                if (reconsume == 3) {// 消息已经重试了3次,如果不需要再次消费,则返回成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                //TODO  处理对应的业务逻辑
                log.info("接收的body体:" + new String(messageExt.getBody()));
            }
        }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}有什么问题或者更好的方案欢迎交流
 
版权声明:本文为zhoupan2008原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
