服务发现
当spring在构建bean时,populateBean对属性填充时,会触发beanPostProcessor流程,
然后会进入ReferenceAnnotationBeanPostProcessor的流程,对服务进行发现、注入
/**
 * Bean post-processor excerpt that resolves the object injected for a Dubbo
 * {@link Reference @Reference} injection point and registers the matching bean
 * (or alias) in the Spring bean factory.
 */
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
        ApplicationContextAware, ApplicationListener {

    /**
     * Builds (or reuses) the {@link ReferenceBean} for the injection point, registers it and
     * returns the object to inject.
     *
     * @param attributes      attributes of the annotation found on the injection point
     * @param bean            the bean being populated (not used by this implementation)
     * @param beanName        name of the bean being populated (not used by this implementation)
     * @param injectedType    declared type of the injection point
     * @param injectedElement metadata of the injection point, used as the cache key
     * @return the result of {@code referenceBean.get()} after the bean factory's
     *         after-initialization post-processors have been applied to it
     * @throws Exception propagated from any of the helper calls
     */
    @Override
    protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {
        // Name of the bean annotated with Dubbo's @Service in the local Spring ApplicationContext.
        final String serviceSideName = buildReferencedBeanName(attributes, injectedType);
        // Name of the bean declared by the @Reference annotation injection.
        final String referenceName = getReferenceBeanName(attributes, injectedType);

        // Index: referenced (service-side) bean name -> sorted set of reference bean names.
        referencedBeanNameIdx
                .computeIfAbsent(serviceSideName, ignoredKey -> new TreeSet<String>())
                .add(referenceName);

        // Taken from the cache when present, otherwise built.
        final ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceName, attributes, injectedType);

        // Whether the referenced service is also exported by this application context.
        final boolean exportedLocally = isLocalServiceBean(serviceSideName, referenceBean, attributes);

        prepareReferenceBean(serviceSideName, referenceBean, exportedLocally);
        registerReferenceBean(serviceSideName, referenceBean, exportedLocally, referenceName);
        cacheInjectedReferenceBean(referenceBean, injectedElement);

        // referenceBean.get() is where the reference is actually initialized (service discovery
        // against the registry happens behind this call).
        final ConfigurableListableBeanFactory factory = getBeanFactory();
        final Object proxy = referenceBean.get();
        return factory.applyBeanPostProcessorsAfterInitialization(proxy, referenceName);
    }

    /**
     * Makes the reference visible in the bean factory under {@code beanName}.
     * <p>
     * For a remote service the {@link ReferenceBean} itself is registered as a singleton unless
     * the factory already contains a bean of that name. For a local service only an alias to the
     * existing {@code @Service} bean is registered, to avoid duplicated beans.
     *
     * @param referencedBeanName name of the ServiceBean definition in the bean factory
     * @param referenceBean      the reference bean to register for remote services
     * @param localServiceBean   {@code true} if the {@code @Service} bean is a local one
     * @param beanName           bean name (or alias) to register
     */
    private void registerReferenceBean(String referencedBeanName, ReferenceBean referenceBean,
                                       boolean localServiceBean, String beanName) {
        final ConfigurableListableBeanFactory beanFactory = getBeanFactory();

        if (!localServiceBean) {
            // Remote @Service bean: register the ReferenceBean as a singleton, once.
            if (!beanFactory.containsBean(beanName)) {
                beanFactory.registerSingleton(beanName, referenceBean);
            }
            return;
        }

        // Local @Service bean: read the "ref" property of the ServiceBean definition
        // (see ServiceClassPostProcessor#buildServiceBeanDefinition) to find the target bean name.
        final AbstractBeanDefinition serviceDefinition =
                (AbstractBeanDefinition) beanFactory.getBeanDefinition(referencedBeanName);
        final RuntimeBeanReference target =
                (RuntimeBeanReference) serviceDefinition.getPropertyValues().get("ref");

        // Register an alias rather than a new bean name, in order to reduce duplicated beans.
        beanFactory.registerAlias(target.getBeanName(), beanName);
    }
}
ReferenceBean 服务引用bean对象
/**
 * Spring-facing service reference bean: a {@link ReferenceConfig} that is also a Spring
 * {@code FactoryBean} and takes part in the {@code ApplicationContextAware},
 * {@code InitializingBean} and {@code DisposableBean} callbacks.
 * <p>
 * The class body is omitted in this excerpt.
 *
 * @param <T> type of the referenced service interface
 */
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
}
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
/**
 * Returns the service proxy held by this reference, initializing it on first use.
 *
 * @return the proxy stored in {@code ref}
 * @throws IllegalStateException if this reference config has already been destroyed
 */
