Ribbon负载均衡策略DynamicServerListLoadBalancer的ServerList解读

  • Post author:
  • Post category:其他



一 DynamicServerListLoadBalancer在类图中的位置


二 DynamicServerListLoadBalancer源码解读

1 关键代码请见注释

2 源码位置:ribbon-master\ribbon-loadbalancer\src\main\java\com\netflix\loadbalancer\DynamicServerListLoadBalancer.java

/**
 * A {@link BaseLoadBalancer} whose server list is refreshed at runtime.
 *
 * <p>A {@link ServerListUpdater} is started with {@link #updateAction}; each time that action
 * runs it calls {@link #updateListOfServers()}, which pulls the current servers from
 * {@link #serverListImpl}, optionally passes them through {@link #filter}, and installs the
 * result via {@link #updateAllServerList(List)}.
 *
 * @param <T> the concrete {@link Server} subtype produced by the server list
 */
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

    // Neither flag is read or written anywhere else in this class.
    boolean isSecure = false;
    boolean useTunnel = false;

    // Guard flipped with compareAndSet in updateAllServerList so that only one thread
    // installs a new server list at a time; concurrent callers skip the update.
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
    // Object used to obtain the server list.
    // The generic type T is a subclass of Server and represents a concrete
    // server-instance extension class.
    volatile ServerList<T> serverListImpl;

    // Optional filter applied to the list returned by serverListImpl; may be null.
    volatile ServerListFilter<T> filter;

    // Callback handed to the ServerListUpdater; every invocation triggers a full refresh.
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

    // Component that decides when updateAction is invoked.
    protected volatile ServerListUpdater serverListUpdater;

    /**
     * Creates an instance through the superclass default constructor only.
     * {@code serverListImpl}, {@code filter} and {@code serverListUpdater} are left null.
     */
    public DynamicServerListLoadBalancer() {
        super();
    }

    /**
     * Delegates to the six-argument constructor with a new {@link PollingServerListUpdater}.
     *
     * @param clientConfig client configuration passed to the superclass and to {@code restOfInit}
     * @param rule         load-balancing rule passed to the superclass
     * @param ping         ping strategy passed to the superclass
     * @param serverList   source of servers
     * @param filter       filter applied to the servers returned by {@code serverList}
     */
    @Deprecated
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
            ServerList<T> serverList, ServerListFilter<T> filter) {
        this(
                clientConfig,
                rule,
                ping,
                serverList,
                filter,
                new PollingServerListUpdater()
        );
    }

    /**
     * Stores the collaborators, hands the load balancer stats to the filter when it is an
     * {@link AbstractServerListFilter}, and then runs {@link #restOfInit(IClientConfig)}.
     *
     * @param clientConfig      client configuration passed to the superclass and to {@code restOfInit}
     * @param rule              load-balancing rule passed to the superclass
     * @param ping              ping strategy passed to the superclass
     * @param serverList        source of servers
     * @param filter            filter applied to the servers returned by {@code serverList}
     * @param serverListUpdater updater that will be started with {@link #updateAction}
     */
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

    /**
     * Initializes the instance entirely from configuration by calling
     * {@link #initWithNiwsConfig(IClientConfig)}.
     *
     * @param clientConfig configuration naming the server list and updater classes
     */
    public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
        initWithNiwsConfig(clientConfig);
    }

    /**
     * Configures this load balancer from {@code clientConfig}: runs the superclass
     * initialization, instantiates the server list class and the server list updater class
     * named in the configuration (falling back to the defaults in
     * {@link DefaultClientConfigImpl}), picks up the server list's filter when the list is an
     * {@link AbstractServerList}, and finally runs {@link #restOfInit(IClientConfig)}.
     *
     * @param clientConfig configuration to read class names from
     * @throws RuntimeException wrapping any exception raised during the steps above
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        try {
            super.initWithNiwsConfig(clientConfig);
            String niwsServerListClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.NIWSServerListClassName,
                    DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);

            ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
                    .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
            this.serverListImpl = niwsServerListImpl;

            // Only AbstractServerList exposes getFilterImpl; for other implementations
            // the filter field is left untouched.
            if (niwsServerListImpl instanceof AbstractServerList) {
                AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
                        .getFilterImpl(clientConfig);
                niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
                this.filter = niwsFilter;
            }

            String serverListUpdaterClassName = clientConfig.getPropertyAsString(
                    CommonClientConfigKey.ServerListUpdaterClassName,
                    DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
            );

            this.serverListUpdater = (ServerListUpdater) ClientFactory
                    .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);

            restOfInit(clientConfig);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Exception while initializing NIWSDiscoveryLoadBalancer:"
                            + clientConfig.getClientName()
                            + ", niwsClientConfig:" + clientConfig, e);
        }
    }

    /**
     * Final initialization step shared by the constructors: starts the updater, performs one
     * direct call to {@link #updateListOfServers()}, and primes connections to the reachable
     * servers when priming was enabled and a prime-connections object is available.
     *
     * @param clientConfig used only for the client name in the log message
     */
    void restOfInit(IClientConfig clientConfig) {
        // Remember the configured value so it can be restored after the initial update.
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }


    /**
     * Installs the given servers through the superclass and then rebuilds the zone-to-servers
     * mapping: servers are grouped under their lower-cased zone name, and servers whose zone
     * is null are left out of the mapping.
     *
     * @param lsrv the new server list (raw type; cast to {@code List<T>} without a check)
     */
    @Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List<Server> servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList<Server>();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    /**
     * Pushes the zone-to-servers mapping into the load balancer stats.
     *
     * @param zoneServersMap servers keyed by zone name
     */
    protected void setServerListForZones(
            Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

    /** @return the server list source currently in use; may be null */
    public ServerList<T> getServerListImpl() {
        return serverListImpl;
    }

    /** @param niwsServerList the server list source to use for subsequent refreshes */
    public void setServerListImpl(ServerList<T> niwsServerList) {
        this.serverListImpl = niwsServerList;
    }

    /** @return the filter applied during refreshes; may be null */
    public ServerListFilter<T> getFilter() {
        return filter;
    }

    /** @param filter the filter to apply during subsequent refreshes */
    public void setFilter(ServerListFilter<T> filter) {
        this.filter = filter;
    }

    /** @return the updater that drives {@link #updateAction}; may be null */
    public ServerListUpdater getServerListUpdater() {
        return serverListUpdater;
    }

    /**
     * Replaces the updater reference. This setter only assigns the field; it does not stop
     * the previous updater or start the new one.
     *
     * @param serverListUpdater the new updater
     */
    public void setServerListUpdater(ServerListUpdater serverListUpdater) {
        this.serverListUpdater = serverListUpdater;
    }

    /**
     * Overridden as a no-op. {@link #updateAllServerList(List)} calls the superclass
     * implementation directly via {@code super.forceQuickPing()}.
     */
    @Override
    public void forceQuickPing() {
        // no-op
    }

    /**
     * Logs the updater's class name and starts it with {@link #updateAction}.
     * {@code serverListUpdater} is dereferenced without a null check.
     */
    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    /** @return the client name from the client configuration, used in log messages */
    private String getIdentifier() {
        return this.getClientConfig().getClientName();
    }

    /** Stops the updater if one is set. */
    public void stopServerListRefreshing() {
        if (serverListUpdater != null) {
            serverListUpdater.stop();
        }
    }

    /**
     * Performs one refresh: fetches the updated list from {@code serverListImpl}, applies
     * {@code filter} when present, and passes the result to
     * {@link #updateAllServerList(List)}. When {@code serverListImpl} is null an empty list
     * is passed instead.
     */
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

    /**
     * Update the AllServer list in the LoadBalancer if necessary and enabled.
     *
     * <p>If another thread is already inside this method the call returns without doing
     * anything. Otherwise every server in {@code ls} is marked alive, the list is installed
     * with {@link #setServersList(List)}, and the superclass quick ping is triggered.
     *
     * @param ls the servers to install
     */
    protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                // Always release the guard, even when installing the list throws.
                serverListUpdateInProgress.set(false);
            }
        }
    }

    /** @return the superclass description followed by the server list implementation */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
        sb.append(super.toString());
        sb.append("ServerList:" + String.valueOf(serverListImpl));
        return sb.toString();
    }

    /** Shuts down the superclass and then stops the server list updater. */
    @Override
    public void shutdown() {
        super.shutdown();
        stopServerListRefreshing();
    }


    // The four monitor getters below delegate straight to serverListUpdater
    // and dereference it without a null check.

    /** @return the updater's last-update value, as reported by the updater */
    @Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
    public String getLastUpdate() {
        return serverListUpdater.getLastUpdate();
    }

    /** @return the updater's duration-since-last-update value, as reported by the updater */
    @Monitor(name="DurationSinceLastUpdateMs", type= DataSourceType.GAUGE)
    public long getDurationSinceLastUpdateMs() {
        return serverListUpdater.getDurationSinceLastUpdateMs();
    }

    /** @return the updater's missed-cycle count, as reported by the updater */
    @Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
    public int getNumberMissedCycles() {
        return serverListUpdater.getNumberMissedCycles();
    }

    /** @return the updater's core thread count, as reported by the updater */
    @Monitor(name="NumThreads", type=DataSourceType.GAUGE)
    public int getCoreThreads() {
        return serverListUpdater.getCoreThreads();
    }
}


