SpringCloud服务发现组件Eureka源码解析(启动流程)

  • Post author:
  • Post category:其他


本文从源码视角简述Eureka Server的启动流程。


一.入口


eureka-server包结构如下:

在这里插入图片描述

可以看到,一共包含5个目录,其中/eureka存放配置信息,/static/eureka与/templates/eureka存放静态文件,/org目录存放Java相关代码,/META-INF存放应用的主要信息。

打开/META-INF中的spring.factories,内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

@EnableAutoConfiguration注解在SpringBoot应用启动的时候会自动加载。


二.流程



1.加载EurekaServerAutoConfiguration


由上可知,在SpringBoot应用启动时会加载EurekaServerAutoConfiguration这个Bean。

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
//省略以上代码
	@Bean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}
//省略以下代码
}

根据上述代码,发现主要内容如下:

<1>通过@Configuration注解,声明一个配置类。

<2>通过@Import注解,导入了EurekaServerInitializerConfiguration。

<3>通过@ConditionalOnBean注解,当EurekaServerMarkerConfiguration这个Bean存在时才注入。这个非常重要,

主要通过它来控制是否开启Eureka Server。


SpringBoot在集成Eureka Server时,需要在主类上加入@EableEurekaServer注解。@EnableEurekaServer 内容如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}

可以看到,在@EableEurekaServer注解中导入了EurekaServerMarkerConfiguration。也就是说,加入@EableEurekaServer注解,就意味着开启了Eureka Server。

<4>通过@EnableConfigurationProperties,@PropertySource注解,导入配置信息。

<5>定义了EurekaServerContext这个Bean。


2.加载EurekaServerInitializerConfiguration


在上一步的加载中,导入了EurekaServerInitializerConfiguration。该Bean实现了SmartLifecycle接口,当Spring容器加载完所有Bean并完成初始化之后,会回调实现该接口的类中的start方法。

<1>EurekaServerInitializerConfiguration的start方法

@Configuration
public class EurekaServerInitializerConfiguration
		implements ServletContextAware, SmartLifecycle, Ordered {
	@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
                    //1.初始化Eureka Server,并启动
				    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");
                    //2.发布Eureka Server的注册事件
					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					//3.设置Eureka Server状态为已启动
					EurekaServerInitializerConfiguration.this.running = true;
					//4.发送Eureka Server的启动事件 
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}

<2>EurekaServerBootstrap的contextInitialized方法

在上一步,通过contextInitialized方法,初始化Eureka Server,并启动,继续跟踪:

	public void contextInitialized(ServletContext context) {
		try {
		    //1.初始化Eureka环境变量
			initEurekaEnvironment();
			//2.初始化Eureka上下文
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

里面主要方法为initEurekaEnvironment,initEurekaServerContext,分别跟踪:

<3>EurekaServerBootstrap的initEurekaEnvironment方法

	protected void initEurekaEnvironment() throws Exception {
		log.info("Setting the eureka configuration..");

		String dataCenter = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_DATACENTER);
		if (dataCenter == null) {
			log.info(
					"Eureka data center value eureka.datacenter is not set, defaulting to default");
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
		}
		String environment = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_ENVIRONMENT);
		if (environment == null) {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
			log.info(
					"Eureka environment value eureka.environment is not set, defaulting to test");
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
		}
	}

没什么好说的,设置一些变量值,如果没有,就取默认值。

<4>EurekaServerBootstrap的initEurekaServerContext方法

	protected void initEurekaServerContext() throws Exception {
		//省略以上代码
        //1.初始化Eureka Server上下文
		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		//2.从相邻的Eureka节点复制注册表 
		int registryCount = this.registry.syncUp();
		//3.开启定时任务,用于清理60秒没有心跳的客户端
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		//4.注册所有监控统计信息
		EurekaMonitors.registerAllStats();
	}

通过该方法,实现了Eureka Server上下文初始化,注册表复制以及定时任务的开启。


3.加载DefaultEurekaServerContext


在第一步EurekaServerAutoConfiguration中,定义了一个Bean:EurekaServerContext。

这个Bean通过DefaultEurekaServerContext注入。在DefaultEurekaServerContext中,存在initialize方法,该方法被@PostConstruct注解,因此该方法会在servlet被加载时运行。

<1>DefaultEurekaServerContext的initialize方法

public class DefaultEurekaServerContext implements EurekaServerContext {
    @PostConstruct
    @Override
    public void initialize() throws Exception {
        logger.info("Initializing ...");
        //1.启动线程,读取其他集群节点的信息
        peerEurekaNodes.start();
        //2.初始化其他集群节点
        registry.init(peerEurekaNodes);
        logger.info("Initialized");
    }
    //省略其他代码
}

<2>start方法

继续跟踪:如何启动线程并读取其他集群节点的信息

    public void start() {
       //省略以上代码
        try {
            //1.第一次进入时,直接更新集群节点信息
            updatePeerEurekaNodes(resolvePeerUrls());
            //2.创建一个线程,用来更新集群节点信息
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            //3.定时执行上面创建的线程,定时更新集群节点信息
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  " + node.getServiceUrl());
        }
    }

在上面的分析中,可以看到:第一次进入时,直接更新集群节点信息,后续再创建定时任务去定时执行updatePeerEurekaNodes方法。为什么这里要先执行一遍呢?

因为更新集群节点信息时,需要先初始化Eureka集群其他节点的列表。所以需要先调用updatePeerEurekaNodes方法做一次初始化。而定时任务执行的周期默认是10分钟,如果等定时调度来执行初始化的话,太慢。因此需要先执行一遍。

<3>updatePeerEurekaNodes方法

最后,看下updatePeerEurekaNodes方法是如何更新集群节点信息的。

    protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }

        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);
        
        //1.校验新的URL集合与旧有的URL集合是否一致。如果一致,则不需要更新,直接返回
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }
        
        //2.移除旧集合中不可用的节点信息
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }

        //3.添加新增加的节点信息
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }

        //4.更新peerEurekaNodes与peerEurekaNodeUrls
        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

该方法根据传入的新集群URL集合完成节点的更新,首先校验新旧URL集合是否一致,如果不一致,则移除旧集合中不可用的节点信息,并添加新增加的节点信息,最后更新。

自此,Eureka Server算是启动完毕了。


三.总结


最后,通过一个图和一段话来总结整个过程,如下:

在这里插入图片描述

EurekaServerAutoConfiguration是整个Eureka Server的启动入口,启动内容包括:

1.导入EurekaServerInitializerConfiguration

<1>设置Eureka Server环境变量,如果没有设置,则取默认值

<2>初始化Eureka Server上下文,主要完成注册表的复制,开启定时任务来定期清理客户端

2.定义DefaultEurekaServerContext

初始化加载后,通过initialize方法创建定时任务,定时更新集群节点信息



版权声明:本文为qq_15898739原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。