public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    if (ref != null) {
        // Already created by an earlier call.
        return ref;
    }
    // First access: init() assigns the ref field.
    init();
    return ref;
}
/**
 * Initializes this reference once: assembles the consumer-side parameter map, creates the
 * proxy via {@code createProxy(map)}, publishes the proxy to the service metadata and the
 * consumer model, and finally dispatches a {@code ReferenceConfigInitializedEvent}.
 * <p>
 * Returns immediately when {@code initialized} is already set.
 *
 * @throws IllegalArgumentException if the registry IP taken from the
 *         {@code DUBBO_IP_TO_REGISTRY} system property is rejected by {@code isInvalidLocalHost}
 */
public synchronized void init() {
if (initialized) {
return;
}
// Make sure a bootstrap exists: if none was set, take the DubboBootstrap instance and initialize it.
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
// compatible with api call.
if (null != this.getRegistries()) {
bootstrap.registries(this.getRegistries());
}
bootstrap.initialize();
}
// Configuration checks; the helpers are defined outside this excerpt.
checkAndUpdateSubConfigs();
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
// Parameter map describing this consumer; it is later handed to createProxy(map).
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
ReferenceConfigBase.appendRuntimeParameters(map);
// Non-generic references additionally record the interface revision and its method names.
if (!ProtocolUtils.isGeneric(generic)) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
// No methods on the interface: warn and store ANY_VALUE under METHODS_KEY instead.
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// De-duplicate method names (overloads) through a HashSet before joining them.
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
map.put(INTERFACE_KEY, interfaceName);
// Append parameters from the metrics, application, module, consumer and this config, in that order.
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
// Only set when the key is not already present in the map.
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
// Per-method async info keyed by method name; stays null when no method configs exist.
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
// Method-level parameters are appended with the method name passed as prefix.
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
// A "<method>.retry" entry is always removed; the value "false" is rewritten as "<method>.retries" = "0".
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
// Registry IP: use the system property when set (rejecting invalid values), otherwise the local host.
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException(
"Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
// Copy the assembled parameters into the service metadata attachments.
serviceMetadata.getAttachments().putAll(map);
// Create the proxy.
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
// NOTE(review): the lookup result is dereferenced without a null check - confirm it cannot be null here.
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
// Marked initialized only after the proxy and consumer model are set up.
initialized = true;
checkInvokerAvailable();
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
/**
 * Builds the {@code invoker} for this reference and wraps it in a proxy.
 * <p>
 * Three ways of obtaining the invoker are visible here: an in-JVM reference when
 * {@code shouldJvmRefer(map)} holds; otherwise URLs taken from the user-specified {@code url}
 * field; otherwise URLs loaded from the registry configuration. A single URL is referred
 * directly, several URLs are referred one by one and joined through a {@code Cluster}.
 * After that the consumer URL is published via {@code MetadataUtils.publishServiceDefinition}.
 *
 * @param map consumer parameters; this method may add {@code MONITOR_KEY} to it and removes
 *            {@code REGISTER_IP_KEY} from it
 * @return the proxy created by {@code PROXY_FACTORY} for the resulting invoker
 * @throws IllegalStateException if no registry URL could be loaded, or if checking is enabled
 *         and none of several referred invokers is available
 */
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
// In-JVM reference: build a local-protocol URL carrying all parameters and refer it directly.
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
// This local URL variable shadows the user-specified `url` field inside the loop body.
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
// Registry address: the consumer parameters travel encoded under REFER_KEY.
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// Any other address: the parameters are merged into the URL itself.
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
// Load the registry addresses.
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException(
"No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
" use dubbo version " + Version.getVersion() +
", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
// Build the invoker from the collected URLs.
if (urls.size() == 1) {
// Single URL: refer it directly (this is where service discovery is performed).
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
Invoker<?> referInvoker = REF_PROTOCOL.refer(interfaceClass, url);
// With checking enabled only available invokers are kept; unavailable ones are destroyed.
if (shouldCheck()) {
if (referInvoker.isAvailable()) {
invokers.add(referInvoker);
} else {
referInvoker.destroy();
}
} else {
invokers.add(referInvoker);
}
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (shouldCheck() && invokers.size() == 0) {
throw new IllegalStateException("Failed to check the status of the service "
+ interfaceName
+ ". No provider available for the service "
+ (group == null ? "" : group + "/")
+ interfaceName +
(version == null ? "" : ":" + version)
+ " from the multi registry cluster"
+ " use dubbo version " + Version.getVersion());
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
// Cluster name comes from the first invoker's URL when there is one, otherwise Cluster.DEFAULT.
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// REGISTER_IP_KEY is removed from the map and used as the host of the consumer URL.
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
MetadataUtils.publishServiceDefinition(consumerURL);
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
服务发现的protocol 责任链
![服务发现的protocol责任链](img_8.png)
/**
 * Registry-side {@link Protocol} excerpt: resolves the registry for a reference URL, picks the
 * cluster to use and hands the resulting invoker to the registered
 * {@link RegistryProtocolListener}s.
 */
public class RegistryProtocol implements Protocol {

    /**
     * Refers a service through the registry addressed by {@code url}.
     *
     * @param type service interface
     * @param url  registry URL carrying the consumer parameters under {@code REFER_KEY}
     * @return an invoker on the registry itself when {@code type} is {@code RegistryService},
     *         otherwise the invoker produced by {@link #doRefer}
     * @throws RpcException declared by the {@link Protocol} contract
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        final URL registryUrl = getRegistryUrl(url);
        final Registry registry = getRegistry(registryUrl);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, registryUrl);
        }

        final Map<String, String> referParams =
                StringUtils.parseQueryString(registryUrl.getParameterAndDecoded(REFER_KEY));

        // group="a,b" or group="*" selects the mergeable cluster.
        final String group = referParams.get(GROUP_KEY);
        final boolean multiGroup = group != null
                && group.length() > 0
                && (COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group));

        // Otherwise the cluster named by CLUSTER_KEY is looked up
        // (per the original note, what comes back may be the mock cluster).
        final Cluster cluster = multiGroup
                ? Cluster.getCluster(MergeableCluster.NAME)
                : Cluster.getCluster(referParams.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, registryUrl, referParams);
    }

    /**
     * Creates the migration invoker for the consumer and passes it through the listeners.
     *
     * @param cluster    cluster chosen by {@link #refer}
     * @param registry   registry resolved from {@code url}
     * @param type       service interface
     * @param url        registry URL
     * @param parameters consumer parameters; {@code REGISTER_IP_KEY} is removed from this map
     * @return the invoker returned by {@link #interceptInvoker}
     */
    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
        final String registerIp = parameters.remove(REGISTER_IP_KEY);
        final URL consumerUrl = new URL(CONSUMER_PROTOCOL, registerIp, 0, type.getName(), parameters);
        final ClusterInvoker<T> migrationInvoker =
                getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        // Let the registry protocol listeners intercept the migration invoker.
        return interceptInvoker(migrationInvoker, url, consumerUrl);
    }

    /**
     * Notifies every activated {@link RegistryProtocolListener} about the new reference.
     *
     * @param invoker     invoker to hand to the listeners
     * @param url         registry URL used to look up the listeners
     * @param consumerUrl consumer URL passed to each listener
     * @return the same {@code invoker} instance that was passed in
     */
    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
        // Look up the registry protocol listeners.
        final List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (!CollectionUtils.isEmpty(listeners)) {
            // Each listener's onRefer callback drives the actual discovery work.
            for (RegistryProtocolListener each : listeners) {
                each.onRefer(this, invoker, consumerUrl);
            }
        }
        return invoker;
    }

    /**
     * Loads the {@link RegistryProtocolListener} extensions activated for {@code url} under
     * {@code REGISTRY_PROTOCOL_LISTENER_KEY}.
     *
     * @param url registry URL
     * @return the activated listeners
     */
    protected List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) {
        final ExtensionLoader<RegistryProtocolListener> loader =
                ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class);
        return loader.getActivateExtension(url, REGISTRY_PROTOCOL_LISTENER_KEY);
    }
}
使用了InterfaceCompatibleRegistryProtocol 的getMigrationInvoker方法,创建迁移调用器(MigrationInvoker)
public class InterfaceCompatibleRegistryProtocol extends RegistryProtocol {
@Override
protected <T> ClusterInvoker<T> (RegistryProtocol registryProtocol, Cluster cluster, Registry registry,
Class<T> type, URL url, URL consumerUrl) {
// ClusterInvoker<T> invoker = getInvoker(cluster, registry, type, url);
return new MigrationInvoker<T>(registryProtocol, cluster, registry, type, url, consumerUrl);
}
}
/**
 * Registry protocol listener excerpt that attaches a {@link MigrationRuleHandler} to every
 * referred {@link MigrationInvoker} and applies the current raw rule to it.
 * <p>
 * The {@code listeners} and {@code rawRule} fields are declared outside this excerpt.
 */