三 ServerList在类图中的位置


四 ServerList源码解读

/**
 * Source of server instances for a load balancer.
 *
 * @param <T> the concrete {@link Server} subtype this list produces
 */
public interface ServerList<T extends Server> {

    /**
     * Returns the server instances to start with.
     *
     * @return the initial list of server instances
     */
    List<T> getInitialListOfServers();

    /**
     * Returns the refreshed server instances.
     *
     * @return the updated list of server instances
     */
    List<T> getUpdatedListOfServers();

}


五 EurekaRibbonClientConfiguration类源码解读

// Spring configuration that declares the Ribbon beans (IPing, ServerList, ServerIntrospector)
// for a Eureka-backed client.
// NOTE(review): per the article, the ServerList bean declared here is what ends up as
// serverListImpl in DynamicServerListLoadBalancer; that wiring is not visible in this class.
@Configuration
public class EurekaRibbonClientConfiguration {

    private static final Log log = LogFactory.getLog(EurekaRibbonClientConfiguration.class);

    @Value("${ribbon.eureka.approximateZoneFromHostname:false}")
    private boolean approximateZoneFromHostname = false;

    @RibbonClientName
    private String serviceId = "client";

    @Autowired(required = false)
    private EurekaClientConfig clientConfig;

    @Autowired(required = false)
    private EurekaInstanceConfig eurekaConfig;

