当前位置:首页 > 公众号精选 > 架构师社区
[导读]前言 负载均衡是指在集群中,将多个数据请求分散在不同单元上进行执行,主要为了提高系统容错能力和加强系统对数据的处理能力。 在 Dubbo 中,一次服务的调用就是对所有实体域 Invoker 的一次筛选过滤,最终选定具体调用的 Invoker。首先在 Directory 中获取

Dubbo 负载均衡的实现

前言

负载均衡是指在集群中,将多个数据请求分散在不同单元上进行执行,主要为了提高系统容错能力和加强系统对数据的处理能力。

在 Dubbo 中,一次服务的调用就是对所有实体域 Invoker 的一次筛选过滤,最终选定具体调用的 Invoker。首先在 Directory 中获取全部 Invoker 列表,通过路由筛选出符合规则的 Invoker,最后再经过负载均衡选出具体的 Invoker。所以 Dubbo 负载均衡机制是决定一次服务调用使用哪个提供者的服务。

整体结构

Dubbo 负载均衡的分析入口是 org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance 抽象类,查看这个类继承关系。

Dubbo 负载均衡的实现

这个被 RandomLoadBalance、LeastActiveLoadBalance、RoundRobinLoadBalance 及 ConsistentHashLoadBalance 类继承,这四个类是 Dubbo 中提供的四种负载均衡算法的实现。

名称 说明
RandomLoadBalance 随机算法,根据权重设置随机的概率
LeastActiveLoadBalance

最少活跃数算法,指请求数和完成数之差使行效率高的服务接收更多请求

RoundRobinLoadBalance 加权轮训算法,根据权重设置轮训比例
ConsistentHashLoadBalance

Hash 一致性算法,相同请求参数分配相同提供者

以上则是 Dubbo 提供的四种负载均衡算法。

从上图中,看到 AbstractLoadBalance 实现了 LoadBalance 接口,同时是一个 SPI 接口,指定默认实现为 RandomLoadBalance 随机算法机制。

Dubbo 负载均衡的实现

抽象类 AbstractLoadBalance 中,实现了负载均衡通用的逻辑,同时给子类声明了一个抽象方法供子类实现其负载均衡的逻辑。

   
  1. public abstract class AbstractLoadBalance implements LoadBalance {

  2. /**

  3. *

  4. * @param 运行时间(毫秒)

  5. * @param 预热时间(毫秒)

  6. * @param 要计算的 Invoker 权重值

  7. */

  8. static int calculateWarmupWeight(int uptime, int warmup, int weight) {

  9. // 计算预热时期的权重

  10. int ww = (int) ((float) uptime / ((float) warmup / (float) weight));

  11. // 返回的权重值区间在: 1 ~ weight

  12. return ww < 1 ? 1 : (ww > weight ? weight : ww);

  13. }


  14. @Override

  15. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  16. // 校验 invokers 是否为空

  17. if (CollectionUtils.isEmpty(invokers)) {

  18. return null;

  19. }

  20. // 当到达负载均衡流程时,invokers 中只有一个 Invoker 时,直接返回该 Invoker

  21. if (invokers.size() == 1) {

  22. return invokers.get(0);

  23. }

  24. // 在不同负载均衡策略中完成具体的实现

  25. return doSelect(invokers, url, invocation);

  26. }


  27. // 声明抽象方法,在子类中具体实现

  28. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);



  29. protected int getWeight(Invoker<?> invoker, Invocation invocation) {

  30. // 获取当前Invoker配置的权重值

  31. int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);

  32. if (weight > 0) {

  33. // 服务启动时间

  34. long timestamp = invoker.getUrl().getParameter(REMOTE_TIMESTAMP_KEY, 0L);

  35. if (timestamp > 0L) {

  36. // 服务已运行时长

  37. int uptime = (int) (System.currentTimeMillis() - timestamp);

  38. // 服务预热时间,默认 DEFAULT_WARMUP = 10 * 60 * 1000 ,预热十分钟

  39. int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);

  40. // 如果服务运行时长小于预热时长,重新计算出预热时期的权重

  41. if (uptime > 0 && uptime < warmup) {

  42. weight = calculateWarmupWeight(uptime, warmup, weight);

  43. }

  44. }

  45. }

  46. // 保证最后返回的权重值不小于0

  47. return weight >= 0 ? weight : 0;

  48. }


  49. }

