1、接口 IRule

public interface IRule{//选择serverpublic Server choose(Object key);//设置LoadBalancerpublic void setLoadBalancer(ILoadBalancer lb);//获取LoadBalancerpublic ILoadBalancer getLoadBalancer();
}

2、接口的抽象实现类 AbstractLoadBalancerRule

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {private ILoadBalancer lb;@Overridepublic void setLoadBalancer(ILoadBalancer lb){this.lb = lb;}@Overridepublic ILoadBalancer getLoadBalancer(){return lb;}
}

上一篇中,IRule是在 BaseLoadBalancer 中做了一个绑定,在这里,在接口中设置了Loadbalancer

3、轮询 RoundRobinRule

public class RoundRobinRule extends AbstractLoadBalancerRule {//下一个计数private AtomicInteger nextServerCyclicCounter;private static final boolean AVAILABLE_ONLY_SERVERS = true;private static final boolean ALL_SERVERS = false;private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);public RoundRobinRule() {nextServerCyclicCounter = new AtomicInteger(0);}public RoundRobinRule(ILoadBalancer lb) {this();setLoadBalancer(lb);}public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;//尝试次数int count = 0;while (server == null && count++ < 10) {//可用的serversList<Server> reachableServers = lb.getReachableServers();//所有的serversList<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}//获取server,递增计数并取模int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}//尝试10次还没有获取到server,则返回空if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server;}//递增计数并取模private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;}}@Overridepublic Server choose(Object key) {return choose(getLoadBalancer(), key);}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {}
}

4、权重轮询WeightedResponseTimeRule

将server的应答时间的平均值或者百分比作为权重

public class WeightedResponseTimeRule extends RoundRobinRule {public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {@Overridepublic String key() {return "ServerWeightTaskTimerInterval";}@Overridepublic String toString() {return key();}@Overridepublic Class<Integer> type() {return Integer.class;}};//默认时间周期 30秒public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;//权重计算定时时间 默认 30秒private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);//服务的权重值,后面的会叠加前面的private volatile List<Double> accumulatedWeights = new ArrayList<Double>();private final Random random = new Random();protected Timer serverWeightTimer = null;protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);String name = "unknown";public WeightedResponseTimeRule() {super();}public WeightedResponseTimeRule(ILoadBalancer lb) {super(lb);}@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);if (lb instanceof BaseLoadBalancer) {name = ((BaseLoadBalancer) lb).getName();}initialize(lb);}void initialize(ILoadBalancer lb) {        if (serverWeightTimer != null) {serverWeightTimer.cancel();}serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"+ name, true);serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,serverWeightTaskTimerInterval);// do a initial runServerWeight sw = new ServerWeight();sw.maintainWeights();Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {public void run() {logger.info("Stopping NFLoadBalancer-serverWeightTimer-"+ name);serverWeightTimer.cancel();}}));}public void shutdown() {if (serverWeightTimer != null) {logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);serverWeightTimer.cancel();}}List<Double> getAccumulatedWeights() {return Collections.unmodifiableList(accumulatedWeights);}//选择server@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")@Overridepublic Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;while (server == null) {//获取权重值List<Double> currentWeights = accumulatedWeights;if (Thread.interrupted()) {return null;}//获取所有serverList<Server> allList = lb.getAllServers();int serverCount = allList.size();if (serverCount == 0) {return null;}int serverIndex = 0;//权重值集合的最后一个,代表权重值得和double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // 没有server或者权重值没有初始化,退化到 轮询 ruleif (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {server =  super.choose(getLoadBalancer(), key);if(server == null) {return server;}} else {// 创建一个随机值,0~maxTotalWeightdouble randomWeight = random.nextDouble() * maxTotalWeight;// pick the server index based on the randomIndexint n = 0;//选择随机数小于等于权重值的serverfor (Double d : currentWeights) {if (d >= randomWeight) {serverIndex = n;break;} else {n++;}}server = allList.get(serverIndex);}if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive()) {return (server);}// Next.server = null;}return server;}//权重值更新定时器class DynamicServerWeightTask extends TimerTask {public void run() {ServerWeight serverWeight = new ServerWeight();try {serverWeight.maintainWeights();} catch (Exception e) {logger.error("Error running DynamicServerWeightTask for {}", name, e);}}}//权重值更新逻辑class ServerWeight {public void maintainWeights() {//获取负载均衡器ILoadBalancer lb = getLoadBalancer();if (lb == null) {return;}if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {return; }try {logger.info("Weight adjusting job started");AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;//负载均衡器的状态LoadBalancerStats stats = nlb.getLoadBalancerStats();if (stats == null) {// no statistics, nothing to doreturn;}//计算所有server的平均相应时间的综合double totalResponseTime = 0;for (Server server : nlb.getAllServers()) {// 获取服务状态ServerStats ss = stats.getSingleServerStat(server);totalResponseTime += ss.getResponseTimeAvg();}// 计算每个server的权重//weightSoFar + totalResponseTime - server的平均响应时间Double weightSoFar = 0.0;List<Double> finalWeights = new ArrayList<Double>();for (Server server : nlb.getAllServers()) {ServerStats ss = stats.getSingleServerStat(server);double weight = totalResponseTime - ss.getResponseTimeAvg();weightSoFar += weight;finalWeights.add(weightSoFar);   }setWeights(finalWeights);} catch (Exception e) {logger.error("Error calculating server weights", e);} finally {serverWeightAssignmentInProgress.set(false);}}}void setWeights(List<Double> weights) {this.accumulatedWeights = weights;}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {super.initWithNiwsConfig(clientConfig);serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);}}

5、ClientConfigEnabledRoundRobinRule

public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {RoundRobinRule roundRobinRule = new RoundRobinRule();@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {roundRobinRule = new RoundRobinRule();}@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);roundRobinRule.setLoadBalancer(lb);}@Overridepublic Server choose(Object key) {if (roundRobinRule != null) {return roundRobinRule.choose(key);} else {throw new IllegalArgumentException("This class has not been initialized with the RoundRobinRule class");}}}

