前言
RocketMQ中的延迟故障机制是为了帮助Producer能够通过消息发送延迟或者消息发送结果主动感知Broker忙碌或者故障,消息发送延迟或者消息发送失败时可以将Broker排除在选择列表之外。这个机制默认是不开启的,如果需要开启这个机制需要在创建Producer时主动开启。
消息延迟故障机制使用
消息延迟故障机制需要在创建Producer时,通过
producer.setSendLatencyFaultEnable(true)
主动开启才能够生效。
DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.setSendLatencyFaultEnable(true);
复制代码
消息延迟故障机制生效时机
这个机制只有在producer发送消息时,由Producer自动选择MessageQueue才会生效,如果发送消息时指定了要发送的MessageQueue或者指定了MessageQueueSelector,即使开启了这个配置,也不会生效。
消息延迟故障机制源码分析
RocketMQ消息延迟故障机制的源码在
org/apache/rocketmq/client/latency
包下,在这个包下只有一个接口
LatencyFaultTolerance
,以及两个类
LatencyFaultToleranceImpl
,
MQFaultStrategy
,下面我们来分析下源码。
LatencyFaultTolerance源码分析
LatencyFaultTolerance接口的作用是提供broker延迟信息记录的方法。它提供了broker延迟信息更新,查询,删除的方法以及根据延迟信息选择延迟相对较小的broker。
public interface LatencyFaultTolerance<T> {
// 更新延迟信息
void updateFaultItem(final T name/*brokerName*/, final long currentLatency/*当前延迟*/, final long notAvailableDuration/*不可用周期*/);
// 查询broker是否可用
boolean isAvailable(final T name/*brokerName*/);
// 删除延迟信息
void remove(final T name/*brokerName*/);
// 随机选择一个broker
T pickOneAtLeast();
}
复制代码
LatencyFaultToleranceImpl源码分析
通过名字就可以看出LatencyFaultToleranceImpl是LatencyFaultTolerance的实现类。它包含了两个成员变量
- faultItemTable
faultItemTable缓存了broker和延迟信息的关系。它是一个key是brokername,value是记录broker延迟信息的对象,
- randomItem
ThreadLocalIndex内部包含一个ThreadLocal,它记录着当前线程的一个随机index,并提供
incrementAndGet()
,获取自增的index。
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
// key是brokerName,value是FaultItem(记录延迟时间)
private final ConcurrentHashMap<String/*brokerName*/, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
private final ThreadLocalIndex randomItem = new ThreadLocalIndex();
// broker延迟信息对象
class FaultItem implements Comparable<FaultItem> {
// brokerName
private final String name;
// 截至当前延迟
private volatile long currentLatency;
// 开始时间戳
private volatile long startTimestamp;
// 省略部分代码
}
}
复制代码
更新broker延迟信息的方法如下(LatencyFaultToleranceImpl#updateFaultItem),它的整个逻辑与我们工作中使用缓存的逻辑很相似,包含下面三个步骤
- 从延迟故障map(faultItemTable)中获取获取延迟故障对象
- 如果延迟故障对象不存在,则创建一个延迟故障对象,并put到map中
- 如果延迟故障对象存在,则更新当前broker的延迟故障对象
@Override
public void updateFaultItem(final String name/*brokerName*/, final long currentLatency/*当前延迟*/, final long notAvailableDuration/*不可用周期*/) {
// 查看brokername是否被标记过为不可用
FaultItem old = this.faultItemTable.get(name);
// 如果为空,则说明第一次被标记
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
// 记录本次耗时
faultItem.setCurrentLatency(currentLatency);
// 当前时间+不可用时间=截止时间
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) { // 如果old不为空,则说明原来已经有map中已经有值了,因此重新标记
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
// 如果不为空,则重新标记
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
复制代码
随机选择broker的逻辑如下
- 将broker延迟故障map中的所有FaultItem添加到LinkedList中
- 按照延迟时间排序
- 随机取LinkedList中index是LinkedList的size在前一半的broker
// 随机选择一个延迟时间排在前50%的broker
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
if (!tmpList.isEmpty()) {
// 按照延迟时间排序
Collections.sort(tmpList);
final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
// 随机选择延迟时间排前50%的broker
final int i = this.randomItem.incrementAndGet() % half;
return tmpList.get(i).getName();
}
}
return null;
}
复制代码
MQFaultStrategy源码分析
根据topic路由信息选择MessageQueue
MQFaultStrategy中最重要的方法是根据topic路由信息和前一个的brokername返回一个MessageQueue。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
// 获取MessageQueue选择索引,并+1
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// index与messageQueueSize取余,如果可用,则返回,否则选择下一个MessageQueue
int pos = index++ % tpInfo.getMessageQueueList().size();
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 随机选择一个broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 轮询选择一个写队列
int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
复制代码
上面方法中的源码整理成流程图如下
更新broker延迟故障时间
在MQFaultStrategy内部有两个数组最大延迟数组(latencyMax)以及不可用周期数据组(notAvailableDuration)
private long[] latencyMax = {50L/*50ms*/, 100L/*100ms*/, 550L/*550ms*/, 1000L/*1s*/, 2000L/*2s*/, 3000L/*3s*/, 15000L/*15s*/};
// 不可用周期
private long[] notAvailableDuration = {0L, 0L, 30000L/*30s*/, 60000L/*1 min*/, 120000L/*2 min*/, 180000L/*3min*/, 600000L/*10min*/};
复制代码
producer在推送消息时会根据推送消息耗时更新broker的不可用周期时间,消息耗时与broker不可用周期对应关系如下。当发送消息耗时在0~100ms时,不可用周期为0s。发送消息耗时在100ms~550ms时,broker不可用周期为30s。以此类推,当发送消息耗时在3s~15s时,broker不可用周期为10min。
更新broker延迟故障时间源码如下
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// 打开发送延迟故障机制
if (this.sendLatencyFaultEnable) {
// 如果是个隔离异常则标记执行持续时长为30秒,并根据执行时长计算broker不可用时长
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
// 记录broker不可用时长信息
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
复制代码
延迟故障机制在消息推送过程的影响
DefaultMQProducerImpl在推送
同步消息
并且未指定MessageQueue时,会调用
MQFaultStrategy#selectOneMessageQueue
选择MessageQueue。消息推送完成后无论是正常返回推送结果还是抛出异常会调用
MQFaultStrategy#updateFaultItem
更新推送延迟时间。具体调用过程可以参考下面时序图。