背景
dubbo 在使用 zookeeper(zk) 作为注册中心的时候,是通过订阅zk的watch监听机制来达到更新最新服务提供者列表的目的,其中大致流程如下。
今天我们主要分析红色订阅,以及处理的逻辑。
订阅
在消费者启动阶段根据
protocol
会一层层执行到
RegistryProtocol.refer()
,
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
//注释1
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));//
return invoker;
}
复制代码
注释1处,调用
directory
对象的
subscribe
进行订阅监听,延着这个方法点进去。
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe()
public void doSubscribe(final URL url, final NotifyListener listener) {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
//注释1
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);//注释2
}
复制代码
注释1处是jdk1.8的lambda写法,这里就是订阅zk节点变动的处理回调方法
notify()
,然后通过
zkClient(CuratorFramework客户端)
向zk服务器注册,注释2处默认首次订阅的时候将添加的节点推送给回调方法,不过如果此时生产者没有启动的话,消费者的调用列表是不会更新的,下面看看回调方法做了哪些操作。
回调
回调方法会从zk客户端的
EventThread#processEvent()
最后一直会调用到
RegistryDirectory#notify()
方法
//org....RegistryDirectory#notify()
//..省略部分
refreshOverrideAndInvoker(providerURLs);//注释1
复制代码
这边就是刷新调用列表的入口了。
//org.xx.RegistryDirectory#refreshInvoker()
private void refreshInvoker(List<URL> invokerUrls) {
//...
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);//注释1
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;//注释2
this.urlInvokerMap = newUrlInvokerMap;
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);//注释3
}
复制代码
注释1处用zk返回的节点
url(dubbo://ip:20880/com.poizon.study.api.service...)
生成
invoker
服务调用对象,注释2处将最新的调用列表赋值给了
invokers
我们这里先记下,到时候回来看这个变量,注释3会清除过期的调用列表,比如某个提供者机器下线了(下线感兴趣的可以看看源码,并不是马上下线哦,有策略的),这边需要在调用者列表中将次移除。
invoker 生成的方式这边不多介绍了,大致流程为调用protocol.refer() 层层包装最后包装为
InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(Filter(AsyncToSyncInvoker(DubboInvoker())))))
。
invoker使用
在消费者发起请求的时候,调用链路为
MockClusterInvoker
->
FailoverClusterInvoker#doInvoke()
->
FailoverClusterInvoker#list()
,这个list中文含义就说列出可用的生产者列表,我们进到方法里面看看。
//org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
return doList(invocation);//注释1
}
//org.apache.dubbo.registry.integration.RegistryDirectory#doList
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
throw new RpcException("No provider available");//注释2
}
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;//注释3
}
}
复制代码
注释2处的错误异常大家应该都见过,当没有可以调用的生产者就会抛次错误,注释3处这边返回了
invokers
变量,
这个变量就是前面我们订阅处理之后生成新的invoekrs赋值的
。
总结
笔者是按照zk作为注册中心走的流程,大家也可以换成nacos走走看。
链接:https://juejin.cn/post/6925674293980299277