在 AbstractLoadBalance 中,getWeight 和 calculateWarmupWeight 方法是获取和计算当前 Invoker 的权重值。

getWeight 中获取当前权重值,通过 URL 获取当前 Invoker 设置的权重,如果当前服务提供者启动时间小于预热时间,则会重新计算权重值,对服务进行降权处理,保证服务能在启动初期不分发设置比例的全部流量,健康运行下去。

calculateWarmupWeight 是重新计算权重值的方法,计算公式为: 服务运行时长/(预热时长/设置的权重值),等价于 (服务运行时长/预热时长)*设置的权重值,同时条件 服务运行时长<预热时长。由该公式可知,预热时长和设置的权重值不变,服务运行时间越长,计算出的值越接近 weight,但不会等于 weight。在返回计算后的权重结果中,对小于1和大于设置的权重值进行了处理,当重新计算后的权重小于1时返回1;处于1和设置的权重值之间时,直接返回计算后的结果;当权重大于设置的权重值时(因为条件限制,不会出现该类情况),返回设置的权重值。所以得出结论:重新计算后的权重值为 1 ~ 设置的权重值,运行时间越长,计算出的权重值越接近设置的权重值

配置方式

服务端

通过 XML 配置方式:

   
  1. <!-- 服务级别配置 -->

  2. <dubbo:service id="xXXXService" interface="top.ytao.service.XXXXService" class="top.ytao.service.impl.XXXXServiceImpl" loadbalance="负载策略" />


  3. <!-- 方法级别配置 -->

  4. <dubbo:service id="xXXXService" interface="top.ytao.service.XXXXService" class="top.ytao.service.impl.XXXXServiceImpl">

  5. <dubbo:method name="方法名" loadbalance="负载策略"/>

  6. </dubbo:service>

通过 Properties 配置:

   
  1. dubbo.service.loadbalance=负载策略

通过注解方式:

   
  1. @Service(loadbalance = "负载策略")

客户端

通过 XML 配置方式:

   
  1. <!-- 服务级别配置 -->

  2. <dubbo:reference id="xXXXService" interface="top.ytao.service.XXXXService" loadbalance="负载策略" />


  3. <!-- 方法级别配置 -->

  4. <dubbo:reference id="xXXXService" interface="top.ytao.service.XXXXService">

  5. <dubbo:method name="方法名" loadbalance="负载策略"/>

  6. </dubbo:reference>

通过 Properties 配置:

   
  1. dubbo.reference.loadbalance=负载策略

通过注解配置方式:

   
  1. @Reference(loadbalance = "负载策略")

实现方式也可通过 Dubbo-Admin 管理后台进行配置,如图:

Dubbo 负载均衡的实现

随机算法

加权随机算法负载均衡策略(RandomLoadBalance)是 dubbo 负载均衡的默认实现方式,根据权重分配各个 Invoker 随机选中的比例。这里的意思是:将到达负载均衡流程的 Invoker 列表中的 权重进行求和,然后求出单个 Invoker 权重在总权重中的占比,随机数就在总权重值的范围内生成。

如图,假如当前有 192.168.1.10和 192.168.1.11两个负载均衡的服务,权重分别为 4、6 ,则它们的被选中的比例为 2/5、3/5。

Dubbo 负载均衡的实现

当生成随机数为 6 时,就会选中 192.168.1.11的服务。

