引进maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
流程
- 发送者步骤分析
- 创建Producer,并指定生产者组名
- 指定NameServer的地址
- 启动Producer
- 创建消息对象,指定Topic、Tag及消息体
- 发送消息
- 关闭Producer
- 消费者步骤分析
- 创建Consumer,并指定消费者组名
- 指定NameServer的地址
- 订阅Topic、Tag
- 设置回调函数、处理消息
- 启动Consumer
消息发送
- 同步消息
客户端线程阻塞消息,直到得到结果,如消息通知、短信
public static void main(String[] argv) throws Exception {
//创建Producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer的地址
producer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
//启动Producer
producer.start();
//创建消息对象,指定Topic、Tag及消息体
for (int i=0; i<10; i++) {
Message msg = new Message("base", "tag1", ("hello world" + i).getBytes());
//发送消息
SendResult result = producer.send(msg);
System.out.println(result.getSendStatus());
System.out.println(result.getMsgId());
System.out.println(result.getMessageQueue());
TimeUnit.SECONDS.sleep(1);
}
//关闭Producer
producer.shutdown();
}
- 异步消息
对时间比较敏感的业务场景
public static void main(String[] argv) throws Exception {
//创建Producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer的地址
producer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
//启动Producer
producer.start();
//创建消息对象,指定Topic、Tag及消息体
for (int i=0; i<10; i++) {
Message msg = new Message("base", "tag1", ("hello world" + i).getBytes());
//发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println(sendResult.getMsgId());
System.out.println(sendResult.getMessageQueue());
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable.getMessage());
}
});
TimeUnit.SECONDS.sleep(1);
}
//关闭Producer
producer.shutdown();
}
- 单向消息
不关注发送成功或失败的场景,如日志
public static void main(String[] argv) throws Exception {
//创建Producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer的地址
producer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
//启动Producer
producer.start();
//创建消息对象,指定Topic、Tag及消息体
for (int i=0; i<10; i++) {
Message msg = new Message("base", "tag1", ("hello world" + i).getBytes());
//发送消息
producer.sendOneway(msg);
TimeUnit.SECONDS.sleep(1);
}
//关闭Producer
producer.shutdown();
}
消息消费
- 基本消费
public static void main(String[] argv) throws Exception{
//创建Consumer,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//指定NameServer的地址
consumer.setNamesrvAddr("106.55.146.154:9876;39.106.142.155:9876");
//订阅Topic、Tag
consumer.subscribe("base", "tag1");
//设置回调函数、处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(v -> {
System.out.println(v);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//
consumer.start();
}
消费模式
- 负载均衡模式(默认模式)
消费者共同分担消费任务,例如,生产者来10个消息,总共有两个消费者,一个消费者消费2个,一个消费者消费8个
consumer.setMessageModel(MessageModel.CLUSTERING);
- 广播模式
所有消费者都要消费生产者发送的全部消息,例如,生产者来10个消息,总共两个消费者,一个消费者消费10,另一个消费者也消费10个
consumer.setMessageModel(MessageModel.BROADCASTING);
版权声明:本文为qq_19831379原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。