dubbo-客户端注册监听

  • Post author:
  • Post category:其他


背景

dubbo 在使用 zookeeper(zk) 作为注册中心的时候,是通过订阅zk的watch监听机制来达到更新最新服务提供者列表的目的,其中大致流程如下。

今天我们主要分析红色订阅,以及处理的逻辑。

订阅

在消费者启动阶段根据

protocol

会一层层执行到

RegistryProtocol.refer()

,

org.apache.dubbo.registry.integration.RegistryProtocol#doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        //注释1
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));//
        return invoker;
    }
复制代码

注释1处,调用

directory

对象的

subscribe

进行订阅监听,延着这个方法点进去。

org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe()
public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners == null) {
            zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
            listeners = zkListeners.get(url);
        }
        ChildListener zkListener = listeners.get(listener);
        if (zkListener == null) {
        //注释1
            listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
            zkListener = listeners.get(listener);
        }
        zkClient.create(path, false);
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    notify(url, listener, urls);//注释2
}
复制代码

注释1处是jdk1.8的lambda写法,这里就是订阅zk节点变动的处理回调方法

notify()

,然后通过

zkClient(CuratorFramework客户端)

向zk服务器注册,注释2处默认首次订阅的时候将添加的节点推送给回调方法,不过如果此时生产者没有启动的话,消费者的调用列表是不会更新的,下面看看回调方法做了哪些操作。

回调

回调方法会从zk客户端的

EventThread#processEvent()

最后一直会调用到

RegistryDirectory#notify()

方法

//org....RegistryDirectory#notify()
//..省略部分
refreshOverrideAndInvoker(providerURLs);//注释1
复制代码

这边就是刷新调用列表的入口了。

//org.xx.RegistryDirectory#refreshInvoker()
private void refreshInvoker(List<URL> invokerUrls) {
	//...
 Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);//注释1

    List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));

    routerChain.setInvokers(newInvokers);
    this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;//注释2
    this.urlInvokerMap = newUrlInvokerMap;

    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);//注释3
}
复制代码

注释1处用zk返回的节点

url(dubbo://ip:20880/com.poizon.study.api.service...)

生成

invoker

服务调用对象,注释2处将最新的调用列表赋值给了

invokers

我们这里先记下,到时候回来看这个变量,注释3会清除过期的调用列表,比如某个提供者机器下线了(下线感兴趣的可以看看源码,并不是马上下线哦,有策略的),这边需要在调用者列表中将次移除。

invoker 生成的方式这边不多介绍了,大致流程为调用protocol.refer() 层层包装最后包装为

InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(Filter(AsyncToSyncInvoker(DubboInvoker())))))

invoker使用

在消费者发起请求的时候,调用链路为

MockClusterInvoker

->

FailoverClusterInvoker#doInvoke()

->

FailoverClusterInvoker#list()

,这个list中文含义就说列出可用的生产者列表,我们进到方法里面看看。

//org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    return doList(invocation);//注释1
}
//org.apache.dubbo.registry.integration.RegistryDirectory#doList
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        throw new RpcException("No provider available");//注释2
    }
    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers;//注释3
    }
}
复制代码

注释2处的错误异常大家应该都见过,当没有可以调用的生产者就会抛次错误,注释3处这边返回了

invokers

变量,

这个变量就是前面我们订阅处理之后生成新的invoekrs赋值的

总结

笔者是按照zk作为注册中心走的流程,大家也可以换成nacos走走看。


链接:https://juejin.cn/post/6925674293980299277