RocketMQ源码分析:Producer中延迟故障机制

  • Post author:
  • Post category:其他


前言

RocketMQ中的延迟故障机制是为了帮助Producer能够通过消息发送延迟或者消息发送结果主动感知Broker忙碌或者故障,消息发送延迟或者消息发送失败时可以将Broker排除在选择列表之外。这个机制默认是不开启的,如果需要开启这个机制需要在创建Producer时主动开启。

消息延迟故障机制使用

消息延迟故障机制需要在创建Producer时,通过

producer.setSendLatencyFaultEnable(true)

主动开启才能够生效。

DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.setSendLatencyFaultEnable(true);
复制代码


消息延迟故障机制生效时机

这个机制只有在producer发送消息时,由Producer自动选择MessageQueue才会生效,如果发送消息时指定了要发送的MessageQueue或者指定了MessageQueueSelector,即使开启了这个配置,也不会生效。

消息延迟故障机制源码分析

RocketMQ消息延迟故障机制的源码在

org/apache/rocketmq/client/latency

包下,在这个包下只有一个接口

LatencyFaultTolerance

,以及两个类

LatencyFaultToleranceImpl

,

MQFaultStrategy

,下面我们来分析下源码。

LatencyFaultTolerance源码分析

LatencyFaultTolerance接口的作用是提供broker延迟信息记录的方法。它提供了broker延迟信息更新,查询,删除的方法以及根据延迟信息选择延迟相对较小的broker。

public interface LatencyFaultTolerance<T> {
    // 更新延迟信息
    void updateFaultItem(final T name/*brokerName*/, final long currentLatency/*当前延迟*/, final long notAvailableDuration/*不可用周期*/);
    // 查询broker是否可用
    boolean isAvailable(final T name/*brokerName*/);
    // 删除延迟信息
    void remove(final T name/*brokerName*/);
    // 随机选择一个broker
    T pickOneAtLeast();
}
复制代码

LatencyFaultToleranceImpl源码分析

通过名字就可以看出LatencyFaultToleranceImpl是LatencyFaultTolerance的实现类。它包含了两个成员变量

  • faultItemTable

faultItemTable缓存了broker和延迟信息的关系。它是一个key是brokername,value是记录broker延迟信息的对象,

  • randomItem

ThreadLocalIndex内部包含一个ThreadLocal,它记录着当前线程的一个随机index,并提供

incrementAndGet()

,获取自增的index。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    // key是brokerName,value是FaultItem(记录延迟时间)
  private final ConcurrentHashMap<String/*brokerName*/, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
​
    private final ThreadLocalIndex randomItem = new ThreadLocalIndex();
    // broker延迟信息对象
    class FaultItem implements Comparable<FaultItem> {
        // brokerName
        private final String name;
        // 截至当前延迟
        private volatile long currentLatency;
        // 开始时间戳
        private volatile long startTimestamp;
        // 省略部分代码
    }
}
复制代码

更新broker延迟信息的方法如下(LatencyFaultToleranceImpl#updateFaultItem),它的整个逻辑与我们工作中使用缓存的逻辑很相似,包含下面三个步骤

  1. 从延迟故障map(faultItemTable)中获取获取延迟故障对象
  2. 如果延迟故障对象不存在,则创建一个延迟故障对象,并put到map中
  3. 如果延迟故障对象存在,则更新当前broker的延迟故障对象
   @Override
    public void updateFaultItem(final String name/*brokerName*/, final long currentLatency/*当前延迟*/, final long notAvailableDuration/*不可用周期*/) {
        // 查看brokername是否被标记过为不可用
        FaultItem old = this.faultItemTable.get(name);
        // 如果为空,则说明第一次被标记
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            // 记录本次耗时
            faultItem.setCurrentLatency(currentLatency);
            // 当前时间+不可用时间=截止时间
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
​
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {  // 如果old不为空,则说明原来已经有map中已经有值了,因此重新标记
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // 如果不为空,则重新标记
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
复制代码

随机选择broker的逻辑如下

  1. 将broker延迟故障map中的所有FaultItem添加到LinkedList中
  2. 按照延迟时间排序
  3. 随机取LinkedList中index是LinkedList的size在前一半的broker
    // 随机选择一个延迟时间排在前50%的broker
    @Override
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }
        if (!tmpList.isEmpty()) {
            // 按照延迟时间排序
            Collections.sort(tmpList);
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                // 随机选择延迟时间排前50%的broker
                final int i = this.randomItem.incrementAndGet() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }
复制代码

MQFaultStrategy源码分析

根据topic路由信息选择MessageQueue

MQFaultStrategy中最重要的方法是根据topic路由信息和前一个的brokername返回一个MessageQueue。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 获取MessageQueue选择索引,并+1
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // index与messageQueueSize取余,如果可用,则返回,否则选择下一个MessageQueue
                int pos = index++ % tpInfo.getMessageQueueList().size();
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            // 随机选择一个broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // 轮询选择一个写队列
            int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
​
        return tpInfo.selectOneMessageQueue();
    }
​
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
复制代码

上面方法中的源码整理成流程图如下

更新broker延迟故障时间

在MQFaultStrategy内部有两个数组最大延迟数组(latencyMax)以及不可用周期数据组(notAvailableDuration)

private long[] latencyMax = {50L/*50ms*/, 100L/*100ms*/, 550L/*550ms*/, 1000L/*1s*/, 2000L/*2s*/, 3000L/*3s*/, 15000L/*15s*/};
// 不可用周期
private long[] notAvailableDuration = {0L, 0L, 30000L/*30s*/, 60000L/*1 min*/, 120000L/*2 min*/, 180000L/*3min*/, 600000L/*10min*/};
复制代码

producer在推送消息时会根据推送消息耗时更新broker的不可用周期时间,消息耗时与broker不可用周期对应关系如下。当发送消息耗时在0~100ms时,不可用周期为0s。发送消息耗时在100ms~550ms时,broker不可用周期为30s。以此类推,当发送消息耗时在3s~15s时,broker不可用周期为10min。

更新broker延迟故障时间源码如下

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // 打开发送延迟故障机制
    if (this.sendLatencyFaultEnable) {
        // 如果是个隔离异常则标记执行持续时长为30秒,并根据执行时长计算broker不可用时长
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        // 记录broker不可用时长信息
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
​
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
​
    return 0;
}
复制代码

延迟故障机制在消息推送过程的影响

DefaultMQProducerImpl在推送

同步消息

并且未指定MessageQueue时,会调用

MQFaultStrategy#selectOneMessageQueue

选择MessageQueue。消息推送完成后无论是正常返回推送结果还是抛出异常会调用

MQFaultStrategy#updateFaultItem

更新推送延迟时间。具体调用过程可以参考下面时序图。



版权声明:本文为2301_76607156原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。