@Activate
public class MigrationRuleListener implements RegistryProtocolListener, ConfigurationListener {

    /**
     * Creates a handler for the referred invoker, records it and runs the migration once.
     *
     * @param registryProtocol protocol that performed the refer (not used here)
     * @param invoker          the referred invoker; must be a {@link MigrationInvoker}
     * @param url              consumer URL (not used here)
     * @throws ClassCastException if {@code invoker} is not a {@link MigrationInvoker}
     */
    @Override
    public synchronized void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL url) {
        MigrationRuleHandler<?> handler = new MigrationRuleHandler<>((MigrationInvoker<?>) invoker);
        listeners.add(handler);
        handler.doMigrate(rawRule);
    }
}
MigrationRuleHandler处理器
/**
 * Applies a migration rule to one {@link MigrationInvoker}, remembering the last applied
 * {@link MigrationStep} so that an unchanged step is not applied twice.
 *
 * @param <T> service interface type of the wrapped invoker
 */
public class MigrationRuleHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class);

    private MigrationInvoker<T> migrationInvoker;
    private MigrationStep currentStep;

    /**
     * @param invoker the migration invoker this handler drives
     */
    public MigrationRuleHandler(MigrationInvoker<T> invoker) {
        this.migrationInvoker = invoker;
    }

    /**
     * Parses {@code rawRule} and switches the invoker to the step it requests.
     * <p>
     * Nothing happens when the parsed step equals the step applied last time. With multiple
     * registries the matching invoker is only refreshed; otherwise the step decides between
     * service-discovery and interface-based invocation.
     *
     * @param rawRule raw rule text handed to {@code MigrationRule.parse}
     */
    public void doMigrate(String rawRule) {
        MigrationRule rule = MigrationRule.parse(rawRule);

        boolean sameStep = null != currentStep && currentStep.equals(rule.getStep());
        if (sameStep) {
            if (logger.isInfoEnabled()) {
                logger.info("Migration step is not change. rule.getStep is " + currentStep.name());
            }
            return;
        }
        currentStep = rule.getStep();

        migrationInvoker.setMigrationRule(rule);

        if (migrationInvoker.isMigrationMultiRegistry()) {
            refreshForMultiRegistry();
            return;
        }

        switch (rule.getStep()) {
            case APPLICATION_FIRST:
                // Service discovery is performed here; the flag is false for APPLICATION_FIRST.
                migrationInvoker.migrateToServiceDiscoveryInvoker(false);
                break;
            case FORCE_APPLICATION:
                migrationInvoker.migrateToServiceDiscoveryInvoker(true);
                break;
            default:
                // FORCE_INTERFACE and every other step fall back to the interface invoker.
                migrationInvoker.fallbackToInterfaceInvoker();
                break;
        }
    }

    /** Refreshes whichever invoker kind the migration invoker reports for multi-registry mode. */
    private void refreshForMultiRegistry() {
        if (migrationInvoker.isServiceInvoker()) {
            migrationInvoker.refreshServiceDiscoveryInvoker();
        } else {
            migrationInvoker.refreshInterfaceInvoker();
        }
    }
}
/**
 * Refers the service through the protocol-specific binding and wraps the result so that it is
 * returned as an {@link AsyncToSyncInvoker}.
 *
 * @param type service interface
 * @param url  provider URL
 * @return the wrapped invoker
 * @throws RpcException if the protocol-specific refer fails
 */
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    Invoker<T> bound = protocolBindingRefer(type, url);
    return new AsyncToSyncInvoker<>(bound);
}
public class DubboProtocol extends AbstractProtocol {
/**
 * Creates the invoker used for remote calls to the provider at {@code url} and records it in
 * {@code invokers}.
 *
 * @param serviceType service interface
 * @param url         provider URL
 * @return the newly created {@link DubboInvoker}
 * @throws RpcException declared by the overridden method
 */
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // Create the RPC invoker on top of the exchange clients obtained for this URL.
    ExchangeClient[] clients = getClients(url);
    DubboInvoker<T> rpcInvoker = new DubboInvoker<T>(serviceType, url, clients, invokers);
    invokers.add(rpcInvoker);
    return rpcInvoker;
}
//
/**
 * Returns the exchange clients to use for {@code url}.
 * <p>
 * A non-zero {@code CONNECTIONS_KEY} parameter means dedicated connections: that many clients
 * are created with {@code initClient}. Otherwise shared clients are used, their number taken
 * from the URL's {@code SHARE_CONNECTIONS_KEY} parameter or, when that is blank, from the
 * property of the same name (default {@code DEFAULT_SHARE_CONNECTIONS}).
 *
 * @param url provider URL
 * @return the clients for this URL
 * @throws NumberFormatException if the share-connections value is not an integer
 */
