前言
在此只引入关键依赖,关于SpringBoot等其他基础依赖请自行准备。
另外,本文中的实现方式为
点对点
的方式。
开始
首先,在pom文件中引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
①一个Producer
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
@Component
public class MessageProducer {
@Value(value = "${rocketMQ.address}")
private String address;
private DefaultMQProducer producer;
private static final Integer RETRY_TIMES = 5;
/**
* 消息生产者初始化
*/
@PostConstruct
private void init(){
try {
producer = new DefaultMQProducer("zxy");
producer.setNamesrvAddr(address);
producer.setInstanceName("MessageProducer");
producer.setRetryTimesWhenSendAsyncFailed(RETRY_TIMES);
producer.start();
} catch (Exception e){
log.error("消息生产者初始化失败");
}
}
/**
* 消息生产者关闭
*/
@PreDestroy
public void destroy(){
try {
producer.shutdown();
} catch (Exception e){
log.error("消息生产者关闭失败");
}
}
/**
* 异步发送mq
* @param topic
* @param tags
* @param body
*/
@Async
public void sendMessage(String topic, String tags, String body) {
try {
Message message = new Message(topic, tags, body.getBytes("UTF-8"));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功, sendResult:{}" , sendResult);
}
@Override
public void onException(Throwable throwable) {
Integer retryTimes = producer.getRetryTimesWhenSendAsyncFailed();
log.info("消息发送失败, retryTimes:{}" , retryTimes);
}
});
} catch (Exception e) {
log.error("消息发送失败,topic:{}, tags:{}, body:{}", topic, tags, body);
}
}
}
②一个Consumer
import com.zxy.devops.project.biz.enums.MessageTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class DefaultConsumer {
@Value(value = "${rocketMQ.address}")
private String address;
@Autowired
private DefaultListener defaultListener;
@PostConstruct
public void messageSubscribe() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
consumer.setNamesrvAddr(address);
consumer.setInstanceName("DefaultConsumer");
consumer.subscribe("default-topic", "*");
consumer.registerMessageListener(defaultListener);
consumer.start();
log.info("Default consumer start.");
}
}
③一个Listener
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class DefaultListener implements MessageListenerConcurrently {
private static final Integer RETRY_TIMES = 5;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info(Thread.currentThread().getName()
+" Receive New Messages: " + list.size());
Message message = list.get(0);
if (message == null) {
log.error("处理消息时, message 为空");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
log.info("处理消息 , topic={}, tags={}, keys={}, reConsumeTimes={}",
message.getTopic(),
message.getTags(),
message.getKeys(),
((MessageExt) message).getReconsumeTimes());
String defaultTag = "tag";
try {
//处理mq
if (defaultTag.equals(message.getTags())) {
//获取消息body
String body = new String(message.getBody());
//TODO 执行业务逻辑
}
log.info("处理MQ消息消费 成功 , topic={}, tag={}, key={}, reConsumeTimes={}",
message.getTopic(),
message.getTags(),
message.getKeys(),
((MessageExt) message).getReconsumeTimes());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("处理MQ消息消费 保存记录失败,topic={}, tag={}, key={}, reConsumeTimes={}, body={},e={}",
message.getTopic(), message.getTags(), message.getKeys(), ((MessageExt) message).getReconsumeTimes(), new String(message.getBody()), e);
if (((MessageExt) message).getReconsumeTimes() < RETRY_TIMES) {
log.info("处理MQ,准备重新消费消息");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
// 超过5次,就不重发了
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
}
总结
本文中实现了SpringBoot整合RocketMQ点对点的方式,其中Producer可通用向自定义topic生产消息,Consumer可自定义订阅topic,如ConsumerA订阅topic-A,ConsumerB订阅topic-B。在本文中重试次数到达上限后,可另行添加补偿机制,如生成日志存入数据库,后续批量处理。
版权声明:本文为IceCaptain原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。