概念
dubbo的负载均衡,从本质上来说是客户端负载均衡,按照官网的文档说明,一共有四种负载均衡模式,缺省条件下为random。
策略(描述为
官网描述
)
父类
org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance
这里主要实现了一个通用的权重算法
(
有一个预热的过程,服务刚启动过程承受的最终负载比较少,随着服务运行时间的增长,承受的负载渐渐逼近真实所需要承受的负载
)
/**
* Calculate the weight according to the uptime proportion of warmup time
* the new weight will be within 1(inclusive) to weight(inclusive)
*
* @param uptime the uptime in milliseconds
* @param warmup the warmup time in milliseconds
* @param weight the weight of an invoker
* @return weight which takes warmup into account
*/
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
/**
* Get the weight of the invoker's invocation which takes warmup time into account
* if the uptime is within the warmup time, the weight will be reduce proportionally
*
* @param invoker the invoker
* @param invocation the invocation of this invoker
* @return weight
*/
int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
URL url = invoker.getUrl();
// 先获取服务的权重
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
// 得到服务的启动时间戳
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
//服务运行时间
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
//获取设置的预热时间 (10分钟)
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
}
return Math.max(weight, 0);
}
各个实现
Random LoadBalance
-
随机
,按权重设置随机概率。 - 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
ex: 根据权重大小来生成概率区间,例如结果为 a,b,c对应的比例为1:2:3。那么随机区间为
[0,1),[1,3),[3,6)。
实现代码 org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
比较简单的加权随机
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
int[] weights = new int[length];
// The sum of weights
int totalWeight = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// Sum
totalWeight += weight;
// save for later use
weights[i] = totalWeight;
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
RoundRobin LoadBalance
-
轮询
,按公约后的权重设置轮询比率。 - 存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
String identifyString = invoker.getUrl().toIdentityString();
int weight = getWeight(invoker, invocation);
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
思路,在权重的最大公约数的次数中,按照权重比例进行轮询
ex: a,b,c三个相同provider,设置的权重为1:3:2,那么按照调用顺序6次内为 a,b,c,b,c,b
LeastActive LoadBalance
- 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
- 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
实现代码 org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
private final Random random = new Random();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 总个数
int leastActive = -1; // 最小的活跃数
int leastCount = 0; // 相同最小活跃数的个数
int[] leastIndexs = new int[length]; // 相同最小活跃数的下标
int totalWeight = 0; // 总权重
int firstWeight = 0; // 第一个权重,用于于计算是否相同
boolean sameWeight = true; // 是否所有权重相同
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
leastActive = active; // 记录最小活跃数
leastCount = 1; // 重新统计相同最小活跃数的个数
leastIndexs[0] = i; // 重新记录最小活跃数下标
totalWeight = weight; // 重新累计总权重
firstWeight = weight; // 记录第一个权重
sameWeight = true; // 还原权重相同标识
} else if (active == leastActive) { // 累计相同最小的活跃数
leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标
totalWeight += weight; // 累计总权重
// 判断所有权重是否一样
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
if (leastCount == 1) {
// 如果只有一个最小则直接返回
return invokers.get(leastIndexs[0]);
}
if (! sameWeight && totalWeight > 0) {
// 如果权重不相同且权重大于0则按总权重数随机
int offsetWeight = random.nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}
思路:
当请求集群时,根据当前最少的活跃数来筛选要请求的服务,相同权重的invoker再根据权重来随机分配。这里对活跃数的处理是各invoker维护一个计数器,请求处理时计数器+1,请求处理完成,计数器-1(越低性能越高)
ConsistentHash LoadBalance
- 一致性 Hash,相同参数的请求总是发到同一提供者。
- 当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
- 算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
- 缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key=“hash.arguments” value=“0,1” />
- 缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key=“hash.nodes” value=“320” />
实现类 org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 获取调用方法名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 生成调用列表hashCode
int identityHashCode = System.identityHashCode(invokers);
// 以调用方法名为key,获取一致性hash选择器
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 若不存在则创建新的选择器
if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
// 创建ConsistentHashSelector时会生成所有虚拟结点
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
// 获取选择器
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 选择结点
return selector.select(invocation);
}
hash环的实现
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers; //虚拟节点
private final int replicaNumber; //环上虚拟点的个数
private final int identityHashCode;
private final int[] argumentIndex; //参数索引数组
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
//默认虚拟节点的个数
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
//根据哪些参数来生产hash值,默认是第一个参数
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
//为每一个Invoker创建replicaNumber 个虚拟节点,每一个节点的Hashcode不同。
// 同一个Invoker不同hashcode的创建逻辑为:
//invoker.getUrl().getAddress() + i (0-39)的值,每个md5的值是128位16个字节,然后去每32位4个字节区分开来4个,存到long的后32位中,long的前32位为0。
// 一致性hash实现的一个关键是如果将一个Invoker创建的replicaNumber 个虚拟节点(hashcode)能够均匀分布在Hash环上
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = Bytes.getMD5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = Bytes.getMD5(key);
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
//ceilingEntry(K key) 方法用来返回与该键至少大于或等于给定键,如果不存在这样的键的键 - 值映射,则返回null相关联。
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
}