RocketMQ实战(1)springBoot整合RabbitMQ
1.引入maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
这里注意版本号,最好和你用的rocktmq的版本号保持一致
2.生产者相关配置
2.1 定义生产者自动配置类
@ConfigurationProperties(prefix = "rocketmq.producer")
@Component
@Data
public class RocketmqProducerConfig {
private String namesrvAddr;
private String groupName;
private Integer maxMessageSize;
private Integer sendMsgTimeout;
private Integer retryTimesWhenSendFailed;
private Integer retryTimesWhenSendAsyncFailed;
@Bean
@ConditionalOnMissingBean
public DefaultMQProducer defaultMQProducer() throws RuntimeException {
DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
// 如果topic不存在 自动创建topic
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//如果发送消息的最大限制
producer.setMaxMessageSize(this.maxMessageSize);
//如果发送消息超时时间
producer.setSendMsgTimeout(this.sendMsgTimeout);
//如果发送消息失败,设置重试次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
//如果是异步发送消息失败,设置重试次数 默认为0次
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
return producer;
}
}
2.2生产者配置文件
生产者配置类写完之后,可以执行一下SpringBoot的入口程序App.main(),方便我们在写配置文件的时候有参数提示;
server:
port: 60044
spring:
application:
name: rocket-producer
rocketmq:
producer:
namesrv-addr: 127.0.0.1:9876
group-name: ${spring.application.name}
max-message-size: 4194304
send-msg-timeout: 300000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 0
3.消费者相关配置
3.1 定义消费者自动配置类
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Component
@Data
public class RocketmqConsumerConfig {
private String namesrvAddr;
private String groupName;
private int consumeThreadMin;
private int consumeThreadMax;
// 订阅指定的 topic
private String topics;
// 订阅指定的 tag 默认的为* topic下的全部
private String tag = "*";
private int consumeMessageBatchMaxSize;
@Autowired
private MqMessageListenerProcessor mqMessageListenerProcessor;
@Bean
@ConditionalOnMissingBean
public DefaultMQPushConsumer defaultMQPushConsumer() throws RuntimeException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
// 设置 consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置消费模型,集群还是广播,默认为集群
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置一次消费消息的条数,默认为 1 条
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.registerMessageListener(mqMessageListenerProcessor);
try {
// 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,使用*;
System.out.println(topics);
System.out.println(tag);
consumer.subscribe(topics, tag);
// 启动消费
consumer.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
return consumer;
}
}
注意这里注入的MqMessageListenerProcessor,是消息监听,实现代码为
@Slf4j
@Component
public class MqMessageListenerProcessor implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if(CollectionUtils.isEmpty(list)){
log.info("收到的消息为空");
}else{
/**
* 业务逻辑
*/
log.info("收到了消息====="+new String(list.get(0).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
3.2消费者配置文件
server:
port: 60045
spring:
application:
name: rocket-consumer
rocketmq:
consumer:
namesrv-addr: 127.0.0.1:9876
group-name: ${spring.application.name}
consume-thread-min: 20
consume-thread-max: 64
consume-message-batch-max-size: 1
topics: TopicTest2020
4.测试
4.1 生产者生产消息
这里我我们通过controller进行模拟消息生产;
@RestController
public class ProducerController {
@Autowired
DefaultMQProducer defaultMQProducer;
@RequestMapping("/send")
public String sendMessage(String message) throws Exception {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest2020" ,"*" ,
message.getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = defaultMQProducer.send(msg);
//可以通过sendResult返回消息是否成功送达
return "ok";
}
}
模拟测试
访问 127.0.0.1:60044/send?message=testMessage
查看消费者控制台
至此,SpringBoot集成RocketMQ已经完成;
编程并不是一蹴而就的,每天都是新的革命,加油呀~
版权声明:本文为qq_33449307原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。