1)pom配置
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
2)yml配置
apache:
rocketmq:
name-server: 127.0.0.1:9876
producer:
topic: testTopic1
#消息最大长度 默认1024*4(4M)
maxMessageSize: 4096
#发送消息超时时间,默认3000
sendMsgTimeout: 3000
#发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
algorithm:
producerGroup: algorithm-api
#跟接受的集群组有关,同一个集群组所采用的comsumerGroup是一致的
comsumerGroup: algorithm-api1
consumer:
#跟生产者的topic对应
topic: testTopic1
tag: testTag
consumeMessageBatchMaxSize: 3
consumeThreadMin: 20
consumeThreadMax: 64
3)生产者配置类:MQProducerConfiguration和生产者调用类:MqProducerServiceImpl
@Configuration
@Order(0)
public class MQProducerConfiguration {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
/**
* 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
*/
@Value("${apache.rocketmq.producer.algorithm.producerGroup}")
private String groupName;
@Value("${apache.rocketmq.name-server}")
private String namesrvAddr;
/**
* 消息最大大小,默认4M
*/
@Value("${apache.rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize ;
/**
* 消息发送超时时间,默认3秒
*/
@Value("${apache.rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
/**
* 消息发送失败重试次数,默认2次
*/
@Value("${apache.rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
@Bean
public DefaultMQProducer getRocketMQProducer() throws RocketMQException {
if (StringUtils.isEmpty(this.groupName)) {
throw new RocketMQException("groupName is blank");
}
if (StringUtils.isEmpty(this.namesrvAddr)) {
throw new RocketMQException("nameServerAddr is blank");
}
DefaultMQProducer producer;
producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
//producer.setInstanceName(instanceName);
// if(this.maxMessageSize!=null){
// producer.setMaxMessageSize(this.maxMessageSize);
// }
if(this.sendMsgTimeout!=null){
producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果发送消息失败,设置重试次数,默认为2次
if(this.retryTimesWhenSendFailed!=null){
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
try {
producer.start();
LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]"
, this.groupName, this.namesrvAddr));
} catch (MQClientException e) {
LOGGER.error(String.format("producer is error {}"
, e.getMessage(),e));
throw new RocketMQException(e.getMessage());
}
return producer;
}
}
@Component(value = "mqProducerService")
@Log4j2
//实现类
public class MqProducerServiceImpl implements MqProducerService {
@Autowired
private DefaultMQProducer producer;
@Value("${apache.rocketmq.producer.topic:broadcastSocketMsg}")
private String topic;
private static final String TAG_SOCKET_MSG = "testTag";
public void sendSocketMsg(BaseMsg obj) throws Exception {
try {
Message msg;
msg = new Message(topic ,
TAG_SOCKET_MSG,
JsonUtils.beanToJson(obj).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("request-id", UUID.randomUUID().toString().replace("-", ""));
log.info("MQ请求参数:"+JsonUtils.beanToJson(obj));
SendResult sendResult= producer.send(msg);
log.info("MQ结果返回:"+JsonUtils.beanToJson(sendResult));
} catch (RemotingException | MQBrokerException | InterruptedException | UnsupportedEncodingException |MQClientException e) {
log.error(e.getMessage());
Thread.currentThread().interrupt();
}
}
}
//接口类
public interface MqProducerService {
public void sendSocketMsg(BaseMsg obj) throws Exception;
}
4)配置调用的Controller类:MQController
@RestController
@RequestMapping("/mq")
@Api(value="/mq", tags="消息模块")
public class MQController {
@Autowired
private MqProducerService mqProducerService;
@PostMapping(value = "/msg")
@Produces("application/json;charset=UTF-8")
@ApiOperation(value = "新增消息", notes = "新增消息")
public Object addUser(@RequestBody @ApiParam(value = "传入的user", required = true) BaseMsg baseMsg) throws Exception {
mqProducerService.sendSocketMsg(baseMsg);
return Result.buildSuccessResult();
}
}
5)消费者配置类:MQConsumerConfiguration和消费者处理类:MQConsumeMsgListenerProcessor
@Configuration
public class MQConsumerConfiguration {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
@Value("${apache.rocketmq.name-server}")
private String namesrvAddr;
@Value("${apache.rocketmq.producer.algorithm.comsumerGroup}")
private String groupName;
@Value("${apache.rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${apache.rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${apache.rocketmq.consumer.topic}")
private String topic;
@Value("${apache.rocketmq.consumer.tag}")
private String tag;
@Value("${apache.rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Autowired
private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException {
if (StringUtils.isEmpty(groupName)) {
throw new RocketMQException("groupName is blank!");
}
if (StringUtils.isEmpty(namesrvAddr)) {
throw new RocketMQException("namesrvAddr is null!");
}
if (StringUtils.isEmpty(topic)) {
throw new RocketMQException("topics is null !!!");
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
//set to broadcast mode
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(mqMessageListenerProcessor);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
// consumer.setMessageModel(MessageModel.CLUSTERING);
/**
* 设置一次消费消息的条数,默认为1条
*/
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
consumer.subscribe(topic, tag);
consumer.start();
LOGGER.info("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", groupName, topic,tag, namesrvAddr);
} catch (MQClientException e) {
LOGGER.error("consumer is start !!! groupName:{},topic:{},tag:{},namesrvAddr:{}", groupName, topic,tag, namesrvAddr,
e);
throw new RocketMQException(e.getMessage());
}
return consumer;
}
}
@Component
@Log4j2
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
@Value("${apache.rocketmq.consumer.topic}")
private String topic;
private static String TAG_SOCKET_MSG = "testTag";
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
log.info("接受到的消息为:" + messageExt.toString());
if (messageExt.getTopic().equals(topic)) {
if (messageExt.getTags().equals(TAG_SOCKET_MSG)) {
int reconsume = messageExt.getReconsumeTimes();
if (reconsume == 3) {// 消息已经重试了3次,如果不需要再次消费,则返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//TODO 处理对应的业务逻辑
log.info("接收的body体:" + new String(messageExt.getBody()));
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
有什么问题或者更好的方案欢迎交流
版权声明:本文为zhoupan2008原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。