private ExchangeClient[] getClients(URL url) {
    final int configured = url.getParameter(CONNECTIONS_KEY, 0);

    if (configured != 0) {
        // Explicitly configured: one connection set per service, not shared.
        final ExchangeClient[] dedicated = new ExchangeClient[configured];
        for (int idx = 0; idx < dedicated.length; idx++) {
            dedicated[idx] = initClient(url);
        }
        return dedicated;
    }

    // Not configured: the connection is shared.
    // The xml configuration (URL parameter) should have a higher priority than properties.
    final String fromUrl = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
    final String rawCount;
    if (StringUtils.isBlank(fromUrl)) {
        rawCount = ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS);
    } else {
        rawCount = fromUrl;
    }
    final int sharedCount = Integer.parseInt(rawCount);
    return getSharedClient(url, sharedCount).toArray(new ExchangeClient[0]);
}
/**
 * Gets the shared connections for the address of {@code url}, creating or repairing them
 * when needed.
 * <p>
 * {@code referenceClientMap} holds, per address, either a list of clients or the
 * {@code PENDING_OBJECT} marker while one thread is building that list. Usable clients are
 * returned after their reference counts are incremented; otherwise the calling thread claims
 * the key with the marker, builds or repairs the clients outside the lock, and finally
 * publishes the result and wakes the waiting threads.
 *
 * @param url        provider URL; its address is the map key
 * @param connectNum requested number of shared connections (raised to at least 1)
 * @return the shared clients for this address
 */
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
// First attempt without taking the monitor: reuse the cached clients if they are usable.
Object clients = referenceClientMap.get(key);
if (clients instanceof List) {
List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
}
}
List<ReferenceCountExchangeClient> typedClients = null;
synchronized (referenceClientMap) {
// Loop until this thread either returns usable clients or claims the key for building.
for (; ; ) {
clients = referenceClientMap.get(key);
if (clients instanceof List) {
typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
} else {
// Cached clients are not usable: claim the key and leave the loop to repair them.
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
} else if (clients == PENDING_OBJECT) {
// Another thread is building the clients for this key: wait for its notifyAll.
try {
referenceClientMap.wait();
} catch (InterruptedException ignored) {
// NOTE(review): the interrupt is swallowed and the loop retries; the thread's
// interrupt status is not restored - confirm this is intended.
}
} else {
// Nothing cached yet: claim the key and leave the loop to build the clients.
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
}
}
try {
// connectNum must be greater than or equal to 1
connectNum = Math.max(connectNum, 1);
// If the clients list is empty, this is the first initialization.
if (CollectionUtils.isEmpty(typedClients)) {
typedClients = buildReferenceCountExchangeClientList(url, connectNum);
} else {
for (int i = 0; i < typedClients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
typedClients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
// A still-open client is reused; count this additional reference.
referenceCountExchangeClient.incrementAndGetCount();
}
}
} finally {
// Always replace the PENDING_OBJECT marker (with the list, or by removing the key when
// no list exists) and wake up the threads waiting on the map.
synchronized (referenceClientMap) {
if (typedClients == null) {
referenceClientMap.remove(key);
} else {
referenceClientMap.put(key, typedClients);
}
referenceClientMap.notifyAll();
}
}
return typedClients;
}
}
发现服务以后,建立一个和服务provider的长连接,这个连接是可以和同provider的service共享的
![与provider建立的共享长连接](img_9.png)
最后使用JavassistProxyFactory返回一个代理对象,它的调用处理器是InvokerInvocationHandler,里面就是去远程调用服务的流程逻辑
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
dubbo的调用处理器,后面进行服务调用的时候就是由它来执行的
/**
 * {@link InvocationHandler} behind Dubbo service proxies: turns a call on the proxy into an
 * {@link RpcInvocation} and sends it through the wrapped {@link Invoker}.
 */