    @Autowired
    private PropertiesFactory propertiesFactory;

    /** Default constructor; fields are populated by injection. */
    public EurekaRibbonClientConfiguration() {
    }

    /**
     * Creates a configuration with explicitly supplied settings instead of injected ones.
     *
     * @param clientConfig                Eureka client configuration
     * @param serviceId                   name of the Ribbon client
     * @param eurekaConfig                Eureka instance configuration
     * @param approximateZoneFromHostname whether the zone may be derived from a host name
     */
    public EurekaRibbonClientConfiguration(EurekaClientConfig clientConfig,
            String serviceId, EurekaInstanceConfig eurekaConfig,
            boolean approximateZoneFromHostname) {
        this.serviceId = serviceId;
        this.clientConfig = clientConfig;
        this.eurekaConfig = eurekaConfig;
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    /**
     * Declares the {@link IPing} bean: the implementation named through the properties
     * factory when one is set for this service, otherwise a {@link NIWSDiscoveryPing}
     * initialized with {@code config}.
     */
    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (!this.propertiesFactory.isSet(IPing.class, serviceId)) {
            NIWSDiscoveryPing discoveryPing = new NIWSDiscoveryPing();
            discoveryPing.initWithNiwsConfig(config);
            return discoveryPing;
        }
        return this.propertiesFactory.get(IPing.class, config, serviceId);
    }

