RocketMQ NameServer保障数据一致性分析

  • Post author:
  • Post category:其他


路由注册角度

对于

ZooKeeper

这样的强一致性组件,使用主从分离的架构,数据只写到主节点,主从之间的数据同步通过内部机制来进行数据复制。

对于

RocketMQ

来说,

NameServer

节点之间是互相不进行通信的,这样也就无法进行数据复制。

RocketMQ

采用的机制是:在

Broker

节点启动的时候,轮询所有的

NameServer

节点,并与每个

NameServer

节点建立长连接,发送注册请求。

相应的,

NameServer

节点内部也会维护一个

Broker

列表,用来动态存储

Broker

的信息,做服务发现。

与此同时,

Broker

使用心跳机制来向所有

NameServer

节点证明自己是存活的,即定期发送心跳包;收到心跳包之后,

NameServer

节点会更新这个

Broker

的最新存活时间。


注意:


NameServer

节点在处理心跳包时,存在多个请求同时处理同一张表的情况,为了保证并发安全性,

RocketMQ

引入了读写锁(

ReadWriteLock

),保证了多个

Producer

并发读取路由信息不受影响,但同一时刻只能处理一个

Broker

发来的心跳包,这也符合读多写少的经典场景。

路由剔除


正常情况下:

如果

Broker

下线,则会与

NameServer

断开长连接,底层基于

Netty

的通道关闭监听器会监听到连接断开事件,然后将这个

Broker

信息剔除。


异常情况下:


NameServer

有一个周期为10s的定时任务,定期扫描

Broker

表,如果超过120s没有收到某个

Broker

的心跳包,则会判定其失效并移除。

对于日常运维的需求,

RocketMQ

提供了优雅剔除路由信息的方式,即可以先禁止

Broker

的写权限,这样发送到这个

Broker

的请求都会收到一个

NO_PERMISSION

的响应,客户端自动重试其他的

Broker

路由发现


生产者视角:

一般是在发送第一条消息时,才会根据

Topic



NameServer

获取路由信息


消费者视角:

订阅的

Topic

一般是固定的,所以在启动时就会拉取

针对路由信息可能变化的场景,

RocketMQ

提供了定时拉取

Topic

最新路由信息的机制,以应对

Broker

集群发生变化的场景。


DefaultMQProducer



DefaultMQConsumer

有一个

pollNameServerInterval

的配置项,用于指定从

NameServer

获取路由信息的周期,其底层依赖

MQClientInstance

类,

MQClientInstance

类中的

updateTopicRouteInfoFromNameServer

方法,可以根据指定的时间间隔,周期性地从

NameServer

里拉取路由信息。在拉取时,会将当前启动的

Producer



Consumer

需要用到的

Topic

列表放到一个集合里,逐个进行更新,源码如下:


更新单个Topic路由信息:

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}
复制代码

其中调用了:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                	// 使用默认TopicKey获取TopicRouteData
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                    	// 克隆出一个实例cloneTopicRouteData : topicRouteData会被设置到下面的publishInfo/subscribeInfo 
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
						
						// 更新Broker地址相关信息,当某个Broker心跳超时后,会被从brokerAddrTable中移除
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // 更新发布者信息
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // 更新订阅者信息
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } 
                
                /** 打印日志部分省略 **/

    return false;
}
复制代码



Broker

宕机时,还可以通过客户端的重试机制来解决,避免因为定时更新路由信息不及时导致的服务宕机~~


作者:小王曾是少年

链接:https://juejin.cn/post/7089251087995600904

来源:稀土掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。