一 DynamicServerListLoadBalancer在类图中的位置
二 DynamicServerListLoadBalancer源码解读
1 关键代码请见注释
2 源码位置:ribbon-master\ribbon-loadbalancer\src\main\java\com\netflix\loadbalancer\DynamicServerListLoadBalancer.java
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);
boolean isSecure = false;
boolean useTunnel = false;
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
//服务列表操作对象
//这里泛型T是一个Server的子类,代表了一个具体的服务实例扩展类
volatile ServerList<T> serverListImpl;
volatile ServerListFilter<T> filter;
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
public DynamicServerListLoadBalancer() {
super();
}
@Deprecated
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter) {
this(
clientConfig,
rule,
ping,
serverList,
filter,
new PollingServerListUpdater()
);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
initWithNiwsConfig(clientConfig);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
String niwsServerListClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
.getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.ServerListUpdaterClassName,
DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
);
this.serverListUpdater = (ServerListUpdater) ClientFactory
.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
restOfInit(clientConfig);
} catch (Exception e) {
throw new RuntimeException(
"Exception while initializing NIWSDiscoveryLoadBalancer:"
+ clientConfig.getClientName()
+ ", niwsClientConfig:" + clientConfig, e);
}
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
for (Server server : serverList) {
// make sure ServerStats is created to avoid creating them on hot
// path
getLoadBalancerStats().getSingleServerStat(server);
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
setServerListForZones(serversInZones);
}
protected void setServerListForZones(
Map<String, List<Server>> zoneServersMap) {
LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}
public ServerList<T> getServerListImpl() {
return serverListImpl;
}
public void setServerListImpl(ServerList<T> niwsServerList) {
this.serverListImpl = niwsServerList;
}
public ServerListFilter<T> getFilter() {
return filter;
}
public void setFilter(ServerListFilter<T> filter) {
this.filter = filter;
}
public ServerListUpdater getServerListUpdater() {
return serverListUpdater;
}
public void setServerListUpdater(ServerListUpdater serverListUpdater) {
this.serverListUpdater = serverListUpdater;
}
@Override
public void forceQuickPing() {
// no-op
}
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
private String getIdentifier() {
return this.getClientConfig().getClientName();
}
public void stopServerListRefreshing() {
if (serverListUpdater != null) {
serverListUpdater.stop();
}
}
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
/**
* Update the AllServer list in the LoadBalancer if necessary and enabled
*
* @param ls
*/
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("DynamicServerListLoadBalancer:");
sb.append(super.toString());
sb.append("ServerList:" + String.valueOf(serverListImpl));
return sb.toString();
}
@Override
public void shutdown() {
super.shutdown();
stopServerListRefreshing();
}
@Monitor(name="LastUpdated", type=DataSourceType.INFORMATIONAL)
public String getLastUpdate() {
return serverListUpdater.getLastUpdate();
}
@Monitor(name="DurationSinceLastUpdateMs", type= DataSourceType.GAUGE)
public long getDurationSinceLastUpdateMs() {
return serverListUpdater.getDurationSinceLastUpdateMs();
}
@Monitor(name="NumUpdateCyclesMissed", type=DataSourceType.GAUGE)
public int getNumberMissedCycles() {
return serverListUpdater.getNumberMissedCycles();
}
@Monitor(name="NumThreads", type=DataSourceType.GAUGE)
public int getCoreThreads() {
return serverListUpdater.getCoreThreads();
}
}
三 ServerList在类图中的位置
四 ServerList源码解读
public interface ServerList<T extends Server> {
//用于获取初始化的服务实例清单
public List<T> getInitialListOfServers();
//获取更新服务实例清单
public List<T> getUpdatedListOfServers();
}
五 EurekaRibbonClientConfiguration类源码解读
//该类实现了DynamicServerListLoadBalancer类中的serverListImpl
@Configuration
public class EurekaRibbonClientConfiguration {
private static final Log log = LogFactory.getLog(EurekaRibbonClientConfiguration.class);
@Value("${ribbon.eureka.approximateZoneFromHostname:false}")
private boolean approximateZoneFromHostname = false;
@RibbonClientName
private String serviceId = "client";
@Autowired(required = false)
private EurekaClientConfig clientConfig;
@Autowired(required = false)
private EurekaInstanceConfig eurekaConfig;
@Autowired
private PropertiesFactory propertiesFactory;
public EurekaRibbonClientConfiguration() {
}
public EurekaRibbonClientConfiguration(EurekaClientConfig clientConfig,
String serviceId, EurekaInstanceConfig eurekaConfig,
boolean approximateZoneFromHostname) {
this.clientConfig = clientConfig;
this.serviceId = serviceId;
this.eurekaConfig = eurekaConfig;
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
//这里创建了一个DomainExtractingServerList实例,discoveryServerList的类型是DiscoveryEnabledNIWSServerList
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
@Bean
public ServerIntrospector serverIntrospector() {
return new EurekaServerIntrospector();
}
@PostConstruct
public void preprocess() {
String zone = ConfigurationManager.getDeploymentContext()
.getValue(ContextKey.zone);
if (this.clientConfig != null && StringUtils.isEmpty(zone)) {
if (this.approximateZoneFromHostname && this.eurekaConfig != null) {
String approxZone = ZoneUtils
.extractApproximateZone(this.eurekaConfig.getHostName(false));
log.debug("Setting Zone To " + approxZone);
ConfigurationManager.getDeploymentContext().setValue(ContextKey.zone,
approxZone);
}
else {
String availabilityZone = this.eurekaConfig == null ? null
: this.eurekaConfig.getMetadataMap().get("zone");
if (availabilityZone == null) {
String[] zones = this.clientConfig
.getAvailabilityZones(this.clientConfig.getRegion());
// Pick the first one from the regions we want to connect to
availabilityZone = zones != null && zones.length > 0 ? zones[0]
: null;
}
if (availabilityZone != null) {
// You can set this with archaius.deployment.* (maybe requires
// custom deployment context)?
ConfigurationManager.getDeploymentContext().setValue(ContextKey.zone,
availabilityZone);
}
}
}
RibbonUtils.initializeRibbonDefaults(serviceId);
}
}
六 DomainExtractingServerList源码解读
//DomainExtractingServerList实现了ServerList
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
//内部定义了ServerList
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
//构造函数的参数list是DiscoveryEnabledNIWSServerList
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
//getInitialListOfServers委托给内部list对象的getInitialListOfServers方法
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
//下面这个函数获取到最新的服务实例清单
.getInitialListOfServers());
return servers;
}
//getUpdatedListOfServers委托给内部list对象的getUpdatedListOfServers方法
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getUpdatedListOfServers());
return servers;
}
//将DiscoveryEnabledServer对象转换为其子类对象DomainExtractingServer
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList<>();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}
}
七 DiscoveryEnabledNIWSServerList解读
//DomainExtractingServerList实现了ServerList
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
//内部定义了ServerList
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
//构造函数的参数list是DiscoveryEnabledNIWSServerList
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
//getInitialListOfServers委托给内部list对象的getInitialListOfServers方法
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
//下面这个函数获取到最新的服务实例清单
.getInitialListOfServers());
return servers;
}
//getUpdatedListOfServers委托给内部list对象的getUpdatedListOfServers方法
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getUpdatedListOfServers());
return servers;
}
//将DiscoveryEnabledServer对象转换为其子类对象DomainExtractingServer
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList<>();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}
}
八 DomainExtractingServer解读
//该类是DiscoveryEnabledServer子类,该类一般用于DiscoveryEnabledServer到DomainExtractingServer的转换
class DomainExtractingServer extends DiscoveryEnabledServer {
private String id;
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
//设置了服务实例的一些必要属性,比如id、zone、isAliveFlag
public DomainExtractingServer(DiscoveryEnabledServer server, boolean useSecurePort,
boolean useIpAddr, boolean approximateZoneFromHostname) {
// host and port are set in super()
super(server.getInstanceInfo(), useSecurePort, useIpAddr);
if (server.getInstanceInfo().getMetadata().containsKey("zone")) {
setZone(server.getInstanceInfo().getMetadata().get("zone"));
}
else if (approximateZoneFromHostname) {
setZone(ZoneUtils.extractApproximateZone(server.getHost()));
}
else {
setZone(server.getZone());
}
setId(extractId(server));
setAlive(server.isAlive());
setReadyToServe(server.isReadyToServe());
}
private String extractId(Server server) {
if (server instanceof DiscoveryEnabledServer) {
DiscoveryEnabledServer enabled = (DiscoveryEnabledServer) server;
InstanceInfo instance = enabled.getInstanceInfo();
if (instance.getMetadata().containsKey("instanceId")) {
return instance.getHostName()+":"+instance.getMetadata().get("instanceId");
}
}
return super.getId();
}
}
版权声明:本文为chengqiuming原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。