4 安装启动
4.1 下载
https://rocketmq.apache.org/dowloading/releases/
4.2 安装
先决条件:
- 64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below)
- 64bit JDK 1.8+;
- Maven 3.2.x;
- Git;
- 4g+ free disk for Broker server
4.3 启动
(1)配置环境变量
(2)启动NameServer和Broker
(3)测试生产者和消费者
发送信息
:tools.cmd org.apache.rocketmq.example.quickstart.Producer
接收消息
:tools.cmd org.apache.rocketmq.example.quickstart.Consumer
5 Java整合RocketMQ案例
5.1 入门案例
5.1.1 新建Maven项目
5.1.2 引入依赖
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
5.1.3 生产者案例
(1)同步发送
/**
* @desc: Producer端发送同步消息
* @author: YanMingXin
* @create: 2021/9/14-14:09
* @info: 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
**/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("my_mq_one");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes()
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.println(sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
(2)单向发送
/**
* @desc: 单向发送消息
* @author: YanMingXin
* @create: 2021/9/14-14:10
* @info: 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
**/
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("my_mq_one");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" ,
"TagA",
("Hello RocketMQ " + i).getBytes()
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
(3)异步发送
/**
* @desc: 发送异步消息
* @author: YanMingXin
* @create: 2021/9/14-14:09
* @info: 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
**/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("my_mq_one");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
5.1.4 消费者案例
/**
* @desc: 消费消息
* @author: YanMingXin
* @create: 2021/9/14-14:12
**/
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_mq_one_consumer");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started......");
}
}
5.1.5 研究下Message类
(1)理解:
Message是一个RocketMQ中的类也可以说是一个普通的对象,是消息发送和消费的最小实体。
(2)构造方法:
(3)几个重要的成员变量:
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
- topic:用于区分消息的Topic
- flag:
- properties:用于获取某些外部属性和属性值
- body:消息体
- transactionId:在开启事务的情况下保存事务的ID
5.2 顺序消息案例
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
5.2.1 顺序消息生产
/**
* @desc: Producer,发送顺序消息
* @author: YanMingXin
* @create: 2021/9/14-15:15
**/
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* 订单的步骤
*/
@Data
@ToString
@Accessors(chain = true)
private static class OrderStep {
private long orderId;
private String desc;
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
final long order1 = 15103111039L;
final long order2 = 15103111065L;
final long order3 = 15103117235L;
List<OrderStep> orderList = new ArrayList<>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(order1).setDesc("order1创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order2).setDesc("order2创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order1).setDesc("order1付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order3).setDesc("order3创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order2).setDesc("order2付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order3).setDesc("order3付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order2).setDesc("order2完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order1).setDesc("order1推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order3).setDesc("order3完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(order1).setDesc("order1完成");
orderList.add(orderDemo);
return orderList;
}
}
5.2.2 顺序消息消费
/**
* @desc: 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
* @author: YanMingXin
* @create: 2021/9/14-15:27
**/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
5.3 延时消息案例
5.3.1 延时消息生产者
/**
* @desc: 发送延时消息
* @author: YanMingXin
* @create: 2021/9/14-15:43
**/
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
// 设置延时等级4,这个消息将在30s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(4);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
5.3.2 演示消息消费者
/**
* @desc: 启动消费者等待传入订阅消息
* @author: YanMingXin
* @create: 2021/9/14-15:43
**/
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topics
consumer.subscribe("TopicTest", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
5.3.3 使用场景和限制
(1)使用场景
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
(2)限制
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
//等价于 1->1s 2->5s 3->10s ......
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码
SendMessageProcessor.java
5.4 批量消息案例
5.4.1 批量消息生产者
/**
* @desc: 批量生产
* @author: YanMingXin
* @create: 2021/9/14-16:24
**/
public class BatchMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("BatchMessageProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
String topic = "TopicTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
// 关闭生产者
producer.shutdown();
}
}
5.4.2 批量消息消费者
/**
* @desc: 启动消费者等待传入订阅消息
* @author: YanMingXin
* @create: 2021/9/14-15:43
**/
public class BatchMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchMessageConsumer");
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topics
consumer.subscribe("TopicTest", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
5.4.3 消息列表分割
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加⽇日志的开销20字节
return tmpSize;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
参考:https://github.com/apache/rocketmq/tree/master/docs/cn