RocketMQ原生API使用
同步发送
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
"OrderID1",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
等待消息返回后再继续进行下面的操作。
SendResult sendResult = producer.send(msg);
producer.shutdown();
异步发送
是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
单向发送
这个方法没有返回值,也没有回调。就是只管把消息发出去就行了
producer.sendOneway(msg);
消费者消费消息
拉模式
PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
推模式
推模式也是由拉模式封装出来的
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr("worker:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//wrong time format 2017_0422_221800
// consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
顺序消息
RocketMQ保证的是消息的局部有序,而不是全局有序,只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。
默认情况下,消息发送者会
采取Round Robin轮询方式把消息发送到不同的MessageQueue
(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。
只有当
一组有序的消息发送到同一个MessageQueue上
时,才能利用MessageQueue先进先出的特性保证这一组消息有序。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size(); 根据订单id取模上消息队列的大小
return mqs.get(index);
}
}, orderId);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//最新一条消息的偏移量
consumer.subscribe("OrderTopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for(MessageExt msg:msgs){
System.out.println("收到消息内容 "+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
广播消息
把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
延迟消息
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是
只支持18个固定的延迟级别
,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。从rocketmq-console控制台就能看出来
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
批量消息
指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB,实际最大的限制是4194304字节,大概4MB。这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等
List<Message> listItem = splitter.next();
producer.send(listItem);
过滤消息
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
只订阅TagA和TagC的消息
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
SQL过滤
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i));
在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector,sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
消息过滤是在Broker端进行,减少推送的数据量,消费组将过滤推给生产者,降低网络请求大小
事务消息
事务消息是在分布式系统中
保证最终一致性的两阶段提交的消息实现
。他可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
关键在TransactionMQProducer中指定了一个TransactionListener事务监听器
public class TransactionListenerImpl implements TransactionListener {
//在提交完事务消息后执行。
//返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
//返回ROLLBACK_MESSAGE状态的消息会被丢弃。
//返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
//TagA的消息会立即被消费者消费到
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;
//TagB的消息会被丢弃
}else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;
//其他消息会等待Broker进行事务状态回查。
}else{
return LocalTransactionState.UNKNOW;
}
}
//在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
//TagC的消息过一段时间会被消费者消费到
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;
//TagD的消息也会在状态回查时被丢弃掉
}else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;
//剩下TagE的消息会在多次状态回查后最终丢弃
}else{
return LocalTransactionState.UNKNOW;
}
}
}
使用限制
-
不支持延迟消息和批量消息
-
默认将单个消息的
检查次数限制为 15 次
,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写
AbstractTransactionCheckListener
类来修改这个行为
-
在 Broker 配置文件中的参数
transactionMsgTimeout
这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性
CHECK_IMMUNITY_TIME_IN_SECONDS
来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
-
可能不止一次被检查或消费。
-
提交给用户的目标主题消息可能会失败,建议使用同步的双重写入机制。raft协议
-
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者
RocketMQ事务消息执行流程。
1.生产者发送half消息
2.Broker接收half消息
3.生产者执行本地事务
4.根据事务执行结果向Broker发送回滚或提交请求
5.如果是未知状态,则Broker会向生产者发起定时回查
6.生产者根据回查状态再向Broker发送回滚或提交请求
7.回查次数默认是15次。
RocketMQ使用中常见的问题
使用RocketMQ如何保证消息不丢失?
哪些环节会有丢消息的可能?
跨网络就肯定会有丢消息的可能。MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。有时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失
RocketMQ消息零丢失方案
生产者使用事务消息机制保证消息零丢失
为什么要发送个half消息?有什么用?
half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的。那这个消息的作用更多的体现在
确认RocketMQ的服务是否正常
。
half消息如果写入失败了怎么办?
half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后
等待MQ服务正常后再进行补偿操作
,等MQ服务正常后重新下单通知下游服务
订单系统写数据库失败了怎么办?
可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在
回查事务状态时再尝试把订单数据写入数据库
下单成功后如何优雅的等待支付成功
可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态。而在
状态回查的方法中去查询订单的支付状态
。只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),
RocketMQ配置同步刷盘
把RocketMQ的刷盘方式
flushDiskType
配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了。
Dledger的文件同步
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。
消费者端不要使用异步消费机制
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。
异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。
NameServer挂了如何保证消息不丢失?
设计一个
降级方案
来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)
把订单消息缓存下来
,然后
起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。
这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。
RocketMQ消息零丢失方案总结
-
生产者使用事务消息机制。
-
Broker配置同步刷盘+Dledger主从架构
-
消费者不要使用异步消费。
-
整个MQ挂了之后准备降级方案
如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。
有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,而采用定时对账、补偿的机制来提高消息的可靠性。
如果消费者不需要进行消息存盘,那使用异步消费的机制带来的性能提升也是非常显著的。
使用RocketMQ如何保证消息顺序
局部有序
需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
全局消息
将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序了。
使用RocketMQ如何快速处理积压消息?
通过mqadmin指令在后台检查各个Topic的消息延迟情况。
如果Topic下的MessageQueue配置得是足够多的
通过增加Consumer的服务节点数量来加快消息的消费
如果Topic下的MessageQueue配置得不够多
可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了
RocketMQ的消息轨迹
在broker.conf中打开一个关键配置traceTopicEnable=true
消息轨迹数据存储
默认情况下,消息轨迹数据是存于一个系统级别的Topic ,RMQ_SYS_TRACE_TOPIC。这个Topic在Broker节点启动时,会自动创建出来
自定义轨迹数据存储的Topic
DefaultMQProducer和DefaultMQPushConsumer,他们的构造函数中,都有两个可选的参数来打开消息轨迹存储
enableMsgTrace:是否打开消息轨迹。默认是false。
customizedTraceTopic:配置将消息轨迹数据存储到用户指定的Topic 。
SpringBoot整合RocketMQ
发送事务消息
Message<String> message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID,"TransID_"+i)
//发到事务监听器里后,这个自己设定的TAGS属性会丢失。但是上面那个属性不会丢失。
.setHeader(RocketMQHeaders.TAGS,tags[i % tags.length])
//MyProp在事务监听器里也能拿到,为什么就单单这个RocketMQHeaders.TAGS拿不到?这只能去调源码了。
.setHeader("MyProp","MyProp_"+i)
.build();
String destination =topic+":"+tags[i % tags.length];
//这里发送事务消息时,还是会转换成RocketMQ的Message对象,再调用RocketMQ的API完成事务消息机制。
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination);
消息消费者
@RocketMQMessageListener
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
消息过滤:可以由里面的selectorType属性和selectorExpression来定制
有序消费还是并发消费:由consumeMode属性定制。
集群部署还是广播部署:由messageModel属性定制。
事务消息消费
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId,msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = "+msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = "+originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
这里可以看到,对事务消息,SpringBoot进行封装时,就缺少了transactionId,这在事务控制中是非常关键的。
多个发送者组需要有不同的事务消息逻辑
@ExtRocketMQTemplateConfiguration()
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
核心原理
读队列与写队列
创建Topic时,可以看到要单独设置读队列和写队列。通常在运行时,都需要设置读队列=写队列
perm字段表示Topic的权限。
有三个可选项。 2:禁写禁订阅,4:可订阅,不能写,6:可写可订阅
写队列会真实的创建对应的存储文件,负责消息写入。而读队列会记录Consumer的Offset,负责消息读取。
对Topic的MessageQueue进行缩减
可以先缩减写队列,待空出来的读队列上的消息都被消费完了之后,再来缩减读队列,这样就可以比较平稳的实现队列缩减了。
消息持久化
直接采用磁盘文件保存消息,默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。
checkpoint
数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。
config/*.json
这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
abort
这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
CommitLog
存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名
存储所有消息实体。所有生产者发过来的消息,都会无差别的依次存储到Commitlog文件当中。可以减少查找目标文件的时间,因为文件大小已经固定不需要去磁盘上寻找空间,磁盘顺序写入,让消息以最快的速度落盘。在多Topic场景下,优势就比较明显。
文件结构
存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog,每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,就重新创建一个CommitLog文件。文件名为当前消息的偏移量。
ConsumerQueue
存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue
被哪些消费者组消费到了哪一条CommitLog
。
加速消费者的消息索引,每个文件夹对应RocketMQ中
的一个Me
ssageQueue,文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。
消费者在ConsumeQueue文件当中的消费进度,会保存在config/
consumerOffset.json
文件当中。
文件结构
由30万个固定大小20byte的数据块组成,msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。
IndexFile
为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程,辅助消息检索。用的时间命名。
文件结构
由 indexHeader(固定40byte)+ slot(固定500W个,每个固定20byte) + index(最多500W*4个,每个固定20byte) 三个部分组成。
过期文件删除
如何判断过期文件:
对于非当前写的文件,如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。这个保留时间就是在broker.conf中配置的fileReservedTime属性。判断文件是否过期的唯一标准就是非当前写文件的保留时间
何时删除过期文件:
有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件删除操作的执行时间。在broker.conf中deleteWhen属性指定。默认是凌晨四点。
如果磁盘空间的使用率达到一定的阈值,也会触发过期文件删除。broker的磁盘空间不要少于4G。
高效文件写
零拷贝技术加速文件读写
是操作系统层面提供的一种加速文件读写的操作机制,Java应用层,对应着mmap和sendFile两种方式
理解CPU拷贝和DMA拷贝
CPU拷贝
应用程序无法直接操作硬件,需要通过内核空间进行操作转换,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要在用户态和内核态之间来回的复制数据。原本都是需要由CPU来进行任务的分配、调度等管理步骤的,当发生大规模的数据读写操作时,CPU的占用率会非常高。
DMA
操作系统为了避免CPU完全被各种IO调用给占用,引入了DMA(直接存储器存储)。由DMA来
负责这些频繁的IO操作
。DMA是一套独立的指令集,
不会占用CPU的计算资源
。这样,CPU就不需要参与具体的数据复制的工作,
只需要管理DMA的权限
即可。
极大的释放了CPU的性能,因此他的拷贝速度会比CPU拷贝要快很多。但是,其实DMA拷贝本身,也在不断优化。数据复制过程中,
依然需要借助数据总进线
。当系统内的IO操作过多时,还是会
占用过多的数据总线
,造成总线冲突,最终还是会
影响数据读写性能
。
Channel
为了避免DMA总线冲突对性能的影响,后来又引入了Channel通道的方式。Channel,
是一个完全独立的处理器,专门负责IO操作
。
既然是处理器,Channel就有自己的IO指令,与CPU无关,他也
更适合大型的IO操作,性能更高
。
所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝。
mmap文件映射机制
主要是通过java.nio.channels.FileChannel的map方法完成映射。NIO中MappedByteBuffer方式实现的零拷贝。将文件从用户态映射到内存,减少了一次拷贝。
public static void main(String[] args) throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("MappedBF.txt", "rw");
//获取对应的通道
FileChannel channel = randomAccessFile.getChannel();
/**
* 参数1: FileChannel.MapMode.READ_WRITE 使用的读写模式
* 参数2: 0 : 可以直接修改的起始位置
* 参数3: 5: 是映射到内存的大小(不是索引位置) ,即将 1.txt 的多少个字节映射到内存
* 可以直接修改的范围就是 0-5
* 实际类型 DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
// mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException
randomAccessFile.close();
System.out.println("修改成功~~");
}
应用程序对磁盘文件的读与写,都需要经过
内核态与用户态之间的状态切换
,每次状态切换的过程中,就需要有大量的数据复制。内核态与用户态之间的拷贝
依然是CPU拷贝
。零拷贝技术优化的重点,就是
内核态与用户态之间的这两次拷贝
。
mmap文件映射的方式
在用户态不再保存文件的内容,而
只保存文件的映射,包括文件的内存起始地址,文件大小
等。真实的数据,也
不需要在用户态留存
,可以
直接通过操作映射,在内核态完成数据复制
。
HeapByteBuffer
没有使用零拷贝的普通文件读写机制。
JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的一块堆内内存,在HeapByteBuffer中,会由一个byte数组来缓存数据内容,所有的读写操作也是先操作这个byte数组。
DirectByteBuffer
mmap的读写机制
NIO把包中的另一个实现类java.nio.DirectByteBuffer则映射的是一块堆外内存。并没有一个数据结构来保存数据内容,
只保存了一个内存地址
。所有对数据的读写操作,都
通过unsafe魔法类直接交由内核完成
mmap文件映射机制示例
启动任何一个Java程序时,其实都大量用到了mmap文件映射,Java指令运行起来后,可以用jps查看到运行的进程ID。然后,就可以使用lsof -p {PID}的方式查看文件的映射情况。
mem类型的FD其实就是文件映射。
-
cwd 表示程序的工作目录。
-
rtd 表示用户的根目录。
-
txt 表示运行程序的指令。
-
下面的1u表示Java应用的标准输出,2u表示Java应用的标准错误输出,默认的/dev/pts/1是linux当中的伪终端。
-
通常服务器上会写 java xxx 1>text.txt 2>&1 这样的脚本,就是指定这里的1u,2u。
mmap机制建议的映射文件大小不要超过2G 。而RocketMQ做大的CommitLog文件
保持在1G固定大小,也是为了方便文件映射。
sendFile机制
通过java.nio.channels.FileChannel的transferTo方法完成。NIO中transfer方式实现的零拷贝。底层直接使用DMA,减少内核态与用户态之间的切换次数。
NIO中transfer方式的零拷贝。
final FileInputStream sourceFis = new FileInputStream(sourceFile);
final FileChannel sourceReadChannel = sourceFis.getChannel();
final FileOutputStream targetFos = new FileOutputStream(targetFile);
final FileChannel targetWriteChannel = targetFos.getChannel();
sourceReadChannel.transferTo(0,sourceFile.length(),targetWriteChannel);
sourceFis.close();
targetFos.close();
传统数据流拷贝方式
BufferedReader sourceBr = new BufferedReader(new InputStreamReader(new FileInputStream(sourceFile)));
BufferedWriter targetBw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(targetFile)));
while (true){
final String line = sourceBr.readLine();
if(null == line || "".equals(line)){
break;
}
targetBw.write(line);
}
targetBw.flush();
sourceBr.close();
targetBw.close();
Kafka当中是如何使用零拷贝
Kafka将文件从磁盘复制到网卡时,就大量的使用了零拷贝。
早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。
但是,在后期的不断改进过程中,sendfile
优化了实现机制
,在拷贝过程中,并不直接拷贝文件的内容,而是
只拷贝一个带有文件位置和长度等信息的文件描述符FD
,这样就大大减少了需要传递的数据。而
真实的数据内容
,会交由DMA控制器,
从页缓存中打包异步发送到socket
中。
sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制
非常适合大数据的复制转移
。
刷盘机制保证消息不丢失
将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘。
同步刷盘
在返回写成功状态时,消息已经被写入磁盘。消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态
异步刷盘
返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成
SYNC_FLUSH(同步)
、
ASYNC_FLUSH
中的 一个。
消息主从复制
同步复制
等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。
异步复制
只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。拥有较低的延迟和较高的吞吐量。数据容易丢失。
Broker配置文件里的
brokerRole
参数进行设置,ASYNC_MASTER、
SYNC_MASTER
、SLAVE
负载均衡
Producer负载均衡
默认会
轮询目标Topic下的所有MessageQueue
,并采用
递增取模
的方式往不同的MessageQueue上发送消息,以达到
让消息平均落在不同的queue
上的目的。
可以指定一个
MessageQueueSelector
。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以
保证消息局部有序
。
Consumer负载均衡
集群模式
只需要
投递到订阅这个topic的Consumer Group下的一个实例
,采用主动拉取的方式拉取并消费消息,在拉取的时候需要
明确指定拉取哪一条message queue
。每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
分配策略
AllocateMachineRoomNearby
将同机房的Consumer和Broker优先分配在一起。可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则
AllocateMessageQueueAveragely
将所有MessageQueue平均分给每一个消费者
AllocateMessageQueueAveragelyByCircle
轮询分配。轮流的给一个消费者分配一个MessageQueue。
AllocateMessageQueueByConfig
不分配,直接指定一个messageQueue列表。
AllocateMessageQueueByMachineRoom
按逻辑机房的概念进行分配
AllocateMessageQueueConsistentHash
一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。
广播模式
所有Consumer都分到所有的Queue,客户端自行维护自己的消费偏移量。
消息重试
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。
重试
-
返回Action.ReconsumeLater-推荐
-
返回null
-
抛出异常
不重试,可以直接返回Action.CommitMessage。
重试消息如何处理
重试的消息会进入一个
“%RETRY%”+ConsumeGroup
的队列中。默认允许每条消息最多重试16次,
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
重试次数
-
如果消息重试16次后仍然失败,消息将不再投递
-
一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的
-
RocketMQ可以进行定制,通过consumer.
setMaxReconsumeTimes
(20);当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。
-
顺序消息会重试Integer的最大值的次数
,因为只有这个消息发送成功才可以发送下一条消息
MessageId
旧版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
在4.9.1版本中,
每次重试MessageId都会重建
。
配置覆盖
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会
覆盖之前启动的Consumer的配置
。
消息幂等
at most once
最多一次:每条消息
最多只会被消费一次
,RocketMQ中可以直接
用异步发送、sendOneWay
等方式就可以保证。
at least once
至少一次:每条消息
至少会被消费一次,同步发送、事务消息等很多方式能够保证。
exactly once
刚刚好一次:每条消息都只会确定的消费一次,需要由业务系统自行保证消息的幂等性。
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时
生产者意识到消息发送失败并尝试再次发送消息
,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端
将在网络恢复后再次尝试投递之前已被处理过的消息
,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会
触发 Rebalance
,此时消费者可能会收到重复消息。
处理方式
-
RocketMQ的每条消息都有一个
唯一的MessageId
,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
-
MessageId是无法保证全局唯一
,使用业务上唯一的一个标识
比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。