这个策略什么也没有做,直接依赖了一个RoundRobinRule,但是它是这些策略的基础类,提供了一个兜底能力。

BestAvailableRule,PredicateBasedRule,ZoneAvoidanceRule,AvailabilityFilteringRule。

6、BestAvailableRule 最小连接数

public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {private LoadBalancerStats loadBalancerStats;@Overridepublic Server choose(Object key) {if (loadBalancerStats == null) {return super.choose(key);}List<Server> serverList = getLoadBalancer().getAllServers();int minimalConcurrentConnections = Integer.MAX_VALUE;long currentTime = System.currentTimeMillis();Server chosen = null;//遍历所有serverfor (Server server: serverList) {//获取server的状态ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);//如果没有熔断if (!serverStats.isCircuitBreakerTripped(currentTime)) {//当前server连接数int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);//判断当前可用server的连接数是否小于最小连接数//将最小连接数赋值当前连接数,循环后得到最小连接的serverif (concurrentConnections < minimalConcurrentConnections) {minimalConcurrentConnections = concurrentConnections;chosen = server;}}}//如果没有选择出server,则直接调用父类的方法选择serverif (chosen == null) {return super.choose(key);} else {return chosen;}}@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);if (lb instanceof AbstractLoadBalancer) {loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();            }}
}

7、PredicateBasedRule 断言基础策略

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {//获取断言public abstract AbstractServerPredicate getPredicate();//选择server,在过滤后进行轮询@Overridepublic Server choose(Object key) {ILoadBalancer lb = getLoadBalancer();Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);if (server.isPresent()) {return server.get();} else {return null;}       }
}

8、AvailabilityFilteringRule 可用性过滤

public class AvailabilityFilteringRule extends PredicateBasedRule {    private AbstractServerPredicate predicate;public AvailabilityFilteringRule() {super();predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null)).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig)).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();}//获得可用服务的数量@Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)public int getAvailableServersCount() {ILoadBalancer lb = getLoadBalancer();List<Server> servers = lb.getAllServers();if (servers == null) {return 0;}return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();}//用轮询策略选择server,再判断是否可用,最大尝试十次@Overridepublic Server choose(Object key) {int count = 0;Server server = roundRobinRule.choose(key);while (count++ <= 10) {//predicate.apply 是否不用跳过if (predicate.apply(new PredicateKey(server))) {return server;}server = roundRobinRule.choose(key);}return super.choose(key);}@Overridepublic AbstractServerPredicate getPredicate() {return predicate;}
}

