springboot+RocketMQ实现(生产者和消费者)

  • Post author:
  • Post category:其他


1)pom配置

<dependency>

    <groupId>org.apache.rocketmq</groupId>

    <artifactId>rocketmq-client</artifactId>

    <version>4.3.0</version>

</dependency>

2)yml配置

apache:

  rocketmq:

    name-server: 127.0.0.1:9876

    producer:

      topic: testTopic1

  #消息最大长度 默认1024*4(4M)

      maxMessageSize: 4096

  #发送消息超时时间,默认3000

      sendMsgTimeout: 3000

  #发送消息失败重试次数,默认2

      retryTimesWhenSendFailed: 2

      algorithm:

        producerGroup: algorithm-api

    #跟接受的集群组有关,同一个集群组所采用的comsumerGroup是一致的

        comsumerGroup: algorithm-api1

    consumer:

    #跟生产者的topic对应

      topic: testTopic1

      tag: testTag

      consumeMessageBatchMaxSize: 3

      consumeThreadMin: 20

      consumeThreadMax: 64

3)生产者配置类:MQProducerConfiguration和生产者调用类:MqProducerServiceImpl

@Configuration

@Order(0)

public class MQProducerConfiguration {

    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);

    /**

     * 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示

     */

    @Value("${apache.rocketmq.producer.algorithm.producerGroup}")

    private String groupName;

    @Value("${apache.rocketmq.name-server}")

    private String namesrvAddr;

    /**

     * 消息最大大小,默认4M

     */

    @Value("${apache.rocketmq.producer.maxMessageSize}")

    private Integer maxMessageSize ;

    /**

     * 消息发送超时时间,默认3秒

     */

    @Value("${apache.rocketmq.producer.sendMsgTimeout}")

    private Integer sendMsgTimeout;

    /**

     * 消息发送失败重试次数,默认2次

     */

    @Value("${apache.rocketmq.producer.retryTimesWhenSendFailed}")

    private Integer retryTimesWhenSendFailed;

    @Bean

    public DefaultMQProducer getRocketMQProducer() throws RocketMQException {

        if (StringUtils.isEmpty(this.groupName)) {

            throw new RocketMQException("groupName is blank");

        }

        if (StringUtils.isEmpty(this.namesrvAddr)) {

            throw new RocketMQException("nameServerAddr is blank");

        }

        DefaultMQProducer producer;

        producer = new DefaultMQProducer(this.groupName);

        producer.setNamesrvAddr(this.namesrvAddr);

        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName

        //producer.setInstanceName(instanceName);

//        if(this.maxMessageSize!=null){

//            producer.setMaxMessageSize(this.maxMessageSize);

//        }

        if(this.sendMsgTimeout!=null){

            producer.setSendMsgTimeout(this.sendMsgTimeout);

        }

        //如果发送消息失败,设置重试次数,默认为2次

        if(this.retryTimesWhenSendFailed!=null){

            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);

        }

        try {

            producer.start();

            LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"

                    , this.groupName, this.namesrvAddr));

        } catch (MQClientException e) {

            LOGGER.error(String.format("producer is error {}"

                    , e.getMessage(),e));

            throw new RocketMQException(e.getMessage());

        }

        return producer;

    }

}
@Component(value = "mqProducerService")

@Log4j2

//实现类

public class MqProducerServiceImpl implements MqProducerService {

    @Autowired

    private DefaultMQProducer producer;

    @Value("${apache.rocketmq.producer.topic:broadcastSocketMsg}")

    private String topic;

    private static final String TAG_SOCKET_MSG = "testTag";

    public void sendSocketMsg(BaseMsg obj) throws Exception {

        try {

            Message msg;

            msg = new Message(topic ,

                    TAG_SOCKET_MSG,

                    JsonUtils.beanToJson(obj).getBytes(RemotingHelper.DEFAULT_CHARSET)

            );

            msg.putUserProperty("request-id", UUID.randomUUID().toString().replace("-", ""));

            log.info("MQ请求参数:"+JsonUtils.beanToJson(obj));

            SendResult sendResult= producer.send(msg);

            log.info("MQ结果返回:"+JsonUtils.beanToJson(sendResult));

        } catch (RemotingException | MQBrokerException | InterruptedException | UnsupportedEncodingException |MQClientException e) {

            log.error(e.getMessage());

            Thread.currentThread().interrupt();

        }

    }

}
//接口类