dubbo 中 RandomLoadBalance 的 doSelect 实现代码:

   
  1. public class RandomLoadBalance extends AbstractLoadBalance {


  2. public static final String NAME = "random";


  3. @Override

  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  5. // Invoker 数量

  6. int length = invokers.size();

  7. // 标识所有 Invoker 的权重是否都一样

  8. boolean sameWeight = true;

  9. // 用一个数组保存每个 Invoker 的权重

  10. int[] weights = new int[length];

  11. // 第一个 Invoker 的权重

  12. int firstWeight = getWeight(invokers.get(0), invocation);

  13. weights[0] = firstWeight;

  14. // 求和总权重

  15. int totalWeight = firstWeight;

  16. for (int i = 1; i < length; i++) {

  17. int weight = getWeight(invokers.get(i), invocation);

  18. // 保存每个 Invoker 的权重到数组总

  19. weights[i] = weight;

  20. // 累加求和总权重

  21. totalWeight += weight;

  22. // 如果不是所有 Invoker 的权重都一样,就给标记上 sameWeight = false

  23. if (sameWeight && weight != firstWeight) {

  24. sameWeight = false;

  25. }

  26. }

  27. // 计算随机数取到的 Invoker,条件是必须总权重大于0,并且每个 Invoker 的权重都不一样

  28. if (totalWeight > 0 && !sameWeight) {

  29. // 基于 0~总数 范围内生成随机数

  30. int offset = ThreadLocalRandom.current().nextInt(totalWeight);

  31. // 计算随机数对应的 Invoker

  32. for (int i = 0; i < length; i++) {

  33. offset -= weights[i];

  34. if (offset < 0) {

  35. return invokers.get(i);

  36. }

  37. }

  38. }

  39. // 如果所有 Invoker 的权重都一样则随机从 Invoker 列表中返回一个

  40. return invokers.get(ThreadLocalRandom.current().nextInt(length));

  41. }


  42. }

以上就是加权随机策略的实现,这里比较主要关注计算生成的随机数对应的 Invoker。通过遍历权重数组,生成的数累减当前权重值,当 offset 为 0 时,就表示 offset 对应当前的 Invoker 服务。

以生成的随机数为 6 为例,遍历 Invokers 长度:

  1. 第一轮:offset = 6 - 4 = 2 不满足 offset < 0,继续遍历。


  2. 第二轮:offset = 2 - 6 = -4 满足 offset < 0,返回当前索引对应的 Invoker。因为 offset 返回负数,表示 offset 落在当前 Invoker 权重的区间里。


加权随机策略并非一定按照比例被选到,理论上调用次数越多,分布的比例越接近权重所占的比例。

最少活跃数算法

最小活跃数负载均衡策略(LeastActiveLoadBalance)是从最小活跃数的 Invoker 中进行选择。什么是活跃数呢?活跃数是一个 Invoker 正在处理的请求的数量,当 Invoker 开始处理请求时,会将活跃数加 1,完成请求处理后,将相应 Invoker 的活跃数减 1。找出最小活跃数后,最后根据权重进行选择最终的 Invoker。如果最后找出的最小活跃数相同,则随机从中选中一个 Invoker。

   
  1. public class LeastActiveLoadBalance extends AbstractLoadBalance {


  2. public static final String NAME = "leastactive";


  3. @Override

  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  5. // Invoker 数量

  6. int length = invokers.size();

  7. // 所有 Invoker 中的最小活跃值都是 -1

  8. int leastActive = -1;

  9. // 最小活跃值 Invoker 的数量

  10. int leastCount = 0;

  11. // 最小活跃值 Invoker 在 Invokers 列表中对应的下标位置

  12. int[] leastIndexes = new int[length];

  13. // 保存每个 Invoker 的权重

  14. int[] weights = new int[length];

  15. // 总权重

  16. int totalWeight = 0;

  17. // 第一个最小活跃数的权重

  18. int firstWeight = 0;

  19. // 最小活跃数 Invoker 列表的权重是否一样

  20. boolean sameWeight = true;


  21. // 找出最小活跃数 Invoker 的下标

  22. for (int i = 0; i < length; i++) {

  23. Invoker<T> invoker = invokers.get(i);

  24. // 获取最小活跃数

  25. int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();

  26. // 获取权重

  27. int afterWarmup = getWeight(invoker, invocation);

  28. // 保存权重

  29. weights[i] = afterWarmup;

  30. // 如果当前最小活跃数为-1(-1为最小值)或小于leastActive

  31. if (leastActive == -1 || active < leastActive) {

  32. // 重置最小活跃数

  33. leastActive = active;

  34. // 重置最小活跃数 Invoker 的数量

  35. leastCount = 1;

  36. // 保存当前 Invoker 在 Invokers 列表中的索引至leastIndexes数组中

  37. leastIndexes[0] = i;

  38. // 重置最小活跃数 invoker 的总权重值

  39. totalWeight = afterWarmup;

  40. // 记录当前 Invoker 权重为第一个最小活跃数 Invoker 的权重

  41. firstWeight = afterWarmup;

  42. // 因为当前 Invoker 重置为第一个最小活跃数 Invoker ,所以标识所有最小活跃数 Invoker 权重都一样的值为 true

  43. sameWeight = true;

  44. // 如果当前最小活跃数和已声明的最小活跃数相等

  45. } else if (active == leastActive) {

  46. // 记录当前 Invoker 的位置

  47. leastIndexes[leastCount++] = i;

  48. // 累加当前 Invoker 权重到总权重中

  49. totalWeight += afterWarmup;

  50. // 如果当前权重与firstWeight不相等,则将 sameWeight 改为 false

  51. if (sameWeight && i > 0

  52. && afterWarmup != firstWeight) {

  53. sameWeight = false;

  54. }

  55. }

  56. }

  57. // 如果最小活跃数 Invoker 只有一个,直接返回该 Invoker

  58. if (leastCount == 1) {

  59. return invokers.get(leastIndexes[0]);

  60. }

  61. if (!sameWeight && totalWeight > 0) {

  62. // 根据权重随机从最小活跃数 Invoker 列表中选择一个

  63. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);

  64. for (int i = 0; i < leastCount; i++) {

  65. int leastIndex = leastIndexes[i];

  66. offsetWeight -= weights[leastIndex];

  67. if (offsetWeight < 0) {

  68. return invokers.get(leastIndex);

  69. }

  70. }

  71. }

  72. // 如果所有 Invoker 的权重都一样则随机从 Invoker 列表中返回一个

  73. return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);

  74. }

  75. }

