Spring Cloud Ribbon 负载均衡策略
1、接口 IRule
public interface IRule{//选择serverpublic Server choose(Object key);//设置LoadBalancerpublic void setLoadBalancer(ILoadBalancer lb);//获取LoadBalancerpublic ILoadBalancer getLoadBalancer();
}
2、接口的抽象实现类 AbstractLoadBalancerRule
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {private ILoadBalancer lb;@Overridepublic void setLoadBalancer(ILoadBalancer lb){this.lb = lb;}@Overridepublic ILoadBalancer getLoadBalancer(){return lb;}
}
上一篇中,IRule是在 BaseLoadBalancer 中做了一个绑定,在这里,在接口中设置了Loadbalancer
3、轮询 RoundRobinRule
public class RoundRobinRule extends AbstractLoadBalancerRule {//下一个计数private AtomicInteger nextServerCyclicCounter;private static final boolean AVAILABLE_ONLY_SERVERS = true;private static final boolean ALL_SERVERS = false;private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);public RoundRobinRule() {nextServerCyclicCounter = new AtomicInteger(0);}public RoundRobinRule(ILoadBalancer lb) {this();setLoadBalancer(lb);}public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;//尝试次数int count = 0;while (server == null && count++ < 10) {//可用的serversList<Server> reachableServers = lb.getReachableServers();//所有的serversList<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}//获取server,递增计数并取模int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}//尝试10次还没有获取到server,则返回空if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server;}//递增计数并取模private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;}}@Overridepublic Server choose(Object key) {return choose(getLoadBalancer(), key);}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {}
}
4、权重轮询WeightedResponseTimeRule
将server的应答时间的平均值或者百分比作为权重
public class WeightedResponseTimeRule extends RoundRobinRule {public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {@Overridepublic String key() {return "ServerWeightTaskTimerInterval";}@Overridepublic String toString() {return key();}@Overridepublic Class<Integer> type() {return Integer.class;}};//默认时间周期 30秒public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;//权重计算定时时间 默认 30秒private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);//服务的权重值,后面的会叠加前面的private volatile List<Double> accumulatedWeights = new ArrayList<Double>();private final Random random = new Random();protected Timer serverWeightTimer = null;protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);String name = "unknown";public WeightedResponseTimeRule() {super();}public WeightedResponseTimeRule(ILoadBalancer lb) {super(lb);}@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);if (lb instanceof BaseLoadBalancer) {name = ((BaseLoadBalancer) lb).getName();}initialize(lb);}void initialize(ILoadBalancer lb) { if (serverWeightTimer != null) {serverWeightTimer.cancel();}serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"+ name, true);serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,serverWeightTaskTimerInterval);// do a initial runServerWeight sw = new ServerWeight();sw.maintainWeights();Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {public void run() {logger.info("Stopping NFLoadBalancer-serverWeightTimer-"+ name);serverWeightTimer.cancel();}}));}public void shutdown() {if (serverWeightTimer != null) {logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);serverWeightTimer.cancel();}}List<Double> getAccumulatedWeights() {return Collections.unmodifiableList(accumulatedWeights);}//选择server@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")@Overridepublic Server choose(ILoadBalancer lb, Object key) {if (lb == null) {return null;}Server server = null;while (server == null) {//获取权重值List<Double> currentWeights = accumulatedWeights;if (Thread.interrupted()) {return null;}//获取所有serverList<Server> allList = lb.getAllServers();int serverCount = allList.size();if (serverCount == 0) {return null;}int serverIndex = 0;//权重值集合的最后一个,代表权重值得和double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // 没有server或者权重值没有初始化,退化到 轮询 ruleif (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {server = super.choose(getLoadBalancer(), key);if(server == null) {return server;}} else {// 创建一个随机值,0~maxTotalWeightdouble randomWeight = random.nextDouble() * maxTotalWeight;// pick the server index based on the randomIndexint n = 0;//选择随机数小于等于权重值的serverfor (Double d : currentWeights) {if (d >= randomWeight) {serverIndex = n;break;} else {n++;}}server = allList.get(serverIndex);}if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive()) {return (server);}// Next.server = null;}return server;}//权重值更新定时器class DynamicServerWeightTask extends TimerTask {public void run() {ServerWeight serverWeight = new ServerWeight();try {serverWeight.maintainWeights();} catch (Exception e) {logger.error("Error running DynamicServerWeightTask for {}", name, e);}}}//权重值更新逻辑class ServerWeight {public void maintainWeights() {//获取负载均衡器ILoadBalancer lb = getLoadBalancer();if (lb == null) {return;}if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {return; }try {logger.info("Weight adjusting job started");AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;//负载均衡器的状态LoadBalancerStats stats = nlb.getLoadBalancerStats();if (stats == null) {// no statistics, nothing to doreturn;}//计算所有server的平均相应时间的综合double totalResponseTime = 0;for (Server server : nlb.getAllServers()) {// 获取服务状态ServerStats ss = stats.getSingleServerStat(server);totalResponseTime += ss.getResponseTimeAvg();}// 计算每个server的权重//weightSoFar + totalResponseTime - server的平均响应时间Double weightSoFar = 0.0;List<Double> finalWeights = new ArrayList<Double>();for (Server server : nlb.getAllServers()) {ServerStats ss = stats.getSingleServerStat(server);double weight = totalResponseTime - ss.getResponseTimeAvg();weightSoFar += weight;finalWeights.add(weightSoFar); }setWeights(finalWeights);} catch (Exception e) {logger.error("Error calculating server weights", e);} finally {serverWeightAssignmentInProgress.set(false);}}}void setWeights(List<Double> weights) {this.accumulatedWeights = weights;}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {super.initWithNiwsConfig(clientConfig);serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);}}
5、ClientConfigEnabledRoundRobinRule
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {RoundRobinRule roundRobinRule = new RoundRobinRule();@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {roundRobinRule = new RoundRobinRule();}@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);roundRobinRule.setLoadBalancer(lb);}@Overridepublic Server choose(Object key) {if (roundRobinRule != null) {return roundRobinRule.choose(key);} else {throw new IllegalArgumentException("This class has not been initialized with the RoundRobinRule class");}}}
这个策略什么也没有做,直接依赖了一个RoundRobinRule,但是它是这些策略的基础类,提供了一个兜底能力。
BestAvailableRule,PredicateBasedRule,ZoneAvoidanceRule,AvailabilityFilteringRule。
6、BestAvailableRule 最小连接数
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {private LoadBalancerStats loadBalancerStats;@Overridepublic Server choose(Object key) {if (loadBalancerStats == null) {return super.choose(key);}List<Server> serverList = getLoadBalancer().getAllServers();int minimalConcurrentConnections = Integer.MAX_VALUE;long currentTime = System.currentTimeMillis();Server chosen = null;//遍历所有serverfor (Server server: serverList) {//获取server的状态ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);//如果没有熔断if (!serverStats.isCircuitBreakerTripped(currentTime)) {//当前server连接数int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);//判断当前可用server的连接数是否小于最小连接数//将最小连接数赋值当前连接数,循环后得到最小连接的serverif (concurrentConnections < minimalConcurrentConnections) {minimalConcurrentConnections = concurrentConnections;chosen = server;}}}//如果没有选择出server,则直接调用父类的方法选择serverif (chosen == null) {return super.choose(key);} else {return chosen;}}@Overridepublic void setLoadBalancer(ILoadBalancer lb) {super.setLoadBalancer(lb);if (lb instanceof AbstractLoadBalancer) {loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats(); }}
}
7、PredicateBasedRule 断言基础策略
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {//获取断言public abstract AbstractServerPredicate getPredicate();//选择server,在过滤后进行轮询@Overridepublic Server choose(Object key) {ILoadBalancer lb = getLoadBalancer();Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);if (server.isPresent()) {return server.get();} else {return null;} }
}
8、AvailabilityFilteringRule 可用性过滤
public class AvailabilityFilteringRule extends PredicateBasedRule { private AbstractServerPredicate predicate;public AvailabilityFilteringRule() {super();predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null)).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig)).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();}//获得可用服务的数量@Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)public int getAvailableServersCount() {ILoadBalancer lb = getLoadBalancer();List<Server> servers = lb.getAllServers();if (servers == null) {return 0;}return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();}//用轮询策略选择server,再判断是否可用,最大尝试十次@Overridepublic Server choose(Object key) {int count = 0;Server server = roundRobinRule.choose(key);while (count++ <= 10) {//predicate.apply 是否不用跳过if (predicate.apply(new PredicateKey(server))) {return server;}server = roundRobinRule.choose(key);}return super.choose(key);}@Overridepublic AbstractServerPredicate getPredicate() {return predicate;}
}
9、ZoneAvoidanceRule
public class ZoneAvoidanceRule extends PredicateBasedRule {private static final Random random = new Random();private CompositePredicate compositePredicate;public ZoneAvoidanceRule() {super();ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);}private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {return CompositePredicate.withPredicates(p1, p2).addFallbackPredicate(p2).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();}@Overridepublic void initWithNiwsConfig(IClientConfig clientConfig) {ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);}static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();for (String zone : lbStats.getAvailableZones()) {ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);map.put(zone, snapshot);}return map;}static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,Set<String> chooseFrom) {if (chooseFrom == null || chooseFrom.size() == 0) {return null;}String selectedZone = chooseFrom.iterator().next();if (chooseFrom.size() == 1) {return selectedZone;}int totalServerCount = 0;for (String zone : chooseFrom) {totalServerCount += snapshot.get(zone).getInstanceCount();}int index = random.nextInt(totalServerCount) + 1;int sum = 0;for (String zone : chooseFrom) {sum += snapshot.get(zone).getInstanceCount();if (index <= sum) {selectedZone = zone;break;}}return selectedZone;}//获取可用区域public static Set<String> getAvailableZones(Map<String, ZoneSnapshot> snapshot, double triggeringLoad,double triggeringBlackoutPercentage) {if (snapshot.isEmpty()) {return null;}//获取可用区域名称Set<String> availableZones = new HashSet<String>(snapshot.keySet());if (availableZones.size() == 1) {return availableZones;}Set<String> worstZones = new HashSet<String>();double maxLoadPerServer = 0;boolean limitedZoneAvailability = false;//遍历区域快照for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {//区域名称String zone = zoneEntry.getKey();//区域快照ZoneSnapshot zoneSnapshot = zoneEntry.getValue();//区域实例int instanceCount = zoneSnapshot.getInstanceCount();//该区域没有服务,移除if (instanceCount == 0) {availableZones.remove(zone);limitedZoneAvailability = true;} else {double loadPerServer = zoneSnapshot.getLoadPerServer();//该区域服务的负载小于0,或者服务的故障率大于等于阈值,移除if (((double) zoneSnapshot.getCircuitTrippedCount())/ instanceCount >= triggeringBlackoutPercentage|| loadPerServer < 0) {availableZones.remove(zone);limitedZoneAvailability = true;} else {//选出平均负载最差的区域if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {worstZones.add(zone);} else if (loadPerServer > maxLoadPerServer) {maxLoadPerServer = loadPerServer;worstZones.clear();worstZones.add(zone);}}}}//最大负载小于阈值并且没有被限制的区域if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {// zone override is not needed herereturn availableZones;}//从最差区域中轮询一个,然后从可用区域中移除String zoneToAvoid = randomChooseZone(snapshot, worstZones);if (zoneToAvoid != null) {availableZones.remove(zoneToAvoid);}return availableZones;}//获取可用的区域public static Set<String> getAvailableZones(LoadBalancerStats lbStats,double triggeringLoad, double triggeringBlackoutPercentage) {if (lbStats == null) {return null;}//生成区域快照Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);return getAvailableZones(snapshot, triggeringLoad,triggeringBlackoutPercentage);}@Overridepublic AbstractServerPredicate getPredicate() {return compositePredicate;}
}
Spring Cloud Ribbon 负载均衡策略相关推荐
- Spring Cloud Ribbon负载均衡策略详解
通过之前的文章可以知道, Ribbon负载均衡器选择服务实例的方式是通过"选择策略"实现的, Ribbon实现了很多种选择策略,UML静态类图如上图. IRule是负载均衡的策略接 ...
- Spring Cloud Ribbon 负载均衡客户端调用示例
承接 Spring Cloud 最简入门示例 这一篇, 本篇演示如何使用负载均衡客户端 Ribbon 调用在Eureka注册的服务. Ribbon 是什么? Ribbon是Netflix 的开源项目, ...
- spring cloud中通过配置文件自定义Ribbon负载均衡策略
2019独角兽企业重金招聘Python工程师标准>>> spring cloud中通过配置文件自定义Ribbon负载均衡策略 博客分类: 微服务 一.Ribbon中的负载均衡策略 1 ...
- Spring Cloud的负载均衡Spring Cloud Ribbon和Spring Cloud Feign
一.客户端负载均衡:Spring Cloud Ribbon. Spring Cloud Ribbon是基于HTTP和TCP的客户端负载工具,它是基于Netflix Ribbon实现的.通过Spring ...
- SpringCloud组件:Ribbon负载均衡策略及执行原理!
大家好,我是磊哥. 今天我们来看下微服务中非常重要的一个组件:Ribbon.它作为负载均衡器在分布式网络中扮演着非常重要的角色. 本篇主要内容如下: 在介绍 Ribbon 之前,不得不说下负载均衡这个 ...
- 【云原生微服务八】Ribbon负载均衡策略之WeightedResponseTimeRule源码剖析(响应时间加权)
文章目录 一.前言 二.WeightedResponseTimeRule 1.计算权重? 1)如何更新权重? 2)如何计算权重? 3)例证权重的计算 2.权重的使用 1)权重区间问题? 一.前言 前置 ...
- Ribbon负载均衡策略-3
Ribbon负载均衡策略-3 常用策略 1.RoundRobinRule 轮转调度策略 2.RetryRule重试规律策略 3.RandomRule随机策略 Ribbon的负载均衡策略是由IRule接 ...
- Spring Cloud Alibaba 负载均衡:Ribbon 如何保证微服务的高可用
上一讲我们对 Nacos 的集群环境与实现原理进行了讲解,我们已经可以轻松将单个微服务接入到 Nacos 进行注册,但是微服务本不是孤岛,如何实现有效的服务间稳定通信是本文即将介绍的主要内容,本次我们 ...
- 客户端负载均衡Ribbon之一:Spring Cloud Netflix负载均衡组件Ribbon介绍
Netflix:['netfliːks] ribbon:英[ˈrɪbən]美[ˈrɪbən] n. 带; 绶带; (打印机的) 色带; 带状物; v. 把-撕成条带; 用缎带装饰; 形成带状; ...
最新文章
- 删除结果集中字段重复的方法
- boost::signals2模块实现连接类测试
- struts中如何查看配置文件中是否存在某个返回值
- mc显示服务器生命值,[1.7-1.8]CombatIndicator — 全息显示攻击伤害的数值 让我的世界服务器更有游戏感...
- Leetcode周赛复盘——第 278 场力扣周赛
- Functional ProgrammingLazy Code:被我忘记的迭代器
- 工业机器人 答案 韩建海_不可或缺:协作机器人对于制造业转型升级的意义
- linux添加定时器防抖,linux驱动2.3按键中断-定时器防抖
- 扔盘子(51Nod-1279)
- quartus仿真10:74283的基本功能
- 【Git入门之二】基本术语
- 如何枚举系统的视音频采集设备
- 二、通用、布局、导航组件
- android ionic框架,移动App开发框架—Ionic
- module 与 component 的区别
- P163、面试题29:数组中出现次数超过一半的数字
- DC/DC电路自举电容作用
- dwg在坐标转换的注意事项
- CarSim联合simulink仿真横向控制
- iOS开发之网络编程SocKet
热门文章
- python长的横线怎么打_关于Python的前后、单双下划线作用,看完这篇文章,吊打面试官!...
- 浏览器原理 17 # 宏任务和微任务
- node.js 中Mysql 查询报错 ,解决方法。
- js中的reduce的用法
- 阿里云网盘公测_号称“永不限速”的阿里云网盘开始公测预约!或将支持度盘一键迁移?...
- jdk高版本向下兼容
- java mapping_05.Java属性映射的正确姿势
- matlab仿真对比图,怎样在hfss里对比几个仿真出来的图?
- 阿里云PTS FQA
- [中英文对照]常见大学课程名称翻译