public class InvokerInvocationHandler implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);

    /**
     * Reflective handle on {@code Throwable}'s {@code stackTrace} field, made accessible in the
     * static initializer; stays {@code null} if that field does not exist.
     */
    public static Field stackTraceField;

    static {
        try {
            stackTraceField = Throwable.class.getDeclaredField("stackTrace");
            ReflectUtils.makeAccessible(stackTraceField);
        } catch (NoSuchFieldException e) {
            // ignore
        }
    }

    private final Invoker<?> invoker;
    private ConsumerModel consumerModel;
    private URL url;
    private String protocolServiceKey;

    /**
     * @param handler the invoker every proxied call is delegated to; its URL supplies the
     *                service key used to look up the consumer model
     */
    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
        this.url = invoker.getUrl();
        final String key = this.url.getServiceKey();
        this.protocolServiceKey = this.url.getProtocolServiceKey();
        if (key != null) {
            this.consumerModel = ApplicationModel.getConsumerModel(key);
        }
    }

    /**
     * Handles one call on the proxy.
     * <p>
     * Methods declared by {@code Object} are invoked reflectively on the invoker itself;
     * no-arg {@code toString}, {@code $destroy} and {@code hashCode} as well as one-arg
     * {@code equals} are answered by the invoker directly. Everything else becomes a remote
     * invocation.
     *
     * @param proxy  the proxy instance (not used)
     * @param method the interface method being called
     * @param args   call arguments
     * @return the recreated result of the invocation
     * @throws Throwable whatever the invocation or result recreation throws
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }

        final String name = method.getName();
        final int arity = method.getParameterTypes().length;
        if (arity == 0) {
            switch (name) {
                case "toString":
                    return invoker.toString();
                case "$destroy":
                    invoker.destroy();
                    return null;
                case "hashCode":
                    return invoker.hashCode();
                default:
                    break;
            }
        } else if (arity == 1 && "equals".equals(name)) {
            return invoker.equals(args[0]);
        }

        final RpcInvocation invocation =
                new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
        invocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey());

        // invoker.getUrl() returns consumer url.
        RpcContext.setRpcContext(invoker.getUrl());

        if (consumerModel != null) {
            invocation.put(Constants.CONSUMER_MODEL, consumerModel);
            invocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        return invoker.invoke(invocation).recreate();
    }
}
版权声明:本文为u012768625原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。