    /**
     * Declares the {@link ServerList} bean: the implementation named through the properties
     * factory when one is set for this service, otherwise a {@link DomainExtractingServerList}
     * wrapping a {@link DiscoveryEnabledNIWSServerList}.
     */
    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }

        // The DiscoveryEnabledNIWSServerList is built first and then wrapped, so the
        // bean handed out is the DomainExtractingServerList decorator.
        return new DomainExtractingServerList(
                new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider),
                config, this.approximateZoneFromHostname);
    }

    /** Declares the {@link ServerIntrospector} bean as an {@link EurekaServerIntrospector}. */
    @Bean
    public ServerIntrospector serverIntrospector() {
        return new EurekaServerIntrospector();
    }

    /**
     * Runs after construction. When a Eureka client configuration is present and the
     * deployment context has no zone yet, a zone is chosen and stored in the deployment
     * context; Ribbon defaults for this service are initialized in every case.
     */
    @PostConstruct
    public void preprocess() {
        String currentZone = ConfigurationManager.getDeploymentContext()
                .getValue(ContextKey.zone);
        if (this.clientConfig != null && StringUtils.isEmpty(currentZone)) {
            if (this.approximateZoneFromHostname && this.eurekaConfig != null) {
                applyApproximateZone();
            }
            else {
                applyConfiguredZone();
            }
        }
        RibbonUtils.initializeRibbonDefaults(serviceId);
    }

    // Derives the zone from the Eureka instance host name and stores it unconditionally.
    private void applyApproximateZone() {
        String hostName = this.eurekaConfig.getHostName(false);
        String approxZone = ZoneUtils.extractApproximateZone(hostName);
        log.debug("Setting Zone To " + approxZone);
        ConfigurationManager.getDeploymentContext().setValue(ContextKey.zone,
                approxZone);
    }

    // Takes the zone from the instance metadata, falling back to the first availability
    // zone of the client's region; stores it only when a value was found.
    private void applyConfiguredZone() {
        String availabilityZone = null;
        if (this.eurekaConfig != null) {
            availabilityZone = this.eurekaConfig.getMetadataMap().get("zone");
        }
        if (availabilityZone == null) {
            String[] zones = this.clientConfig
                    .getAvailabilityZones(this.clientConfig.getRegion());
            // Pick the first one from the regions we want to connect to
            if (zones != null && zones.length > 0) {
                availabilityZone = zones[0];
            }
        }
        if (availabilityZone != null) {
            // You can set this with archaius.deployment.* (maybe requires
            // custom deployment context)?
            ConfigurationManager.getDeploymentContext().setValue(ContextKey.zone,
                    availabilityZone);
        }
    }

}


六 DomainExtractingServerList源码解读

// ServerList implementation that wraps another ServerList and converts every server it
// returns into a DomainExtractingServer.
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    // The wrapped ServerList that both lookup methods delegate to.
    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    // In EurekaRibbonClientConfiguration the list argument is a DiscoveryEnabledNIWSServerList.
    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    // Delegates to the wrapped list's getInitialListOfServers and converts the result.
    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> initial = this.list.getInitialListOfServers();
        return setZones(initial);
    }

    // Delegates to the wrapped list's getUpdatedListOfServers and converts the result.
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> updated = this.list.getUpdatedListOfServers();
        return setZones(updated);
    }

    // Wraps each DiscoveryEnabledServer in a DomainExtractingServer (a subclass of it),
    // preserving the order of the input list.
    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> converted = new ArrayList<>();
        boolean securePort = this.ribbon.isSecure(true);
        boolean ipAddrForServer = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer original : servers) {
            DomainExtractingServer wrapped = new DomainExtractingServer(original, securePort,
                    ipAddrForServer, this.approximateZoneFromHostname);
            converted.add(wrapped);
        }
        return converted;
    }

}