这段代码的整个逻辑就是,从 Invokers 列表中筛选出最小活跃数的 Invoker,然后类似加权随机算法策略方式选择最终的 Invoker 服务。

轮询算法

加权轮询负载均衡策略(RoundRobinLoadBalance)是基于权重来决定轮询的比例。普通轮询会将请求均匀的分布在每个节点,但不能很好调节不同性能服务器的请求处理,所以加权负载均衡来根据权重在轮询机制中分配相对应的请求比例给每台服务器。

   
  1. public class RoundRobinLoadBalance extends AbstractLoadBalance {

  2. public static final String NAME = "roundrobin";


  3. private static final int RECYCLE_PERIOD = 60000;


  4. protected static class WeightedRoundRobin {

  5. private int weight;

  6. private AtomicLong current = new AtomicLong(0);

  7. private long lastUpdate;

  8. public int getWeight() {

  9. return weight;

  10. }

  11. public void setWeight(int weight) {

  12. this.weight = weight;

  13. current.set(0);

  14. }

  15. public long increaseCurrent() {

  16. return current.addAndGet(weight);

  17. }

  18. public void sel(int total) {

  19. current.addAndGet(-1 * total);

  20. }

  21. public long getLastUpdate() {

  22. return lastUpdate;

  23. }

  24. public void setLastUpdate(long lastUpdate) {

  25. this.lastUpdate = lastUpdate;

  26. }

  27. }


  28. private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

  29. private AtomicBoolean updateLock = new AtomicBoolean();


  30. /**

  31. * get invoker addr list cached for specified invocation

  32. * <p>

  33. * <b>for unit test only</b>

  34. *

  35. * @param invokers

  36. * @param invocation

  37. * @return

  38. */

  39. protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {

  40. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

  41. Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);

  42. if (map != null) {

  43. return map.keySet();

  44. }

  45. return null;

  46. }


  47. @Override

  48. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  49. // key 为 接口名+方法名

  50. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();

  51. // 查看缓存中是否存在相应服务接口的信息,如果没有则新添加一个元素到缓存中

  52. ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);

  53. if (map == null) {

  54. methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());

  55. map = methodWeightMap.get(key);

  56. }

  57. // 总权重

  58. int totalWeight = 0;

  59. long maxCurrent = Long.MIN_VALUE;

  60. // 当前时间戳

  61. long now = System.currentTimeMillis();

  62. // 最大 current 的 Invoker

  63. Invoker<T> selectedInvoker = null;

  64. // 保存选中的 WeightedRoundRobin 对象

  65. WeightedRoundRobin selectedWRR = null;

  66. // 遍历 Invokers 列表

  67. for (Invoker<T> invoker : invokers) {

  68. // 从缓存中获取 WeightedRoundRobin 对象

  69. String identifyString = invoker.getUrl().toIdentityString();

  70. WeightedRoundRobin weightedRoundRobin = map.get(identifyString);

  71. // 获取当前 Invoker 对象

  72. int weight = getWeight(invoker, invocation);


  73. // 如果当前 Invoker 没有对应的 WeightedRoundRobin 对象,则新增一个

  74. if (weightedRoundRobin == null) {

  75. weightedRoundRobin = new WeightedRoundRobin();

  76. weightedRoundRobin.setWeight(weight);

  77. map.putIfAbsent(identifyString, weightedRoundRobin);

  78. }

  79. // 如果当前 Invoker 权重不等于对应的 WeightedRoundRobin 对象中的权重,则重新设置当前权重到对应的 WeightedRoundRobin 对象中

  80. if (weight != weightedRoundRobin.getWeight()) {

  81. weightedRoundRobin.setWeight(weight);

  82. }

  83. // 累加权重到 current 中

  84. long cur = weightedRoundRobin.increaseCurrent();

  85. // 设置 weightedRoundRobin 对象最后更新时间

  86. weightedRoundRobin.setLastUpdate(now);

  87. // 最大 current 的 Invoker,并赋值给相应的变量

  88. if (cur > maxCurrent) {

  89. maxCurrent = cur;

  90. selectedInvoker = invoker;

  91. selectedWRR = weightedRoundRobin;

  92. }

  93. // 累加权重到总权重中

  94. totalWeight += weight;

  95. }

  96. // 如果 Invokers 列表中的数量不等于缓存map中的数量

  97. if (!updateLock.get() && invokers.size() != map.size()) {

  98. if (updateLock.compareAndSet(false, true)) {

  99. try {

  100. // 拷贝 map 到 newMap 中

  101. ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();

  102. newMap.putAll(map);

  103. // newMap 转化为 Iterator

  104. Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();

  105. // 循环删除超过设定时长没更新的缓存

  106. while (it.hasNext()) {

  107. Entry<String, WeightedRoundRobin> item = it.next();

  108. if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {

  109. it.remove();

  110. }

  111. }

  112. // 将当前newMap服务缓存中

  113. methodWeightMap.put(key, newMap);

  114. } finally {

  115. updateLock.set(false);

  116. }

  117. }

  118. }

  119. // 如果存在被选中的 Invoker

  120. if (selectedInvoker != null) {

  121. // 计算 current = current - totalWeight

  122. selectedWRR.sel(totalWeight);

  123. return selectedInvoker;

  124. }

  125. // 正常情况这里不会到达

  126. return invokers.get(0);

  127. }


  128. }