9、ZoneAvoidanceRule

public class ZoneAvoidanceRule extends PredicateBasedRule {private static final Random random = new Random();private CompositePredicate compositePredicate;public ZoneAvoidanceRule() {super();ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);}private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {return CompositePredicate.withPredicates(p1, p2).addFallbackPredicate(p2).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);}static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();for (String zone : lbStats.getAvailableZones()) {ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);map.put(zone, snapshot);}return map;}static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,Set<String> chooseFrom) {if (chooseFrom == null || chooseFrom.size() == 0) {return null;}String selectedZone = chooseFrom.iterator().next();if (chooseFrom.size() == 1) {return selectedZone;}int totalServerCount = 0;for (String zone : chooseFrom) {totalServerCount += snapshot.get(zone).getInstanceCount();}int index = random.nextInt(totalServerCount) + 1;int sum = 0;for (String zone : chooseFrom) {sum += snapshot.get(zone).getInstanceCount();if (index <= sum) {selectedZone = zone;break;}}return selectedZone;}//获取可用区域public static Set<String> getAvailableZones(Map<String, ZoneSnapshot> snapshot, double triggeringLoad,double triggeringBlackoutPercentage) {if (snapshot.isEmpty()) {return null;}//获取可用区域名称Set<String> availableZones = new HashSet<String>(snapshot.keySet());if (availableZones.size() == 1) {return availableZones;}Set<String> worstZones = new HashSet<String>();double maxLoadPerServer = 0;boolean limitedZoneAvailability = false;//遍历区域快照for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {//区域名称String zone = zoneEntry.getKey();//区域快照ZoneSnapshot zoneSnapshot = zoneEntry.getValue();//区域实例int instanceCount = zoneSnapshot.getInstanceCount();//该区域没有服务,移除if (instanceCount == 0) {availableZones.remove(zone);limitedZoneAvailability = true;} else {double loadPerServer = zoneSnapshot.getLoadPerServer();//该区域服务的负载小于0,或者服务的故障率大于等于阈值,移除if (((double) zoneSnapshot.getCircuitTrippedCount())/ instanceCount >= triggeringBlackoutPercentage|| loadPerServer < 0) {availableZones.remove(zone);limitedZoneAvailability = true;} else {//选出平均负载最差的区域if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {worstZones.add(zone);} else if (loadPerServer > maxLoadPerServer) {maxLoadPerServer = loadPerServer;worstZones.clear();worstZones.add(zone);}}}}//最大负载小于阈值并且没有被限制的区域if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {// zone override is not needed herereturn availableZones;}//从最差区域中轮询一个,然后从可用区域中移除String zoneToAvoid = randomChooseZone(snapshot, worstZones);if (zoneToAvoid != null) {availableZones.remove(zoneToAvoid);}return availableZones;}//获取可用的区域public static Set<String> getAvailableZones(LoadBalancerStats lbStats,double triggeringLoad, double triggeringBlackoutPercentage) {if (lbStats == null) {return null;}//生成区域快照Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);return getAvailableZones(snapshot, triggeringLoad,triggeringBlackoutPercentage);}@Overridepublic AbstractServerPredicate getPredicate() {return compositePredicate;}
}