public interface MqProducerService {

    public void sendSocketMsg(BaseMsg obj) throws Exception;

}

4)配置调用的Controller类:MQController

@RestController

@RequestMapping("/mq")

@Api(value="/mq", tags="消息模块")

public class MQController {

    @Autowired

    private MqProducerService mqProducerService;

    @PostMapping(value = "/msg")

    @Produces("application/json;charset=UTF-8")

    @ApiOperation(value = "新增消息", notes = "新增消息")

    public Object addUser(@RequestBody   @ApiParam(value = "传入的user", required = true) BaseMsg baseMsg) throws Exception {

        mqProducerService.sendSocketMsg(baseMsg);

        return Result.buildSuccessResult();

    }

}

5)消费者配置类:MQConsumerConfiguration和消费者处理类:MQConsumeMsgListenerProcessor

@Configuration

public class MQConsumerConfiguration {

    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);

    @Value("${apache.rocketmq.name-server}")

    private String namesrvAddr;

    @Value("${apache.rocketmq.producer.algorithm.comsumerGroup}")

    private String groupName;

    @Value("${apache.rocketmq.consumer.consumeThreadMin}")

    private int consumeThreadMin;

    @Value("${apache.rocketmq.consumer.consumeThreadMax}")

    private int consumeThreadMax;

    @Value("${apache.rocketmq.consumer.topic}")

    private String topic;

    @Value("${apache.rocketmq.consumer.tag}")

    private String tag;

    @Value("${apache.rocketmq.consumer.consumeMessageBatchMaxSize}")

    private int consumeMessageBatchMaxSize;

    @Autowired

    private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;

    @Bean

    public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException {

        if (StringUtils.isEmpty(groupName)) {

            throw new RocketMQException("groupName is blank!");

        }

        if (StringUtils.isEmpty(namesrvAddr)) {

            throw new RocketMQException("namesrvAddr is null!");

        }

        if (StringUtils.isEmpty(topic)) {

            throw new RocketMQException("topics is null !!!");

        }

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);

        //set to broadcast mode

        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.setNamesrvAddr(namesrvAddr);

        consumer.setConsumeThreadMin(consumeThreadMin);

        consumer.setConsumeThreadMax(consumeThreadMax);

        consumer.registerMessageListener(mqMessageListenerProcessor);

        /**

         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费

         */

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        /**

         * 设置消费模型,集群还是广播,默认为集群

         */

        // consumer.setMessageModel(MessageModel.CLUSTERING);

        /**

         * 设置一次消费消息的条数,默认为1条

         */

        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);

        try {

            consumer.subscribe(topic, tag);

            consumer.start();

            LOGGER.info("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", groupName, topic,tag, namesrvAddr);

        } catch (MQClientException e) {

            LOGGER.error("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", groupName, topic,tag, namesrvAddr,

                    e);

            throw new RocketMQException(e.getMessage());

        }

        return consumer;

    }

}
@Component

@Log4j2

public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {

    @Value("${apache.rocketmq.consumer.topic}")

    private String topic;

    private static String TAG_SOCKET_MSG = "testTag";

    @Override

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

        MessageExt messageExt = msgs.get(0);

        log.info("接受到的消息为:" + messageExt.toString());

        if (messageExt.getTopic().equals(topic)) {

            if (messageExt.getTags().equals(TAG_SOCKET_MSG)) {

                int reconsume = messageExt.getReconsumeTimes();

                if (reconsume == 3) {// 消息已经重试了3次,如果不需要再次消费,则返回成功

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                }

                //TODO  处理对应的业务逻辑

                log.info("接收的body体:" + new String(messageExt.getBody()));

            }

        }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

    }

}

有什么问题或者更好的方案欢迎交流



版权声明:本文为zhoupan2008原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。