七 DiscoveryEnabledNIWSServerList解读(注:本节下方所贴代码与第六节完全相同,仍是DomainExtractingServerList的源码,并非DiscoveryEnabledNIWSServerList,原文此处疑似贴错)

// ServerList implementation that wraps another ServerList and converts every server it
// returns into a DomainExtractingServer.
// NOTE(review): this listing appears under a heading that names DiscoveryEnabledNIWSServerList,
// yet the class declared here is DomainExtractingServerList — confirm which source was intended.
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    // The wrapped ServerList that both lookup methods delegate to.
    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    // In EurekaRibbonClientConfiguration the list argument is a DiscoveryEnabledNIWSServerList.
    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    // Delegates to the wrapped list's getInitialListOfServers and converts the result.
    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> fromDelegate = this.list.getInitialListOfServers();
        return setZones(fromDelegate);
    }

    // Delegates to the wrapped list's getUpdatedListOfServers and converts the result.
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> fromDelegate = this.list.getUpdatedListOfServers();
        return setZones(fromDelegate);
    }

    // Wraps each DiscoveryEnabledServer in a DomainExtractingServer (a subclass of it),
    // preserving the order of the input list.
    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> zoned = new ArrayList<>();
        boolean useSecurePort = this.ribbon.isSecure(true);
        boolean useIpAddr = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer each : servers) {
            DomainExtractingServer replacement = new DomainExtractingServer(each, useSecurePort,
                    useIpAddr, this.approximateZoneFromHostname);
            zoned.add(replacement);
        }
        return zoned;
    }

}


八 DomainExtractingServer解读

// Subclass of DiscoveryEnabledServer that is built from an existing DiscoveryEnabledServer;
// DomainExtractingServerList uses it to convert the servers it returns.
class DomainExtractingServer extends DiscoveryEnabledServer {

    private String id;

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    // Copies the instance info into the superclass, then sets the zone, id, alive flag and
    // ready-to-serve flag on the new object.
    public DomainExtractingServer(DiscoveryEnabledServer server, boolean useSecurePort,
            boolean useIpAddr, boolean approximateZoneFromHostname) {
        // host and port are set in super()
        super(server.getInstanceInfo(), useSecurePort, useIpAddr);
        setZone(chooseZone(server, approximateZoneFromHostname));
        setId(extractId(server));
        setAlive(server.isAlive());
        setReadyToServe(server.isReadyToServe());
    }

    // Zone precedence: the "zone" metadata entry, then (when enabled) a zone approximated
    // from the host name, then the zone already recorded on the source server.
    private static String chooseZone(DiscoveryEnabledServer server,
            boolean approximateZoneFromHostname) {
        if (server.getInstanceInfo().getMetadata().containsKey("zone")) {
            return server.getInstanceInfo().getMetadata().get("zone");
        }
        if (approximateZoneFromHostname) {
            return ZoneUtils.extractApproximateZone(server.getHost());
        }
        return server.getZone();
    }

    // Builds "<hostName>:<instanceId>" when the instance metadata has an "instanceId" entry;
    // otherwise falls back to the id provided by the superclass.
    private String extractId(Server server) {
        if (!(server instanceof DiscoveryEnabledServer)) {
            return super.getId();
        }
        InstanceInfo info = ((DiscoveryEnabledServer) server).getInstanceInfo();
        if (!info.getMetadata().containsKey("instanceId")) {
            return super.getId();
        }
        return info.getHostName()+":"+info.getMetadata().get("instanceId");
    }
}



版权声明:本文为chengqiuming原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。