上面选中 Invoker 逻辑为:每个 Invoker 都有一个 current 值,初始值为自身权重。在每个 Invoker 中 current=current+weight。遍历完 Invoker 后,current 最大的那个 Invoker 就是本次选中的 Invoker。选中 Invoker 后,将本次 current 值计算 current=current-totalWeight

以上面 192.168.1.10和 192.168.1.11两个负载均衡的服务,权重分别为 4、6 。基于选中前 current=current+weight、选中后 current=current-totalWeight计算公式得出如下

请求次数 选中前 current 选中后 current 被选中服务
1 [4, 6] [4, -4] 192.168.1.11
2 [8, 2] [-2, 2] 192.168.1.10
3 [2, 8] [2, -2] 192.168.1.11
4 [6, 4] [-4, 4] 192.168.1.10
5 [0, 10] [0, 0] 192.168.1.11

一致性 Hash 算法

一致性 Hash 负载均衡策略(ConsistentHashLoadBalance)是让参数相同的请求分配到同一机器上。把每个服务节点分布在一个环上,请求也分布在环形中。以请求在环上的位置,顺时针寻找换上第一个服务节点。如图所示:

Dubbo 负载均衡的实现

同时,为避免请求散列不均匀,dubbo 中会将每个 Invoker 再虚拟多个节点出来,使得请求调用更加均匀。

一致性 Hash 修改配置如下:

   
  1. <!-- dubbo 默认只对第一个参数进行 hash 标识,指定hash参数 -->

  2. <dubbo:parameter key="hash.arguments" value="1" />


  3. <!-- 虚拟节点数量 -->

  4. <dubbo:parameter key="hash.nodes" value="200" />

