实战发送消息
注意 :记得启动nameser和broker
-
快速创建springboot项目
https://start.spring.io/
-
加入相关依赖,这里的版本记得要和服务端版本一致,不然启动不能自动创建topic!切记!
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
-
Message对象
- topic: 主题名称
- tag: 标签,用于过滤
- key: 消息唯一标示,可以是业务字段组合
- body: 消息体,字节数组
-
注意 发送消息到Broker,需要判断是否有此topic启动broker的时候,
本地环境建议开启自动创建topic,生产环境建议关闭自动化创建topic
建议先手工创建Topic,如果靠程序自动创建,然后再投递消息,会出现延迟情况
-
概念模型: 一个topic下面对应多个queue,可以在创建Topic时指定,如订单类topic
-
通过可视化管理后台查看消息
这里我们编写消息提供者!
@Component
public class PayProducer {
private String producerGroup = "pay_group";
private String nameServerAddr = "39.96.192.171:9876"; //多节点逗号分隔
private DefaultMQProducer producer;
public PayProducer(){
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServerAddr);
start();
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 对象在用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
private static final String topic = "pay_topic";
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(topic, "taga", ("hello rocketmq!" + text).getBytes());
SendResult sendResult = payProducer.getProducer().send(message);
System.out.println(sendResult);
return null;
}
}
启动服务,访问路径,应该会报如下的500错误,看控制台:
原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通 解决:
通过
sh bin/mqbroker -m 查看配置 autoCreateTopicEnable=true, 则自动创建topic
Centos7 关闭防火墙 systemctl stop firewalld
然后我们重新访问,又会报超时的错误
原因:阿里云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker 新增配置:conf/broker.conf (属性名称brokerIP1=broker所在的公网ip地址 ) 新增这个配置:brokerIP1=120.76.62.13 启动命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
控制台查看不了数据,提示连接 10909错误 原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909
解决:阿里云安全组需要增加一个端口 10909
再次访问!idea控制台发送消息成功!
我们查看rocketmq控制台
可以查看到我们发送的消息!接下来我们创建消费者来消费消息!