Spring Cloud Ribbon 负载均衡策略相关推荐

  1. Spring Cloud Ribbon负载均衡策略详解

    通过之前的文章可以知道, Ribbon负载均衡器选择服务实例的方式是通过"选择策略"实现的, Ribbon实现了很多种选择策略,UML静态类图如上图. IRule是负载均衡的策略接 ...

  2. Spring Cloud Ribbon 负载均衡客户端调用示例

    承接 Spring Cloud 最简入门示例 这一篇, 本篇演示如何使用负载均衡客户端 Ribbon 调用在Eureka注册的服务. Ribbon 是什么? Ribbon是Netflix 的开源项目, ...

  3. spring cloud中通过配置文件自定义Ribbon负载均衡策略

    2019独角兽企业重金招聘Python工程师标准>>> spring cloud中通过配置文件自定义Ribbon负载均衡策略 博客分类: 微服务 一.Ribbon中的负载均衡策略 1 ...

  4. Spring Cloud的负载均衡Spring Cloud Ribbon和Spring Cloud Feign

    一.客户端负载均衡:Spring Cloud Ribbon. Spring Cloud Ribbon是基于HTTP和TCP的客户端负载工具,它是基于Netflix Ribbon实现的.通过Spring ...

  5. SpringCloud组件:Ribbon负载均衡策略及执行原理!

    大家好,我是磊哥. 今天我们来看下微服务中非常重要的一个组件:Ribbon.它作为负载均衡器在分布式网络中扮演着非常重要的角色. 本篇主要内容如下: 在介绍 Ribbon 之前,不得不说下负载均衡这个 ...

  6. 【云原生微服务八】Ribbon负载均衡策略之WeightedResponseTimeRule源码剖析(响应时间加权)

    文章目录 一.前言 二.WeightedResponseTimeRule 1.计算权重? 1)如何更新权重? 2)如何计算权重? 3)例证权重的计算 2.权重的使用 1)权重区间问题? 一.前言 前置 ...

  7. Ribbon负载均衡策略-3

    Ribbon负载均衡策略-3 常用策略 1.RoundRobinRule 轮转调度策略 2.RetryRule重试规律策略 3.RandomRule随机策略 Ribbon的负载均衡策略是由IRule接 ...

  8. Spring Cloud Alibaba 负载均衡:Ribbon 如何保证微服务的高可用

    上一讲我们对 Nacos 的集群环境与实现原理进行了讲解,我们已经可以轻松将单个微服务接入到 Nacos 进行注册,但是微服务本不是孤岛,如何实现有效的服务间稳定通信是本文即将介绍的主要内容,本次我们 ...

  9. 客户端负载均衡Ribbon之一:Spring Cloud Netflix负载均衡组件Ribbon介绍

    Netflix:['netfliːks] ribbon:英[ˈrɪbən]美[ˈrɪbən] n. 带; 绶带; (打印机的) 色带; 带状物; v. 把-撕成条带; 用缎带装饰; 形成带状;     ...

最新文章

  1. 删除结果集中字段重复的方法
  2. boost::signals2模块实现连接类测试
  3. struts中如何查看配置文件中是否存在某个返回值
  4. mc显示服务器生命值,[1.7-1.8]CombatIndicator — 全息显示攻击伤害的数值 让我的世界服务器更有游戏感...
  5. Leetcode周赛复盘——第 278 场力扣周赛
  6. Functional ProgrammingLazy Code:被我忘记的迭代器
  7. 工业机器人 答案 韩建海_不可或缺:协作机器人对于制造业转型升级的意义
  8. linux添加定时器防抖,linux驱动2.3按键中断-定时器防抖
  9. 扔盘子(51Nod-1279)
  10. quartus仿真10:74283的基本功能
  11. 【Git入门之二】基本术语
  12. 如何枚举系统的视音频采集设备
  13. 二、通用、布局、导航组件
  14. android ionic框架,移动App开发框架—Ionic
  15. module 与 component 的区别
  16. P163、面试题29:数组中出现次数超过一半的数字
  17. DC/DC电路自举电容作用
  18. dwg在坐标转换的注意事项
  19. CarSim联合simulink仿真横向控制
  20. iOS开发之网络编程SocKet

热门文章

  1. python长的横线怎么打_关于Python的前后、单双下划线作用,看完这篇文章,吊打面试官!...
  2. 浏览器原理 17 # 宏任务和微任务
  3. node.js 中Mysql 查询报错 ,解决方法。
  4. js中的reduce的用法
  5. 阿里云网盘公测_号称“永不限速”的阿里云网盘开始公测预约!或将支持度盘一键迁移?...
  6. jdk高版本向下兼容
  7. java mapping_05.Java属性映射的正确姿势
  8. matlab仿真对比图,怎样在hfss里对比几个仿真出来的图?
  9. 阿里云PTS FQA
  10. [中英文对照]常见大学课程名称翻译