一致性 Hash 实现如下:

   
  1. public class ConsistentHashLoadBalance extends AbstractLoadBalance {

  2. public static final String NAME = "consistenthash";


  3. /**

  4. * Hash nodes name

  5. */

  6. public static final String HASH_NODES = "hash.nodes";


  7. /**

  8. * Hash arguments name

  9. */

  10. public static final String HASH_ARGUMENTS = "hash.arguments";


  11. private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();


  12. @SuppressWarnings("unchecked")

  13. @Override

  14. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {

  15. // 获取请求的方法名

  16. String methodName = RpcUtils.getMethodName(invocation);

  17. // key = 接口名+方法名

  18. String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

  19. // invokers 的 hashcode

  20. int identityHashCode = System.identityHashCode(invokers);

  21. // 查看缓存中是否存在对应 key 的数据,或 Invokers 列表是否有过变动。如果没有,则新添加到缓存中,并且返回负载均衡得出的 Invoker

  22. ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);

  23. if (selector == null || selector.identityHashCode != identityHashCode) {

  24. selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));

  25. selector = (ConsistentHashSelector<T>) selectors.get(key);

  26. }

  27. return selector.select(invocation);

  28. }


  29. // ConsistentHashSelector class ...

  30. }

doSelect 中主要实现缓存检查和 Invokers 变动检查,一致性 hash 负载均衡的实现在这个内部类 ConsistentHashSelector 中实现。

   
  1. private static final class ConsistentHashSelector<T> {


  2. // 存储虚拟节点

  3. private final TreeMap<Long, Invoker<T>> virtualInvokers;


  4. // 节点数

  5. private final int replicaNumber;


  6. // invoker 列表的 hashcode,用来判断 Invoker 列表是否变化

  7. private final int identityHashCode;


  8. // 请求中用来作Hash映射的参数的索引

  9. private final int[] argumentIndex;


  10. ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {

  11. this.virtualInvokers = new TreeMap<Long, Invoker<T>>();

  12. this.identityHashCode = identityHashCode;

  13. URL url = invokers.get(0).getUrl();

  14. // 获取节点数

  15. this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);

  16. // 获取配置中的 参数索引

  17. String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));

  18. argumentIndex = new int[index.length];

  19. for (int i = 0; i < index.length; i++) {

  20. argumentIndex[i] = Integer.parseInt(index[i]);

  21. }


  22. for (Invoker<T> invoker : invokers) {

  23. // 获取 Invoker 中的地址,包括端口号

  24. String address = invoker.getUrl().getAddress();

  25. // 创建虚拟节点

  26. for (int i = 0; i < replicaNumber / 4; i++) {

  27. byte[] digest = md5(address + i);

  28. for (int h = 0; h < 4; h++) {

  29. long m = hash(digest, h);

  30. virtualInvokers.put(m, invoker);

  31. }

  32. }

  33. }

  34. }


  35. // 找出 Invoker

  36. public Invoker<T> select(Invocation invocation) {

  37. // 将参数转为字符串

  38. String key = toKey(invocation.getArguments());

  39. // 字符串参数转换为 md5

  40. byte[] digest = md5(key);

  41. // 根据 md5 找出 Invoker

  42. return selectForKey(hash(digest, 0));

  43. }


  44. // 将参数拼接成字符串

  45. private String toKey(Object[] args) {

  46. StringBuilder buf = new StringBuilder();

  47. for (int i : argumentIndex) {

  48. if (i >= 0 && i < args.length) {

  49. buf.append(args[i]);

  50. }

  51. }

  52. return buf.toString();

  53. }


  54. // 利用 md5 匹配到对应的 Invoker

  55. private Invoker<T> selectForKey(long hash) {

  56. // 找到第一个大于当前 hash 的 Invoker

  57. Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);

  58. if (entry == null) {

  59. entry = virtualInvokers.firstEntry();

  60. }

  61. return entry.getValue();

  62. }


  63. // hash 运算

  64. private long hash(byte[] digest, int number) {

  65. return (((long) (digest[3 + number * 4] & 0xFF) << 24)

  66. | ((long) (digest[2 + number * 4] & 0xFF) << 16)

  67. | ((long) (digest[1 + number * 4] & 0xFF) << 8)

  68. | (digest[number * 4] & 0xFF))

  69. & 0xFFFFFFFFL;

  70. }


  71. // md5 运算

  72. private byte[] md5(String value) {

  73. MessageDigest md5;

  74. try {

  75. md5 = MessageDigest.getInstance("MD5");

  76. } catch (NoSuchAlgorithmException e) {

  77. throw new IllegalStateException(e.getMessage(), e);

  78. }

  79. md5.reset();

  80. byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

  81. md5.update(bytes);

  82. return md5.digest();

  83. }


  84. }

