一、Ribbon的负载均衡实现
在Riibon中一个非常重要的组件为LoadBalancerClient,它作为负载均衡的一个客户端。它在spring-cloud-commons包下:的LoadBalancerClient是一个接口,它继承ServiceInstanceChooser,它的实现类是RibbonLoadBalancerClient
其中LoadBalancerClient接口,有如下三个方法,其中excute()为执行请求,reconstructURI()用来重构URL:
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
ServiceInstanceChooser接口,主要有一个方法,用来根据serviceId来获取ServiceInstance,代码如下:
public interface ServiceInstanceChooser { ServiceInstance choose(String serviceId); }
LoadBalancerClient的实现类为RibbonLoadBalancerClient,这个类是非常重要的一个类,最终的负载均衡的请求处理,由它来执行。它的部分源码如下:
public class RibbonLoadBalancerClient implements LoadBalancerClient {
...//省略代码
@Override
public ServiceInstance choose(String serviceId) {
Server server = getServer(serviceId);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
protected Server getServer(String serviceId) {
return getServer(getLoadBalancer(serviceId));
}
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
protected ILoadBalancer getLoadBalancer(String serviceId) {
return this.clientFactory.getLoadBalancer(serviceId);
}
...//省略代码
在RibbonLoadBalancerClient的源码中,其中choose()方法是选择具体服务实例的一个方法。该方法通过getServer()方法去获取实例,经过源码跟踪,最终交给了ILoadBalancer类去选择服务实例。
ILoadBalancer在ribbon-loadbalancer的jar包下,它是定义了实现软件负载均衡的一个接口,它需要一组可供选择的服务注册列表信息,以及根据特定方法去选择服务,它的源码如下 :
public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); }
- addServers()方法是添加一个Server集合;
- chooseServer()方法是根据key去获取Server;
- markServerDown()方法用来标记某个服务下线;
- getReachableServers()方法获取可用的Server集合;
- getAllServers()方法获取所有的Server集合;
它的继承类为BaseLoadBalancer,它的实现类为DynamicServerListLoadBalancer
查看上述三个类的源码,可用发现,配置以下信息,IClientConfig、IRule、IPing、ServerList、ServerListFilter和ILoadBalancer,查看BaseLoadBalancer类,它默认的情况下,实现了以下配置:
- IClientConfig ribbonClientConfig: DefaultClientConfigImpl配置
- IRule ribbonRule: RoundRobinRule 路由策略
- IPing ribbonPing: DummyPing
- ServerList ribbonServerList: ConfigurationBasedServerList
- ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter
- ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer
- IClientConfig 用于对客户端或者负载均衡的配置,它的默认实现类为DefaultClientConfigImpl。
IRule用于复杂均衡的策略,它有三个方法
choose()是根据key 来获取server,
setLoadBalancer()是用来设置和ILoadBalancer
getLoadBalancer()是用来获取ILoadBalancer
它的源码如下:
public interface IRule{ public Server choose(Object key); public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); }
IRule有很多默认的实现类,这些实现类根据不同的算法和逻辑来处理负载均衡。Ribbon实现的IRule有一下。在大多数情况下,这些默认的实现类是可以满足需求的,如果有特性的需求,可以自己实现。
- BestAvailableRule 选择最小请求数
- ClientConfigEnabledRoundRobinRule 轮询
- RandomRule 随机选择一个server
- RoundRobinRule 轮询选择server
- RetryRule 根据轮询的方式重试
- WeightedResponseTimeRule 根据响应时间去分配一个weight ,weight越低,被选择的可能性就越低
- ZoneAvoidanceRule 根据server的zone区域和可用性来轮询选择
IPing是用来向server发生”ping”,来判断该server是否有响应,从而判断该server是否可用。它有一个isAlive()方法,它的源码如下
public interface IPing {
public boolean isAlive(Server server);
}
IPing的实现类有PingUrl、PingConstant、NoOpPing、DummyPing和NIWSDiscoveryPing
- PingUrl 真实的去ping 某个url,判断其是否alive。
- PingConstant 固定返回某服务是否可用,默认返回true,即可用。
- NoOpPing 不去ping,直接返回true,即可用。
- DummyPing 直接返回true,并实现了initWithNiwsConfig方法。
- NIWSDiscoveryPing,根据DiscoveryEnabledServer的InstanceInfo的InstanceStatus去判断,如果为InstanceStatus.UP,则为可用,否则不可用。
ServerList是定义获取所有的server的注册列表信息的接口,它的代码如下:
public interface ServerList<T extends Server> {
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
}
ServerListFilter接口,定于了可根据配置去过滤或者根据特性动态获取符合条件的server列表的方法,代码如下:
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
阅读DynamicServerListLoadBalancer的源码,DynamicServerListLoadBalancer的构造函数中有个initWithNiwsConfig()方法。在这个方法中,经过一系列的初始化配置,最终执行了restOfInit()方法。其代码如下:
public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
initWithNiwsConfig(clientConfig);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
try {
super.initWithNiwsConfig(clientConfig);
String niwsServerListClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.NIWSServerListClassName,
DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
.instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
this.serverListImpl = niwsServerListImpl;
if (niwsServerListImpl instanceof AbstractServerList) {
AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
.getFilterImpl(clientConfig);
niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
this.filter = niwsFilter;
}
String serverListUpdaterClassName = clientConfig.getPropertyAsString(
CommonClientConfigKey.ServerListUpdaterClassName,
DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
);
this.serverListUpdater = (ServerListUpdater) ClientFactory
.instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
restOfInit(clientConfig);
} catch (Exception e) {
throw new RuntimeException(
"Exception while initializing NIWSDiscoveryLoadBalancer:"
+ clientConfig.getClientName()
+ ", niwsClientConfig:" + clientConfig, e);
}
}
在restOfInit()方法上,有一个 updateListOfServers()的方法,该方法是用来获取所有的ServerList的。
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
进一步跟踪updateListOfServers()方法的源码,最终由serverListImpl.getUpdatedListOfServers()获取所有的服务列表的,代码如下:
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
而serverListImpl是ServerList接口的具体实现类。跟踪代码,ServerList的实现类为DiscoveryEnabledNIWSServerList,在ribbon-eureka.jar的com.netflix.niws.loadbalancer下。其中DiscoveryEnabledNIWSServerList有 getInitialListOfServers()和getUpdatedListOfServers()方法,具体代码如下:
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
继续跟踪源码,obtainServersViaDiscovery(),是根据eurekaClientProvider.get()来获取EurekaClient,再根据EurekaClient来获取注册列表信息,代码如下:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
其中eurekaClientProvider的实现类是LegacyEurekaClientProvider,它是一个获取eurekaClient类,通过静态的方法去获取eurekaClient,其代码如下:
class LegacyEurekaClientProvider implements Provider<EurekaClient> {
private volatile EurekaClient eurekaClient;
@Override
public synchronized EurekaClient get() {
if (eurekaClient == null) {
eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient();
}
return eurekaClient;
}
}
EurekaClient的实现类为DiscoveryClient,在之前已经分析了它具有服务注册、获取服务注册列表等的全部功能。
由此可见,负载均衡器是从EurekaClient获取服务信息,并根据IRule去路由,并且根据IPing去判断服务的可用性。
那么现在还有个问题,负载均衡器多久一次去获取一次从Eureka Client获取注册信息呢。
在BaseLoadBalancer类下,BaseLoadBalancer的构造函数,该构造函数开启了一个PingTask任务,代码如下:
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy) {
...//代码省略
setupPingTask();
...//代码省略
}
setupPingTask()的具体代码逻辑,它开启了ShutdownEnabledTimer执行PingTask任务,在默认情况下pingIntervalSeconds为10,即每10秒钟,想EurekaClient发送一次”ping”。
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
PingTask源码,即new一个Pinger对象,并执行runPinger()方法。
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
查看Pinger的runPinger()方法,最终根据 pingerStrategy.pingServers(ping, allServers)来获取服务的可用性,如果该返回结果,如之前相同,则不去向EurekaClient获取注册列表,如果不同则通知ServerStatusChangeListener或者changeListeners发生了改变,进行更新或者重新拉取。
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
if (oldIsAlive != isAlive) {
changedServers.add(svr);
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
由此可见,LoadBalancerClient是在初始化的时候,会向Eureka回去服务注册列表,并且向通过10s一次向EurekaClient发送“ping”,来判断服务的可用性,如果服务的可用性发生了改变或者服务数量和之前的不一致,则更新或者重新拉取。LoadBalancerClient有了这些服务注册列表,就可以根据具体的IRule来进行负载均衡。
二、RestTemplate是如何和Ribbon结合的
在使用Ribbon进行服务消费的时候,我们用到了RestTemplate,但是熟悉Spring的人是否产生过这样的疑问:RestTemplate不是Spring自己就有的吗?跟Ribbon的客户端负载均衡又有什么关系呢?带着这些疑问,我们就来看RestTemplate和Ribbon是如何联系起来并实现客户端负载均衡的。
首先,对于消费者示例:我们是如何实现客户端负载均衡的?仔细观察一下代码之前的代码,我们可以发现在消费者的例子中,可能就是这个注解@LoadBalanced是之前没有接触过的,并且从命名上来看也与负载均衡相关。我们不妨根据这个线索来看看源码实现的机制。
从@LoadBalanced注解源码的注释中,我们可以知道该注解用来给RestTemplate标记,以使用负载均衡的客户端(LoadBalancerClient)来配置它。
package org.springframework.cloud.client.loadbalancer;
import org.springframework.beans.factory.annotation.Qualifier;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
* @author Spencer Gibb
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
通过搜索LoadBalancerClient,我们可以发现这是Spring Cloud中定义的一个接口:
package org.springframework.cloud.client.loadbalancer;
import org.springframework.cloud.client.ServiceInstance;
import java.io.IOException;
import java.net.URI;
/**
* Represents a client side load balancer
* @author Spencer Gibb
*/
public interface LoadBalancerClient extends ServiceInstanceChooser {
/**
* execute request using a ServiceInstance from the LoadBalancer for the specified
* service
* @param serviceId the service id to look up the LoadBalancer
* @param request allows implementations to execute pre and post actions such as
* incrementing metrics
* @return the result of the LoadBalancerRequest callback on the selected
* ServiceInstance
*/
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
/**
* execute request using a ServiceInstance from the LoadBalancer for the specified
* service
* @param serviceId the service id to look up the LoadBalancer
* @param serviceInstance the service to execute the request to
* @param request allows implementations to execute pre and post actions such as
* incrementing metrics
* @return the result of the LoadBalancerRequest callback on the selected
* ServiceInstance
*/
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
/**
* Create a proper URI with a real host and port for systems to utilize.
* Some systems use a URI with the logical serivce name as the host,
* such as http://myservice/path/to/service. This will replace the
* service name with the host:port from the ServiceInstance.
* @param instance
* @param original a URI with the host as a logical service name
* @return a reconstructed URI
*/
URI reconstructURI(ServiceInstance instance, URI original);
}
该接口中继承了ServiceInstanceChooser接口,我们可以通过定义的抽象方法来了解到客户端负载均衡器中应具备的几种能力:
- ServiceInstance choose(String serviceId):根据传入的服务名serviceId,从负载均衡器中挑选一个对应服务的实例。
- T execute(String serviceId, LoadBalancerRequest request) throws IOException:使用从负载均衡器中挑选出的服务实例来执行请求内容。
- URI reconstructURI(ServiceInstance instance, URI original):为系统构建一个合适的“host:port”形式的URI。在分布式系统中,我们使用逻辑上的服务名称作为host来构建URI(替代服务实例的“host:port”形式)进行请求。
比如:http://myservice/path/to/service。在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,而后者URI对象则是使用逻辑服务名定义为host的URI,而返回的URI内容则是通过ServiceInstance的服务实例详情拼接出的具体“host:post”形式的请求地址。
顺着LoadBalancerClient接口的所属包org.springframework.cloud.client.loadbalancer,我们对其内容进行整理,可以得出如下图的关系:
从类的命名上我们初步判断LoadBalancerAutoConfiguration为实现客户端负载均衡器的自动化配置类。通过查看源码,我们可以验证这一点假设:
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
}
};
}
@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
}
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
}
@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
template.setThrowLastExceptionOnExhausted(true);
return template;
}
@Bean
@ConditionalOnMissingBean
public LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory() {
return new LoadBalancedRetryPolicyFactory.NeverRetryFactory();
}
@Bean
@ConditionalOnMissingBean
public LoadBalancedBackOffPolicyFactory loadBalancedBackOffPolicyFactory() {
return new LoadBalancedBackOffPolicyFactory.NoBackOffPolicyFactory();
}
@Bean
@ConditionalOnMissingBean
public LoadBalancedRetryListenerFactory loadBalancedRetryListenerFactory() {
return new LoadBalancedRetryListenerFactory.DefaultRetryListenerFactory();
}
}
@Configuration
@ConditionalOnClass(RetryTemplate.class)
public static class RetryInterceptorAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RetryLoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties,
LoadBalancedRetryPolicyFactory lbRetryPolicyFactory,
LoadBalancerRequestFactory requestFactory,
LoadBalancedBackOffPolicyFactory backOffPolicyFactory,
LoadBalancedRetryListenerFactory retryListenerFactory) {
return new RetryLoadBalancerInterceptor(loadBalancerClient, properties,
lbRetryPolicyFactory, requestFactory, backOffPolicyFactory, retryListenerFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
}
}
从LoadBalancerAutoConfiguration类头上的注解可以知道Ribbon实现的负载均衡自动化配置需要满足下面两个条件:
- @ConditionalOnClass(RestTemplate.class):RestTemplate类必须存在于当前工程的环境中。
- @ConditionalOnBean(LoadBalancerClient.class):在Spring的Bean工程中有必须有LoadBalancerClient的实现Bean。
在该自动化配置类中,主要做了下面三件事:
- 创建了一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
- 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerInterceptor拦截器。
- 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。
接下来,我们看看LoadBalancerInterceptor拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
}
通过源码以及之前的自动化配置类,我们可以看到在拦截器中注入了LoadBalancerClient的实现。当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept函数所拦截。由于我们在使用RestTemplate时候采用了服务名作为host,所以直接从HttpRequest的URI对象中通过getHost()就可以拿到服务名,然后调用execute函数去根据服务名来选择实例并发起实际的请求。
分析到这里,LoadBalancerClient还只是一个抽象的负载均衡器接口,所以我们还需要找到它的具体实现类来进一步分析。通过查看ribbon的源码,我们可以很容易的在org.springframework.cloud.netflix.ribbon包下找到对应的实现类:RibbonLoadBalancerClient。
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
可以看到,在execute函数的实现中,首先从服务列表中获取RibbonServer,然后根据服务ID获取RibbonLoadBalanceContext上下文对象,最后创建一个Ribbon的统计记录对象,代码如下:
public static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
public RibbonServer(String serviceId, Server server) {
this(serviceId, server, false, Collections.<String, String> emptyMap());
}
public RibbonServer(String serviceId, Server server, boolean secure,
Map<String, String> metadata) {
this.serviceId = serviceId;
this.server = server;
this.secure = secure;
this.metadata = metadata;
}
@Override
public String getServiceId() {
return this.serviceId;
}
@Override
public String getHost() {
return this.server.getHost();
}
@Override
public int getPort() {
return this.server.getPort();
}
@Override
public boolean isSecure() {
return this.secure;
}
@Override
public URI getUri() {
return DefaultServiceInstance.getUri(this);
}
@Override
public Map<String, String> getMetadata() {
return this.metadata;
}
public Server getServer() {
return this.server;
}
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("RibbonServer{");
sb.append("serviceId='").append(serviceId).append('\'');
sb.append(", server=").append(server);
sb.append(", secure=").append(secure);
sb.append(", metadata=").append(metadata);
sb.append('}');
return sb.toString();
}
}
三、总结
综上所述,Ribbon的负载均衡,主要通过LoadBalancerClient来实现的,而LoadBalancerClient具体交给了ILoadBalancer来处理,ILoadBalancer通过配置IRule、IPing等信息,并向EurekaClient获取注册列表的信息,并默认10秒一次向EurekaClient发送“ping”,进而检查是否更新服务列表,最后,得到注册列表后,ILoadBalancer根据IRule的策略进行负载均衡。
而RestTemplate 被@LoadBalance注解后,能过用负载均衡,主要是维护了一个被@LoadBalance注解的RestTemplate列表,并给列表中的RestTemplate添加拦截器,进而交给负载均衡器去处理。