Sentinel Source Code Analysis: How Flow-Control Metrics Are Collected
Contents
- *Preface*
- *Official architecture diagram*
- Sliding window model
- *StatisticSlot*
- *Node*
- *NodeSelectorSlot*
- *ClusterBuilderSlot*
- *How FlowSlot uses the metrics*
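Preface
Sentinel can provide semaphore isolation through flow control in thread-count (concurrency) mode. Combined with response-time-based circuit breaking, it can automatically degrade an unstable resource when its average response time is high. This prevents slow calls from exhausting the concurrency budget and dragging down the whole system.
Sentinel makes its flow-control decisions by comparing the configured rules against runtime counters, so underneath it must maintain a set of flow-control metrics. Starting from a request that enters a Sentinel-protected resource, the rest of this post walks through the source code. It shows how Sentinel records these metrics, and how it compares them against the configured flow rules to decide whether the request is rejected.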
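Official architecture diagram
First, use the official architecture diagram to get a rough picture of Sentinel's overall architecture. This post focuses on NodeSelectorSlot, StatisticSlot and ClusterBuilderSlot. It then uses FlowSlot to show how flow rules of different dimensions consume these statistics.
In Sentinel, every resource has a resource name (resourceName), and each invocation of a resource creates an Entry object. An Entry can be created automatically through the adapters for mainstream frameworks, or explicitly via annotations or the SphU API. When an Entry is created, it is run through a series of function slots (the slot chain). The chain is built on the resource's first entry and then cached per resource. These slots have different responsibilities, for example: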
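1. NodeSelectorSlot collects the invocation paths of resources and stores them as a tree, used for path-based flow control and degradation;
2. ClusterBuilderSlot stores resource statistics and caller information, such as the resource's RT, QPS and thread count, which serve as the basis for multi-dimensional flow control and degradation;
3. StatisticSlot records and aggregates runtime monitoring metrics in different dimensions;
4. FlowSlot performs flow control based on the preset flow rules and the state gathered by the preceding slots;
5. AuthoritySlot performs blacklist/whitelist control based on the configured lists and the caller's origin;
6. DegradeSlot performs circuit breaking based on the statistics and preset rules;
7. SystemSlot controls the total inbound traffic based on system state such as load1.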
Sliding window model
First, let's look at the data structure Sentinel uses underneath to store its statistics: the sliding window.
com.alibaba.csp.sentinel.slots.statistic.base.LeapArray
```java
public abstract class LeapArray<T> {

    // Length of a single window (bucket), in ms
    protected int windowLengthInMs;
    // Total number of buckets
    protected int sampleCount;
    // Total time span covered by all buckets, in ms
    protected int intervalInMs;
    // The buckets; the array length equals sampleCount
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();
    ...
}
```
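The abstract class holds these five fields. The metrics themselves are wrapped in WindowWrap objects and kept in a thread-safe array (an AtomicReferenceArray), so they can be read and written at any time.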
com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap
```java
public class WindowWrap<T> {

    /**
     * Time length of a single window bucket in milliseconds.
     */
    private final long windowLengthInMs;

    /**
     * Start timestamp of the window in milliseconds.
     */
    private long windowStart;

    /**
     * Statistic data (the actual statistics stored in this bucket).
     */
    private T value;
    ...
}
```
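How LeapArray works
When the first request arrives, Sentinel creates a special time-span to hold runtime data such as response time (RT), QPS and blocked requests. This time-span is called a window bucket, and the number of buckets is defined by the sample count. Sentinel uses the data in the valid buckets to decide whether the current request passes: the sliding window records QPS and compares it against the threshold defined in the rule.
Requests that arrive at different times are placed in different buckets.
As requests keep coming in, older buckets expire and become invalid.
Now let's look at a few important methods of LeapArray.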
com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#calculateTimeIdx
Computes which bucket a request at the given time belongs to, and returns that bucket's index in array.
```java
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}
```
com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#calculateWindowStart
Computes the windowStart of a bucket (WindowWrap). Note that windowStart is always an integer multiple of windowLengthInMs.
```java
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}
```
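To make the two formulas concrete, here is a small standalone sketch that reimplements just their arithmetic. The configuration (intervalInMs = 1000, sampleCount = 2, hence windowLengthInMs = 500) is an assumption chosen for the example, and the class name BucketMath is hypothetical.

```java
public class BucketMath {
    // Assumed example configuration: 2 buckets covering 1000 ms, so each bucket spans 500 ms.
    static final int SAMPLE_COUNT = 2;
    static final int INTERVAL_IN_MS = 1000;
    static final int WINDOW_LENGTH_IN_MS = INTERVAL_IN_MS / SAMPLE_COUNT;

    // Same arithmetic as LeapArray#calculateTimeIdx: which slot of the circular array.
    static int timeIdx(long timeMillis) {
        long timeId = timeMillis / WINDOW_LENGTH_IN_MS;
        return (int) (timeId % SAMPLE_COUNT);
    }

    // Same arithmetic as LeapArray#calculateWindowStart: round down to a multiple of the bucket length.
    static long windowStart(long timeMillis) {
        return timeMillis - timeMillis % WINDOW_LENGTH_IN_MS;
    }

    public static void main(String[] args) {
        long[] times = {1234, 1734, 2234};
        for (long t : times) {
            System.out.printf("t=%d -> idx=%d, windowStart=%d%n", t, timeIdx(t), windowStart(t));
        }
    }
}
```

With these numbers, t=1234 maps to index 0 with windowStart 1000, and t=1734 maps to index 1 with windowStart 1500. t=2234 lands in index 0 again, but with windowStart 2000. That is exactly the "bucket exists but has expired" case that currentWindow handles by resetting the bucket.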
com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow(long)
The most important method: it fetches the bucket for the current time. Its behavior boils down to three rules: return the bucket if it exists, create it if it is absent, and reset it if it has expired.
```java
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            // Absent: create it
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            // Present and up to date: return it
            return old;
        } else if (windowStart > old.windowStart()) {
            // Expired: reset it; a lock guards against concurrent resets
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            // An abnormal case; in practice this branch is not reached
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
```
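The three cases can be reproduced in a much smaller, self-contained sketch. This is not Sentinel's class. SimpleLeapArray is hypothetical, its bucket is just a start time plus a LongAdder pass counter, and it re-checks the start time after taking the lock, which is a choice made for this sketch. The "time went backwards" branch returns a detached bucket, as the original does.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleLeapArray {
    // A bucket: its start time plus a pass counter (stands in for WindowWrap<MetricBucket>).
    static final class Bucket {
        volatile long windowStart;
        final LongAdder pass = new LongAdder();
        Bucket(long windowStart) { this.windowStart = windowStart; }
    }

    final int windowLengthInMs;
    final AtomicReferenceArray<Bucket> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    SimpleLeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    Bucket currentWindow(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % array.length());
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // Absent: create one and CAS it in; on contention, yield and retry.
                Bucket created = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, created)) {
                    return created;
                }
                Thread.yield();
            } else if (windowStart == old.windowStart) {
                // Up to date: reuse it.
                return old;
            } else if (windowStart > old.windowStart) {
                // Expired: reset under the lock; losers yield instead of blocking.
                if (updateLock.tryLock()) {
                    try {
                        // Re-check: another thread may already have reset this bucket.
                        if (windowStart > old.windowStart) {
                            old.pass.reset();
                            old.windowStart = windowStart;
                        }
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
                // Loop again; the next pass returns the refreshed bucket.
            } else {
                // Clock moved backwards: hand out a detached, empty bucket.
                return new Bucket(windowStart);
            }
        }
    }
}
```

The expired bucket is reused in place rather than replaced. As in LeapArray, the circular array never grows, and each slot object is recycled once per interval.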
StatisticSlot
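StatisticSlot is the slot in the slot chain that records statistics, so naturally it relies on LeapArray. The following source analysis shows how StatisticSlot records flow-control metrics through LeapArray.
To analyze a slot, start from its entry method.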
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
```java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        ...
```
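The excerpt omits the exception handling. Let's first look at the normal path:
- First, increase the thread count and pass count of the node passed into the method
- Then take the origin node of the current entry (reached through the context) and increase its thread count and pass count
- If the entry type is IN (inbound), also increase the thread count and pass count of the global ENTRY_NODE
- Finally, invoke the registered entry callbacks, an extension point where users can plug in their own logic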
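The pass-path bookkeeping above, plus the matching release on exit, can be sketched with a tiny counter type. CounterNode, onPass and onExit are hypothetical names. The real code calls increaseThreadNum/addPassRequest on the DefaultNode, the origin node and Constants.ENTRY_NODE. The thread-count decrement happens in StatisticSlot's exit path, which the excerpt above does not show.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

public class PassBookkeeping {
    // Hypothetical stand-in for a Node: a live thread count and a pass counter.
    static final class CounterNode {
        final AtomicInteger curThreadNum = new AtomicInteger();
        final LongAdder pass = new LongAdder();
    }

    // Global inbound statistics, like Constants.ENTRY_NODE.
    static final CounterNode ENTRY_NODE = new CounterNode();

    // Mirrors the order of StatisticSlot#entry on the normal (passed) path.
    static void onPass(CounterNode node, CounterNode originNode, boolean inbound, int count) {
        node.curThreadNum.incrementAndGet();
        node.pass.add(count);
        if (originNode != null) {          // only when the caller's origin is known
            originNode.curThreadNum.incrementAndGet();
            originNode.pass.add(count);
        }
        if (inbound) {                     // EntryType.IN also feeds the global entry node
            ENTRY_NODE.curThreadNum.incrementAndGet();
            ENTRY_NODE.pass.add(count);
        }
    }

    // The thread count is released when the entry exits; pass counts stay recorded.
    static void onExit(CounterNode node, CounterNode originNode, boolean inbound) {
        node.curThreadNum.decrementAndGet();
        if (originNode != null) {
            originNode.curThreadNum.decrementAndGet();
        }
        if (inbound) {
            ENTRY_NODE.curThreadNum.decrementAndGet();
        }
    }
}
```

The thread count is a gauge that goes up and down, whereas the pass count only accumulates inside its time window. This is why the thread count lives in an AtomicInteger rather than in a sliding window.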
As we can see, all metrics are recorded through nodes. So what is a node?
Node
com.alibaba.csp.sentinel.node.Node
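Node is a low-level Sentinel interface that declares all the methods for setting and reading flow-control metrics. Implementations provide these methods to record and query the metrics.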
```java
public interface Node extends OccupySupport, DebugSupport {
    /** Get incoming request per minute ({@code pass + block}). */
    long totalRequest();
    /** Get pass count per minute. @since 1.5.0 */
    long totalPass();
    /** Get {@link Entry#exit()} count per minute. */
    long totalSuccess();
    /** Get blocked request count per minute (totalBlockRequest). */
    long blockRequest();
    /** Get exception count per minute. */
    long totalException();
    /** Get pass request per second. */
    double passQps();
    /** Get block request per second. */
    double blockQps();
    /** Get {@link #passQps()} + {@link #blockQps()} request per second. */
    double totalQps();
    /** Get {@link Entry#exit()} request per second. */
    double successQps();
    /** Get estimated max success QPS till now. */
    double maxSuccessQps();
    /** Get exception count per second. */
    double exceptionQps();
    /** Get average rt per second. */
    double avgRt();
    /** Get minimal response time. */
    double minRt();
    /** Get current active thread count. */
    int curThreadNum();
    /** Get last second block QPS. */
    double previousBlockQps();
    /** Last window QPS. */
    double previousPassQps();
    /** Fetch all valid metric nodes of resources. */
    Map<Long, MetricNode> metrics();
    List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate);
    /** Add pass count. */
    void addPassRequest(int count);
    /** Add rt and success count. */
    void addRtAndSuccess(long rt, int success);
    /** Increase the block count. */
    void increaseBlockQps(int count);
    /** Add the biz exception count. */
    void increaseExceptionQps(int count);
    /** Increase current thread count. */
    void increaseThreadNum();
    /** Decrease current thread count. */
    void decreaseThreadNum();
    /**
     * Reset the internal counter. Reset is needed when {@link IntervalProperty#INTERVAL} or
     * {@link SampleCountProperty#SAMPLE_COUNT} is changed.
     */
    void reset();
}
```
Let's look at its implementation classes.
com.alibaba.csp.sentinel.node.DefaultNode
```java
public class DefaultNode extends StatisticNode {

    /**
     * The resource associated with the node.
     */
    private ResourceWrapper id;

    /**
     * The list of all child nodes.
     */
    private volatile Set<Node> childList = new HashSet<>();

    /**
     * Associated cluster node.
     */
    private ClusterNode clusterNode;
    ...
}
```
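DefaultNode extends StatisticNode and is identified by a ResourceWrapper, which distinguishes the different protected resources. It also holds a set of child nodes and a clusterNode. DefaultNode overrides some of the Node methods implemented by its parent StatisticNode. Take increaseExceptionQps as an example: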
```java
@Override
public void increaseExceptionQps(int count) {
    super.increaseExceptionQps(count);
    this.clusterNode.increaseExceptionQps(count);
}
```
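The call is simply forwarded to the corresponding methods of StatisticNode (via super) and ClusterNode. DefaultNode itself does not touch the sliding window, so the relevant logic must live in the parent class. Let's look at the parent, StatisticNode: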
com.alibaba.csp.sentinel.node.StatisticNode
```java
public class StatisticNode implements Node {

    /**
     * Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
     * meaning each bucket per second, in this way we can get accurate statistics of each second.
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

    /**
     * The counter for thread count.
     */
    private AtomicInteger curThreadNum = new AtomicInteger(0);

    /**
     * The last timestamp when metrics were fetched.
     */
    private long lastFetchTime = -1;
    ...
}
```
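Here we finally see something related to the sliding window: ArrayMetric. There are two instances, one covering the recent seconds and one covering the recent minute. ArrayMetric implements the Metric interface. As the name suggests, Metric is a measurement abstraction that declares the operations for reading and updating the flow-control statistics.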
com.alibaba.csp.sentinel.slots.statistic.metric.Metric
```java
public interface Metric extends DebugSupport {
    /** Get total success count. */
    long success();
    /** Get max success count. */
    long maxSuccess();
    /** Get total exception count. */
    long exception();
    /** Get total block count. */
    long block();
    /** Get total pass count. not include {@link #occupiedPass()} */
    long pass();
    /** Get total response time. */
    long rt();
    /** Get the minimal RT. */
    long minRt();
    /** Get aggregated metric nodes of all resources. */
    List<MetricNode> details();
    List<MetricNode> detailsOnCondition(Predicate<Long> timePredicate);
    /** Get the raw window array. */
    MetricBucket[] windows();
    /** Add current exception count. */
    void addException(int n);
    /** Add current block count. */
    void addBlock(int n);
    /** Add current completed count. */
    void addSuccess(int n);
    /** Add current pass count. */
    void addPass(int n);
    /** Add given RT to current total RT. */
    void addRT(long rt);
    /** Get the sliding window length in seconds. */
    double getWindowIntervalInSec();
    /** Get sample count of the sliding window. */
    int getSampleCount();
    /**
     * Note: this operation will not perform refreshing, so will not generate new buckets.
     * Returns the pass count of the bucket exactly associated to provided timestamp,
     * or 0 if the timestamp is invalid. @since 1.5.0
     */
    long getWindowPass(long timeMillis);

    // Occupy-based (@since 1.5.0)

    /** Add occupied pass, which represents pass requests that borrow the latter windows' token. */
    void addOccupiedPass(int acquireCount);
    /** Add request that occupied; futureTime is the future timestamp that the acquireCount should be added on. */
    void addWaiting(long futureTime, int acquireCount);
    /** Get waiting pass account */
    long waiting();
    /** Get occupied pass count. */
    long occupiedPass();

    // Tool methods.

    long previousWindowBlock();
    long previousWindowPass();
}
```
Metric declares a set of operations on flow-control metrics, and it has only one implementation:
com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric
```java
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    ...
}
```
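Here is the sliding window from the previous section, the data structure that stores the underlying metrics. ArrayMetric implements every Metric method by operating on this LeapArray, so Metric is essentially a view over the sliding window. The value stored in each bucket of the sliding window is a MetricBucket: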
com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket
```java
public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
    ...
}
```
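At first glance, apart from minRt, there seem to be no fields related to flow-control metrics. However, the constructor fills another field, counters. What does it contain? The source of MetricEvent makes it obvious: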
com.alibaba.csp.sentinel.slots.statistic.MetricEvent
```java
public enum MetricEvent {

    /**
     * Normal pass.
     */
    PASS,
    /**
     * Normal block.
     */
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     */
    OCCUPIED_PASS
}
```
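With MetricEvent in view, the bucket's layout, one counter per event indexed by the enum ordinal, can be reproduced in a few lines. This is a sketch, not Sentinel's class. MiniBucket is hypothetical, it uses the JDK's java.util.concurrent.atomic.LongAdder instead of Sentinel's own copy, it has no minRt field, and its enum is a local copy of the MetricEvent constants listed above.

```java
import java.util.concurrent.atomic.LongAdder;

public class MiniBucket {
    // Local copy of the MetricEvent constants shown above.
    enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }

    // One LongAdder per event, indexed by the enum's ordinal.
    private final LongAdder[] counters;

    MiniBucket() {
        MetricEvent[] events = MetricEvent.values();
        counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
    }

    void add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
    }

    long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    // Reusing an expired bucket in this sketch only needs its counters zeroed.
    void reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
    }
}
```

In this layout, adding a new statistic dimension only takes a new enum constant; no new field or accessor is needed.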
So all the other metrics live in a LongAdder array, indexed by the ordinal of each MetricEvent. Why LongAdder? Let's keep reading:
com.alibaba.csp.sentinel.slots.statistic.base.LongAdder
```java
public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    /**
     * Version of plus for use in retryUpdate
     */
    final long fn(long v, long x) { return v + x; }

    /**
     * Creates a new adder with initial sum of zero.
     */
    public LongAdder() {
    }

    public void add(long x) {
        Cell[] as;
        long b, v;
        HashCode hc;
        Cell a;
        int n;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            int h = (hc = threadHashCode.get()).code;
            if (as == null || (n = as.length) < 1 ||
                (a = as[(n - 1) & h]) == null ||
                !(uncontended = a.cas(v = a.value, v + x))) {
                retryUpdate(x, hc, uncontended);
            }
        }
    }

    /**
     * Equivalent to {@code add(1)}.
     */
    public void increment() {
        add(1L);
    }

    /**
     * Equivalent to {@code add(-1)}.
     */
    public void decrement() {
        add(-1L);
    }
    ...
}
```
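LongAdder extends Striped64 and provides counting methods. If you are not familiar with Striped64, for now it is enough to know the following.
Striped64 was added in Java 8 as the concurrency component behind the accumulator classes such as java.util.concurrent.atomic.LongAdder, and it is used for counting under concurrency. Note that the LongAdder above is Sentinel's own class in the com.alibaba.csp.sentinel.slots.statistic.base package, not the JDK one.
The idea behind Striped64 is to spread contention when it is heavy. It maintains a base count and a Cell array. A counting thread first tries to update base with a CAS, and if that succeeds it is done. Otherwise contention is assumed to be heavy, and the Cell array is used to spread the counting. Striped64 hashes each thread to an index in the Cell array, and that thread's increments accumulate in that Cell. With this design, the final total is base plus the counts scattered across the Cell array.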
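To illustrate the idea, here is a toy striped counter: a CAS on base first, and on failure the increment goes to a cell chosen by a hash of the current thread. This is not Striped64's actual algorithm, which also creates and grows the cell array lazily and rehashes threads that collide. StripedCounter and the fixed cell count of 8 are assumptions for this sketch.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

public class StripedCounter {
    private final AtomicLong base = new AtomicLong();
    // Fixed number of cells, a power of two so the index can be computed with a mask.
    private final AtomicLongArray cells = new AtomicLongArray(8);

    public void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x)) {
            return;                         // uncontended: done with a single CAS
        }
        // Contended: spread the update over a cell picked by the thread's hash.
        int h = System.identityHashCode(Thread.currentThread());
        cells.addAndGet(h & (cells.length() - 1), x);
    }

    // The total is base plus every cell, like LongAdder#sum().
    public long sum() {
        long sum = base.get();
        for (int i = 0; i < cells.length(); i++) {
            sum += cells.get(i);
        }
        return sum;
    }
}
```

sum() is not an atomic snapshot while writers are active, and the same holds for LongAdder. That is acceptable for statistics counters, which are read far less often than they are written.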
Back to StatisticNode: it also wraps some metric APIs for the upper layers. All of them end up as operations on ArrayMetric → LeapArray, with the data finally stored in MetricBucket.
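A read such as pass() aggregates over the buckets that still fall inside the interval. StatisticNode then turns that sum into QPS by dividing by the interval length in seconds (passQps is pass() divided by getWindowIntervalInSec()). The sketch below shows the idea with plain arrays. MiniMetric is hypothetical and single-threaded. Its rule that a bucket counts only while now - windowStart < intervalInMs is the boundary choice of this sketch, not necessarily LeapArray's exact one.

```java
import java.util.Arrays;

public class MiniMetric {
    private final int windowLengthInMs;
    private final int intervalInMs;
    private final long[] windowStart;   // start time of each bucket, -1 = never used
    private final long[] passCount;     // pass counter of each bucket

    MiniMetric(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.windowStart = new long[sampleCount];
        this.passCount = new long[sampleCount];
        Arrays.fill(windowStart, -1);
    }

    // Single-threaded simplification of currentWindow(): reset the slot if it is stale.
    void addPass(long now, int n) {
        int idx = (int) ((now / windowLengthInMs) % windowStart.length);
        long start = now - now % windowLengthInMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start;
            passCount[idx] = 0;
        }
        passCount[idx] += n;
    }

    // Like ArrayMetric#pass(): sum only the buckets that are still inside the interval.
    long pass(long now) {
        long sum = 0;
        for (int i = 0; i < windowStart.length; i++) {
            if (windowStart[i] >= 0 && now - windowStart[i] < intervalInMs) {
                sum += passCount[i];
            }
        }
        return sum;
    }

    // Like StatisticNode#passQps(): the count divided by the interval in seconds.
    double passQps(long now) {
        return pass(now) / (intervalInMs / 1000.0);
    }
}
```

For example, with 2 buckets over 1000 ms, suppose 3 passes are added at t=1100 and 4 at t=1600. At t=1900 the pass count is 7 (7.0 QPS). At t=2100 the bucket starting at 1000 has aged out and only 4 remain.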
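This way, the flow-control metrics of each protected resource are stored separately. What remains is to update and consume these metrics as traffic flows, which is how flow control is achieved. Before that, let's see how Sentinel distinguishes resources and finds the Node for each one.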
NodeSelectorSlot
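Sentinel lets us give a protected resource any name, as long as the same resource is always named consistently and different resources get distinguishable names:
```java
public static Entry entry(String name, int resourceType, EntryType type) throws BlockException {
    return Env.sph.entryWithType(name, resourceType, type, 1, OBJECTS0);
}
```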
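When Sentinel is integrated with another framework, the adapter usually defines a common naming rule for resources at the integration point. Users then just configure flow rules for the corresponding names. Take sentinel-dubbo-adapter as an example:
```java
String resourceName = DubboUtils.getResourceName(invoker, invocation);
String interfaceName = invoker.getInterface().getName();
```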
com.alibaba.csp.sentinel.adapter.dubbo.DubboUtils
```java
public static String getResourceName(Invoker<?> invoker, Invocation invocation) {
    StringBuilder buf = new StringBuilder(64);
    buf.append(invoker.getInterface().getName())
        .append(":")
        .append(invocation.getMethodName())
        .append("(");
    boolean isFirst = true;
    for (Class<?> clazz : invocation.getParameterTypes()) {
        if (!isFirst) {
            buf.append(",");
        }
        buf.append(clazz.getName());
        isFirst = false;
    }
    buf.append(")");
    return buf.toString();
}
```
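The Sentinel Dubbo adapter protects resources at two levels, the Dubbo service (interface) and the Dubbo service method, with the resource names defined as in the code above. This is why the official documentation requires flow rules to use exactly these names:
- Service interface: resourceName is the fully qualified interface name, e.g. com.alibaba.csp.sentinel.demo.dubbo.FooService
- Service method: resourceName is the fully qualified interface name, a colon, and the method signature, e.g. com.alibaba.csp.sentinel.demo.dubbo.FooService:sayHello(java.lang.String)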
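The naming rule can be reproduced with plain reflection. ResourceNames is a hypothetical helper that mimics the format built by DubboUtils.getResourceName: interface name, a colon, the method name, and the comma-separated parameter type names in parentheses. It takes a java.lang.reflect.Method instead of Dubbo's Invoker and Invocation.

```java
import java.lang.reflect.Method;
import java.util.List;

public class ResourceNames {
    // Interface-level resource name: the fully qualified interface name.
    static String interfaceResource(Class<?> iface) {
        return iface.getName();
    }

    // Method-level resource name: interface:method(paramType1,paramType2,...)
    static String methodResource(Class<?> iface, Method method) {
        StringBuilder buf = new StringBuilder(64);
        buf.append(iface.getName()).append(':').append(method.getName()).append('(');
        boolean isFirst = true;
        for (Class<?> clazz : method.getParameterTypes()) {
            if (!isFirst) {
                buf.append(',');
            }
            buf.append(clazz.getName());
            isFirst = false;
        }
        return buf.append(')').toString();
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method m = List.class.getMethod("add", int.class, Object.class);
        // prints "java.util.List:add(int,java.lang.Object)"
        System.out.println(methodResource(List.class, m));
    }
}
```

Class#getName returns "int" for primitives and the erased type (java.lang.Object) for generic parameters. A rule must therefore be written against the erased signature, not the generic one seen in source code.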
Now we know how resources are distinguished. Next, let's see how Sentinel finds the Node for a resource name.
Start from the flow-control entry point:
com.alibaba.csp.sentinel.SphU#entry(java.lang.String, int, com.alibaba.csp.sentinel.EntryType)
```java
public static Entry entry(String name, int resourceType, EntryType type) throws BlockException {
    return Env.sph.entryWithType(name, resourceType, type, 1, OBJECTS0);
}
```
com.alibaba.csp.sentinel.CtSph#entryWithType(java.lang.String, int, com.alibaba.csp.sentinel.EntryType, int, boolean, java.lang.Object[])
```java
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                           Object[] args) throws BlockException {
    StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
    return entryWithPriority(resource, count, prioritized, args);
}
```
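entryWithType builds a StringResourceWrapper from the resource name, and StringResourceWrapper is a subclass of ResourceWrapper. Where have we seen ResourceWrapper before? In DefaultNode, which holds a ResourceWrapper field named id. As the name suggests, it distinguishes the Nodes of different resources. We are getting close, so let's keep reading: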
com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object…)
```java
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        // Using default context.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
```
It first gets the slot chain (a ProcessorSlot) for resourceWrapper, then calls the chain's entry method to pass the request on. Slot chains are cached in a map (chainMap) keyed by resourceWrapper:
com.alibaba.csp.sentinel.CtSph#lookProcessChain
```java
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
```
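lookProcessChain uses a copy-on-write idiom that reappears in NodeSelectorSlot and ClusterBuilderSlot. Reads go lock-free against a volatile HashMap. Writes copy the map under a lock, modify the copy, and publish it. Below is a minimal generic sketch. CowCache is a hypothetical class with a caller-supplied factory, and its size cap mimics MAX_SLOT_CHAIN_SIZE by returning null. Keys are plain strings here, standing in for ResourceWrapper, whose equals in Sentinel compares resource names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CowCache<V> {
    private final Object lock = new Object();
    private final int maxSize;
    // Readers see either the old or the new map, never a map being mutated.
    private volatile Map<String, V> map = new HashMap<>();

    CowCache(int maxSize) { this.maxSize = maxSize; }

    V getOrCreate(String key, Supplier<V> factory) {
        V value = map.get(key);              // fast path: no locking
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);        // double-check under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;         // like hitting MAX_SLOT_CHAIN_SIZE
                    }
                    value = factory.get();
                    Map<String, V> newMap = new HashMap<>(map);   // copy...
                    newMap.put(key, value);                       // ...modify...
                    map = newMap;                                 // ...publish
                }
            }
        }
        return value;
    }

    int size() { return map.size(); }
}
```

Each key is written once while it is read on every request, so paying for a full copy on the rare write buys lock-free reads on the hot path. The ClusterBuilderSlot comment later in this post gives the same reasoning.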
com.alibaba.csp.sentinel.slotchain.SlotChainProvider#newSlotChain
```java
public static ProcessorSlotChain newSlotChain() {
    if (builder != null) {
        return builder.build();
    }

    resolveSlotChainBuilder();

    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}
```
com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
```java
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }
}
```
The default slot chain contains NodeSelectorSlot, ClusterBuilderSlot, LogSlot and a series of other built-in slots.
Next, let's look at the slot chain's entry method:
com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry
```java
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        super.fireExit(context, resourceWrapper, count, args);
    }

};

public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
    throws Throwable {
    first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
```
com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot#transformEntry
```java
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
    throws Throwable {
    T t = (T)o;
    entry(context, resourceWrapper, t, count, prioritized, args);
}
```
The entry call here dispatches back to first's own implementation of entry:
```java
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
    throws Throwable {
    super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
```
com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot#fireEntry
```java
private AbstractLinkedProcessorSlot<?> next = null;

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }
}
```
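fireEntry works much like event propagation in a Netty pipeline: it passes the request for the protected resource along the whole slot chain. All the default slots mentioned earlier are subclasses of AbstractLinkedProcessorSlot.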
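The linked-list-of-slots pattern can be shown with a toy chain. LinkedSlot, Chain and the trace list are hypothetical simplifications with no Context, ResourceWrapper or exit path. Each slot does its work and then calls fireEntry to hand over to next, and the chain has a dummy head like DefaultProcessorSlotChain's first.

```java
import java.util.ArrayList;
import java.util.List;

public class ToyChain {
    // Simplified AbstractLinkedProcessorSlot: a node in a singly linked list.
    static abstract class LinkedSlot {
        LinkedSlot next;

        abstract void entry(List<String> trace);

        // Like fireEntry: pass control to the next slot, if any.
        void fireEntry(List<String> trace) {
            if (next != null) {
                next.entry(trace);
            }
        }
    }

    // A slot that records its name and then propagates.
    static LinkedSlot named(String name) {
        return new LinkedSlot() {
            @Override
            void entry(List<String> trace) {
                trace.add(name);
                fireEntry(trace);
            }
        };
    }

    // Like DefaultProcessorSlotChain: a dummy head slot plus addLast.
    static final class Chain {
        private final LinkedSlot first = new LinkedSlot() {
            @Override
            void entry(List<String> trace) { fireEntry(trace); }
        };
        private LinkedSlot end = first;

        Chain addLast(LinkedSlot slot) {
            end.next = slot;
            end = slot;
            return this;
        }

        void entry(List<String> trace) { first.entry(trace); }
    }

    public static void main(String[] args) {
        Chain chain = new Chain()
            .addLast(named("NodeSelectorSlot"))
            .addLast(named("ClusterBuilderSlot"))
            .addLast(named("StatisticSlot"))
            .addLast(named("FlowSlot"));
        List<String> trace = new ArrayList<>();
        chain.entry(trace);
        System.out.println(trace);   // [NodeSelectorSlot, ClusterBuilderSlot, StatisticSlot, FlowSlot]
    }
}
```

A slot can run code both before and after its fireEntry call. This is what StatisticSlot relies on: it calls fireEntry first so that later slots (SystemSlot, FlowSlot, DegradeSlot...) check the request, and only then records the pass. A rejection surfaces as an exception thrown back through fireEntry.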
As seen in DefaultSlotChainBuilder#build, the first slot added is NodeSelectorSlot, so its entry method runs first. As the name suggests, this slot finds the Node for the resource. Let's read the source:
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
```java
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    /**
     * {@link DefaultNode}s of the same resource in different context.
     */
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
         * It's interesting that we use context name rather resource name as the map key.
         *
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
         * the resource name must be same but context name may not.
         *
         * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
         * enter same resource in different context, using context name as map key can
         * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
         * of the same resource name, for every distinct context (different context name) each.
         *
         * Consider another question. One resource may have multiple {@link DefaultNode},
         * so what is the fastest way to get total statistics of the same resource?
         * The answer is all {@link DefaultNode}s with same resource name share one
         * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
         */
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                }
                // Build invocation tree
                ((DefaultNode)context.getLastNode()).addChild(node);
            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}
```
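NodeSelectorSlot maintains a map of Nodes, but its key is context.getName() rather than the ResourceWrapper. The official comment explains clearly why, and it also explains why ClusterNode and ClusterBuilderSlot exist:
- Keying by context name keeps separate statistics for the same resource in different contexts. Each NodeSelectorSlot instance already belongs to a single resource, since every resource has its own slot chain.
- To quickly get the total statistics of one resource across all contexts, Sentinel provides ClusterNode and ClusterBuilderSlot.
After NodeSelectorSlot obtains the DefaultNode for the resource in the current context, it passes that node on to the subsequent slots. StatisticSlot receives this node and updates its statistics. StatisticSlot's entry method was analyzed earlier, so we won't repeat it here.
That more or less covers the overall flow of how Sentinel collects its metrics. Next, we look at ClusterNode and ClusterBuilderSlot, and at how FlowSlot uses these metrics.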
ClusterBuilderSlot
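The previous section said that ClusterBuilderSlot exists to merge the statistics of the same resource scattered across different contexts. How does it do that? Since it is a slot, start from its entry method: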
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
```java
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    /**
     * <p>
     * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
     * the same {@link ProcessorSlotChain} globally, no matter in witch context. So if
     * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...)},
     * the resource name must be same but context name may not.
     * </p>
     * <p>
     * To get total statistics of the same resource in different context, same resource
     * shares the same {@link ClusterNode} globally. All {@link ClusterNode}s are cached
     * in this map.
     * </p>
     * <p>
     * The longer the application runs, the more stable this mapping will
     * become. so we don't concurrent map but a lock. as this lock only happens
     * at the very beginning while concurrent map will hold the lock all the time.
     * </p>
     */
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

    private static final Object lock = new Object();

    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);

        /*
         * if context origin is set, we should get or create a new {@link Node} of
         * the specific origin.
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    ...
}
```
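With the help of the official comment, the code is easy to follow. Since the same resource always shares the same slot chain, the ClusterNode held by the ClusterBuilderSlot instance in each chain is the one and only ClusterNode of that resource. In addition, the class maintains a global static variable, clusterNodeMap, which maps every resource's ResourceWrapper to its ClusterNode for all resources in the system. Sentinel uses this global map elsewhere; for example, a flow rule with the relate strategy looks up the ClusterNode of the related resource through ClusterBuilderSlot.getClusterNode. For now, here is the logic in entry:
- Check whether the clusterNode of the current resource is null
- If it is, create it under a lock, assign the clusterNode field, and add it to clusterNodeMap (copy-on-write)
- Finally, the interesting part: it sets clusterNode on the context's Node (a DefaultNode here). This single line is what lets ClusterNode aggregate the metrics of every DefaultNode of the same resource. Recall the analysis of DefaultNode: every metric update on a DefaultNode is applied not only to itself but also to the clusterNode it holds. All DefaultNodes of the same resource hold the very same clusterNode, so the data is shared. Take increaseExceptionQps as an example: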
```java
@Override
public void increaseExceptionQps(int count) {
    super.increaseExceptionQps(count);
    this.clusterNode.increaseExceptionQps(count);
}
```
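The forwarding pattern can be shown in isolation. In this sketch StatNode, PerContextNode and the two-context scenario are hypothetical. Each per-context node writes to its own counter and to one shared cluster node, so the shared node ends up holding the resource-wide total.

```java
import java.util.concurrent.atomic.LongAdder;

public class ClusterForwarding {
    // Stand-in for StatisticNode: owns the counters.
    static class StatNode {
        final LongAdder exception = new LongAdder();
        void increaseExceptionQps(int count) { exception.add(count); }
        long totalException() { return exception.sum(); }
    }

    // Stand-in for DefaultNode: records locally, then forwards to the shared cluster node.
    static class PerContextNode extends StatNode {
        private final StatNode clusterNode;
        PerContextNode(StatNode clusterNode) { this.clusterNode = clusterNode; }

        @Override
        void increaseExceptionQps(int count) {
            super.increaseExceptionQps(count);
            clusterNode.increaseExceptionQps(count);
        }
    }

    public static void main(String[] args) {
        StatNode cluster = new StatNode();                 // one per resource
        PerContextNode inCtxA = new PerContextNode(cluster);
        PerContextNode inCtxB = new PerContextNode(cluster);
        inCtxA.increaseExceptionQps(2);
        inCtxB.increaseExceptionQps(3);
        // prints "2 3 5": per-context values, then the cluster total
        System.out.println(inCtxA.totalException() + " " + inCtxB.totalException() + " " + cluster.totalException());
    }
}
```

Aggregating at write time costs one extra update per event. In exchange, reading the resource-wide total is a single lookup instead of a scan over every context's node.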
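ClusterNode is also a subclass of StatisticNode, so it has all the statistical capabilities StatisticNode provides. We won't go into more detail here; interested readers can read the source themselves.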
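The entry method above also handles the caller's origin. When the context has a non-empty origin, the ClusterNode hands out one node per origin through getOrCreateOriginNode, and the current entry records into that node as well. This per-origin node is what origin-specific rules check later. Below is a minimal sketch of such a per-origin registry. OriginRegistry and OriginStats are hypothetical, and the sketch uses ConcurrentHashMap.computeIfAbsent rather than whatever locking scheme ClusterNode itself uses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class OriginRegistry {
    // Stand-in for the per-origin StatisticNode: just a pass counter.
    static final class OriginStats {
        final LongAdder pass = new LongAdder();
    }

    // origin (caller application name) -> its statistics, for one resource
    private final Map<String, OriginStats> originCountMap = new ConcurrentHashMap<>();

    OriginStats getOrCreateOriginNode(String origin) {
        return originCountMap.computeIfAbsent(origin, o -> new OriginStats());
    }

    public static void main(String[] args) {
        OriginRegistry cluster = new OriginRegistry();   // one per resource, like ClusterNode
        cluster.getOrCreateOriginNode("appA").pass.add(1);
        cluster.getOrCreateOriginNode("appA").pass.add(1);
        cluster.getOrCreateOriginNode("appB").pass.add(1);
        System.out.println(cluster.getOrCreateOriginNode("appA").pass.sum()); // prints 2
    }
}
```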
How FlowSlot uses the metrics
In this last section, we use FlowSlot as an example of how flow-control slots consume the collected metrics. Start from the entry method:
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot#entry
```java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
```
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot#checkFlow
```java
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
```
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#checkFlow
```java
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
```
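At this point, the configured flow rules have been fetched by resource name. canPassCheck is then called for each rule to check whether the resource's current state passes it; if not, the familiar FlowException is thrown.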
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#canPassCheck()
```java
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                            boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }

    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
```
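Here the logic branches on whether the rule is in cluster mode. Cluster flow control is not the focus of this post, so let's go straight to the local check: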
com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#passLocalCheck
```java
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
```
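passLocalCheck first asks selectNodeByRequesterAndStrategy which Node's statistics should be checked. The sketch below is a simplified reading of that selection in Sentinel 1.x, using hypothetical stand-in types (Rule, Candidates, Strategy) and omitting the "other" limitApp case. If the rule's limitApp matches the caller's origin, the origin node is checked. If limitApp is "default", the resource's ClusterNode is checked. The relate and chain strategies pick a reference node instead. Treat the branch details as an approximation and check them against the version you use.

```java
public class NodeSelection {
    enum Strategy { DIRECT, RELATE, CHAIN }

    // Hypothetical bundle of the candidate nodes for one request.
    record Candidates(String contextName, String origin, Object defaultNode,
                      Object clusterNode, Object originNode, Object refClusterNode) {}

    // Simplified rule: which caller it limits, how, and the referenced resource/context.
    record Rule(String limitApp, Strategy strategy, String refResource) {}

    static Object select(Rule rule, Candidates c) {
        boolean matchesOrigin = rule.limitApp().equals(c.origin());
        boolean isDefault = "default".equals(rule.limitApp());
        if (!matchesOrigin && !isDefault) {
            return null;                      // rule does not apply to this caller
        }
        if (rule.strategy() == Strategy.DIRECT) {
            // Origin-specific rules check the origin node; default rules check the resource-wide ClusterNode.
            return matchesOrigin ? c.originNode() : c.clusterNode();
        }
        if (rule.strategy() == Strategy.RELATE) {
            return c.refClusterNode();        // ClusterNode of the related resource
        }
        // CHAIN: only applies when the request enters through the referenced context.
        return rule.refResource().equals(c.contextName()) ? c.defaultNode() : null;
    }
}
```

When select returns null, passLocalCheck lets the request through, which is why a rule that targets another caller never blocks this one.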
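The check is delegated to the rater held by the rule. The rule creates its rater, an implementation of com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController, from the attributes declared when the rule was defined. This is again not the focus here, so let's just look at one rater's canPass implementation: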
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
```java
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
```
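It first reads the resource's current usage from the node, adds the count requested by this call, and compares the sum with the rule's threshold. If the request cannot pass but is prioritized and the rule is QPS-based, there is extra handling: the request tries to occupy tokens from a future window and waits for it. Let's look at avgUsedTokens: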
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#avgUsedTokens
```java
private int avgUsedTokens(Node node) {
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
```
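Depending on the FlowRule's grade, the check reads either the node's current thread count or its pass QPS; the logic is straightforward. Note that the node passed to canPass is not necessarily the context's DefaultNode. It is whatever selectNodeByRequesterAndStrategy picked in passLocalCheck. For a rule with limitApp "default" and the direct strategy, that is the resource's ClusterNode, so the limit applies to the resource across all contexts. Rules that target a specific origin check the origin node, and the relate and chain strategies check a reference node. How the chain strategy relates to Sentinel's Context will be covered in the next post, on the Context component.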
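The last few steps fit together in a short sketch. The checker walks the rules and picks a usage figure according to each rule's grade. It rejects the request when current usage plus the requested count would exceed the threshold. The sketch assumes hypothetical SimpleRule and UsageSource types, uses IllegalStateException in place of FlowException, and leaves out the prioritized/occupy branch and the null-node default.

```java
import java.util.List;

public class ThresholdCheck {
    enum Grade { THREAD, QPS }

    // What a node exposes for this check (stands in for Node#curThreadNum / Node#passQps).
    interface UsageSource {
        int curThreadNum();
        double passQps();
    }

    record SimpleRule(Grade grade, double count) {}

    // Like DefaultController#avgUsedTokens: thread count or QPS, truncated to int.
    static int usedTokens(SimpleRule rule, UsageSource node) {
        return rule.grade() == Grade.THREAD ? node.curThreadNum() : (int) node.passQps();
    }

    // Like DefaultController#canPass without the prioritized branch.
    static boolean canPass(SimpleRule rule, UsageSource node, int acquireCount) {
        return usedTokens(rule, node) + acquireCount <= rule.count();
    }

    // Like FlowRuleChecker#checkFlow: the first failing rule rejects the request.
    static void checkFlow(List<SimpleRule> rules, UsageSource node, int acquireCount) {
        for (SimpleRule rule : rules) {
            if (!canPass(rule, node, acquireCount)) {
                throw new IllegalStateException("blocked by " + rule);
            }
        }
    }
}
```

Because of the (int) cast, a passQps of 9.6 counts as 9. With a QPS threshold of 10, one more request still passes (9 + 1 <= 10) while a request for two tokens is rejected.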