一致 hash 实现过程就是先创建好虚拟节点,虚拟节点保存在 TreeMap 中。TreeMap 的 key 为配置的参数先进行 md5 运算,然后将 md5 值进行 hash 运算。TreeMap 的 value 为被选中的 Invoker。

最后请求时,计算参数的 hash 值,去从 TreeMap 中获取 Invoker。

总结

Dubbo 负载均衡的实现,技巧上还是比较优雅,可以多多学习其编码思维。在研究其代码时,需要仔细研究其实现原理,否则比较难懂其思想。

特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:

Dubbo 负载均衡的实现

长按订阅更多精彩▼

Dubbo 负载均衡的实现

如有收获,点个在看,诚挚感谢

免责声明:本文内容由21ic获得授权后发布,版权归原作者所有,本平台仅提供信息存储服务。文章仅代表作者个人观点,不代表本平台立场,如有问题,请联系我们,谢谢!

本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

9月2日消息,不造车的华为或将催生出更大的独角兽公司,随着阿维塔和赛力斯的入局,华为引望愈发显得引人瞩目。

关键字: 阿维塔 塞力斯 华为

加利福尼亚州圣克拉拉县2024年8月30日 /美通社/ -- 数字化转型技术解决方案公司Trianz今天宣布,该公司与Amazon Web Services (AWS)签订了...

关键字: AWS AN BSP 数字化

伦敦2024年8月29日 /美通社/ -- 英国汽车技术公司SODA.Auto推出其旗舰产品SODA V,这是全球首款涵盖汽车工程师从创意到认证的所有需求的工具,可用于创建软件定义汽车。 SODA V工具的开发耗时1.5...

关键字: 汽车 人工智能 智能驱动 BSP

北京2024年8月28日 /美通社/ -- 越来越多用户希望企业业务能7×24不间断运行,同时企业却面临越来越多业务中断的风险,如企业系统复杂性的增加,频繁的功能更新和发布等。如何确保业务连续性,提升韧性,成...

关键字: 亚马逊 解密 控制平面 BSP

8月30日消息,据媒体报道,腾讯和网易近期正在缩减他们对日本游戏市场的投资。

关键字: 腾讯 编码器 CPU

8月28日消息,今天上午,2024中国国际大数据产业博览会开幕式在贵阳举行,华为董事、质量流程IT总裁陶景文发表了演讲。

关键字: 华为 12nm EDA 半导体

8月28日消息,在2024中国国际大数据产业博览会上,华为常务董事、华为云CEO张平安发表演讲称,数字世界的话语权最终是由生态的繁荣决定的。

关键字: 华为 12nm 手机 卫星通信

要点: 有效应对环境变化,经营业绩稳中有升 落实提质增效举措,毛利润率延续升势 战略布局成效显著,战新业务引领增长 以科技创新为引领,提升企业核心竞争力 坚持高质量发展策略,塑强核心竞争优势...

关键字: 通信 BSP 电信运营商 数字经济

北京2024年8月27日 /美通社/ -- 8月21日,由中央广播电视总台与中国电影电视技术学会联合牵头组建的NVI技术创新联盟在BIRTV2024超高清全产业链发展研讨会上宣布正式成立。 活动现场 NVI技术创新联...

关键字: VI 传输协议 音频 BSP

北京2024年8月27日 /美通社/ -- 在8月23日举办的2024年长三角生态绿色一体化发展示范区联合招商会上,软通动力信息技术(集团)股份有限公司(以下简称"软通动力")与长三角投资(上海)有限...

关键字: BSP 信息技术
关闭
关闭