
  • *Preface*
  • *Official architecture diagram*
  • Sliding window model
  • *StatisticSlot*
  • *Node*
  • *NodeSelectorSlot*
  • *ClusterBuilderSlot*
  • *How FlowSlot uses the statistics*


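Sentinel can provide semaphore isolation through flow control in concurrent-thread-count mode. Combined with circuit breaking based on response time, it can automatically degrade an unstable resource when its average response time is high, so that slow calls do not use up the concurrency quota and drag down the whole system.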




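In Sentinel, every resource has a resource name (resourceName), and every call to a resource creates an Entry object. An Entry can be created automatically through the adapters for mainstream frameworks, or explicitly through annotations or the SphU API. When an Entry is created, a chain of functional slots (slot chain) is created as well. Each slot has its own responsibility, for example:
1. NodeSelectorSlot collects the call paths of resources and stores them as a tree, which is used for flow control and degradation by call path;
2. ClusterBuilderSlot stores the resource's statistics and caller information, such as RT, QPS and thread count, which serve as the basis for multi-dimensional flow control and degradation;
3. StatisticSlot records and aggregates runtime metrics across different dimensions;
4. FlowSlot performs flow control based on the preset flow rules and the state collected by the preceding slots;
5. AuthoritySlot applies blacklist/whitelist control based on the configured lists and the call origin;
6. DegradeSlot performs circuit breaking and degradation based on the statistics and the preset rules;
7. SystemSlot controls the total inbound traffic based on system state such as load1.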
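The slot chain described above is a chain of responsibility: each slot does its own work and then hands the call to the next slot. The following is a minimal sketch of that idea only. `SketchSlot`, `NamedSlot` and `SketchSlotChain` are hypothetical names invented for illustration and are not Sentinel classes; the real slots carry a context, a resource wrapper and a node rather than a plain string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a slot in the chain.
abstract class SketchSlot {
    private SketchSlot next;

    void setNext(SketchSlot next) {
        this.next = next;
    }

    // Each slot does its own work here and calls fireEntry to continue the chain.
    abstract void entry(String resource, List<String> trace);

    void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// A slot that only records its name, so the call order becomes visible.
class NamedSlot extends SketchSlot {
    private final String name;

    NamedSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

class SketchSlotChain {
    private SketchSlot first;
    private SketchSlot last;

    void addLast(SketchSlot slot) {
        if (first == null) {
            first = slot;
        } else {
            last.setNext(slot);
        }
        last = slot;
    }

    // Runs the chain for one resource call and returns the slots visited, in order.
    List<String> entry(String resource) {
        List<String> trace = new ArrayList<>();
        if (first != null) {
            first.entry(resource, trace);
        }
        return trace;
    }
}
```

Adding three `NamedSlot`s and calling `entry("demo")` returns their names in insertion order, which is the property the real chain relies on: statistics slots must sit in front of the slots that read those statistics.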



    public abstract class LeapArray<T> {

        // Length of a single window bucket, in milliseconds
        protected int windowLengthInMs;
        // Total number of buckets
        protected int sampleCount;
        // Total time span covered, in milliseconds
        protected int intervalInMs;
        // The recorded windows; the array length equals sampleCount
        protected final AtomicReferenceArray<WindowWrap<T>> array;

        /**
         * The conditional (predicate) update lock is used only when current bucket is deprecated.
         */
        private final ReentrantLock updateLock = new ReentrantLock();
        ...

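As shown, this abstract class holds these five fields. The flow-control statistics are wrapped in WindowWrap objects and kept in a thread-safe array (AtomicReferenceArray), so they can be read and updated at any time.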

    public class WindowWrap<T> {

        /**
         * Time length of a single window bucket in milliseconds.
         */
        private final long windowLengthInMs;

        /**
         * Start timestamp of the window in milliseconds.
         */
        private long windowStart;

        /**
         * Statistic data (the statistics actually stored in this bucket).
         */
        private T value;

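When the first request arrives, Sentinel creates a time span to hold runtime data such as response time (RT), QPS and blocked requests. This time span is called a window bucket here, and the number of buckets is defined by the sample count. Sentinel uses the data in the buckets that are still valid to decide whether the current request may pass: the buckets record the QPS, which is then compared with the threshold defined in the rule.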


    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }


    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
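To make the two helper methods above concrete, here is a small worked example of the same arithmetic. The class name `WindowMath` and the values (a 1000 ms interval split into 2 buckets of 500 ms) are assumptions chosen for illustration; the array length is passed in explicitly instead of being read from a field.

```java
class WindowMath {
    // Same arithmetic as calculateTimeIdx, with the array length passed in explicitly.
    static int timeIdx(long timeMillis, int windowLengthInMs, int arrayLength) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % arrayLength);
    }

    // Same arithmetic as calculateWindowStart: round down to a bucket boundary.
    static long windowStart(long timeMillis, int windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    public static void main(String[] args) {
        int windowLengthInMs = 500; // assumed: 1000 ms interval / 2 buckets
        int arrayLength = 2;
        for (long t : new long[] {0, 499, 500, 1200, 1700}) {
            System.out.println(t + " -> idx=" + timeIdx(t, windowLengthInMs, arrayLength)
                + ", start=" + windowStart(t, windowLengthInMs));
        }
    }
}
```

With these values, timestamp 1200 maps to index 0 with window start 1000, and timestamp 1700 maps to index 1 with window start 1500. Index 0 is therefore reused by the windows starting at 0, 1000, 2000 and so on, which is why the bucket's stored start time has to be compared with the computed one before the bucket is trusted.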


    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // Absent: create a bucket and install it with CAS
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                // Present and up to date: return it
                return old;
            } else if (windowStart > old.windowStart()) {
                // Deprecated: reset it; a lock is used here to prevent concurrent updates
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                // This is an abnormal case; in practice this branch is not reached.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
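The three cases in currentWindow (absent, up to date, deprecated) can be seen more easily in a stripped-down counter. The sketch below is a hypothetical simplification, not Sentinel code: `SimpleSlidingCounter` is an invented name, it uses `synchronized` instead of CAS plus `tryLock`, it stores a single count per bucket, and its validity rule (a bucket counts while it started less than one interval ago) is an assumption of the sketch.

```java
import java.util.Arrays;

// Hypothetical simplification of the bucket lifecycle handled by currentWindow.
class SimpleSlidingCounter {
    private final int windowLengthInMs;
    private final int intervalInMs;
    private final long[] starts; // start time of each bucket, -1 while absent
    private final long[] counts; // count recorded in each bucket

    SimpleSlidingCounter(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.starts = new long[sampleCount];
        this.counts = new long[sampleCount];
        Arrays.fill(starts, -1L);
    }

    synchronized void add(long timeMillis, long n) {
        if (timeMillis < 0) {
            return;
        }
        int idx = (int) ((timeMillis / windowLengthInMs) % starts.length);
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        if (starts[idx] > windowStart) {
            // The provided time is behind the bucket: ignore it in this sketch.
            return;
        }
        if (starts[idx] != windowStart) {
            // Bucket is absent or deprecated: (re)initialize it for the new window.
            starts[idx] = windowStart;
            counts[idx] = 0;
        }
        counts[idx] += n;
    }

    // Sum of all buckets that started less than one interval before timeMillis.
    synchronized long sum(long timeMillis) {
        long total = 0;
        for (int i = 0; i < starts.length; i++) {
            if (starts[i] >= 0 && timeMillis - starts[i] < intervalInMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

For example, with 2 buckets over 1000 ms, adding 3 at t=100 and 2 at t=600 gives a sum of 5 at t=700. Adding 1 at t=1100 lands on index 0 again, finds the old window start 0, resets the bucket, and the sum at t=1100 becomes 3.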



    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            ...


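  1. Once fireEntry returns without blocking, increase the thread count and the passed-request count of the node passed into the method
  2. Then take the origin node of the current entry held by the context and increase its thread count and passed-request count
  3. Check whether the entry type is IN; if so, increase the thread count and request count of the global ENTRY_NODE
  4. Framework extension point: users can register callbacks here to run their own logic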




    public interface Node extends OccupySupport, DebugSupport {

        /**
         * Get incoming request per minute ({@code pass + block}).
         *
         * @return total request count per minute
         */
        long totalRequest();

        /**
         * Get pass count per minute.
         *
         * @return total passed request count per minute
         * @since 1.5.0
         */
        long totalPass();

        /**
         * Get {@link Entry#exit()} count per minute.
         *
         * @return total completed request count per minute
         */
        long totalSuccess();

        /**
         * Get blocked request count per minute (totalBlockRequest).
         *
         * @return total blocked request count per minute
         */
        long blockRequest();

        /**
         * Get exception count per minute.
         *
         * @return total business exception count per minute
         */
        long totalException();

        /**
         * Get pass request per second.
         *
         * @return QPS of passed requests
         */
        double passQps();

        /**
         * Get block request per second.
         *
         * @return QPS of blocked requests
         */
        double blockQps();

        /**
         * Get {@link #passQps()} + {@link #blockQps()} request per second.
         *
         * @return QPS of passed and blocked requests
         */
        double totalQps();

        /**
         * Get {@link Entry#exit()} request per second.
         *
         * @return QPS of completed requests
         */
        double successQps();

        /**
         * Get estimated max success QPS till now.
         *
         * @return max completed QPS
         */
        double maxSuccessQps();

        /**
         * Get exception count per second.
         *
         * @return QPS of exception occurs
         */
        double exceptionQps();

        /**
         * Get average rt per second.
         *
         * @return average response time per second
         */
        double avgRt();

        /**
         * Get minimal response time.
         *
         * @return recorded minimal response time
         */
        double minRt();

        /**
         * Get current active thread count.
         *
         * @return current active thread count
         */
        int curThreadNum();

        /**
         * Get last second block QPS.
         */
        double previousBlockQps();

        /**
         * Last window QPS.
         */
        double previousPassQps();

        /**
         * Fetch all valid metric nodes of resources.
         *
         * @return valid metric nodes of resources
         */
        Map<Long, MetricNode> metrics();

        List<MetricNode> rawMetricsInMin(Predicate<Long> timePredicate);

        /**
         * Add pass count.
         *
         * @param count count to add pass
         */
        void addPassRequest(int count);

        /**
         * Add rt and success count.
         *
         * @param rt      response time
         * @param success success count to add
         */
        void addRtAndSuccess(long rt, int success);

        /**
         * Increase the block count.
         *
         * @param count count to add
         */
        void increaseBlockQps(int count);

        /**
         * Add the biz exception count.
         *
         * @param count count to add
         */
        void increaseExceptionQps(int count);

        /**
         * Increase current thread count.
         */
        void increaseThreadNum();

        /**
         * Decrease current thread count.
         */
        void decreaseThreadNum();

        /**
         * Reset the internal counter. Reset is needed when {@link IntervalProperty#INTERVAL} or
         * {@link SampleCountProperty#SAMPLE_COUNT} is changed.
         */
        void reset();


    public class DefaultNode extends StatisticNode {

        /**
         * The resource associated with the node.
         */
        private ResourceWrapper id;

        /**
         * The list of all child nodes.
         */
        private volatile Set<Node> childList = new HashSet<>();

        /**
         * Associated cluster node.
         */
        private ClusterNode clusterNode;


    @Override
    public void increaseExceptionQps(int count) {
        super.increaseExceptionQps(count);
        this.clusterNode.increaseExceptionQps(count);
    }


    public class StatisticNode implements Node {

        /**
         * Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
         * by given {@code sampleCount}.
         */
        private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);

        /**
         * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
         * meaning each bucket per second, in this way we can get accurate statistics of each second.
         */
        private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

        /**
         * The counter for thread count.
         */
        private AtomicInteger curThreadNum = new AtomicInteger(0);

        /**
         * The last timestamp when metrics were fetched.
         */
        private long lastFetchTime = -1;
        ...


    public interface Metric extends DebugSupport {

        /**
         * Get total success count.
         *
         * @return success count
         */
        long success();

        /**
         * Get max success count.
         *
         * @return max success count
         */
        long maxSuccess();

        /**
         * Get total exception count.
         *
         * @return exception count
         */
        long exception();

        /**
         * Get total block count.
         *
         * @return block count
         */
        long block();

        /**
         * Get total pass count. not include {@link #occupiedPass()}
         *
         * @return pass count
         */
        long pass();

        /**
         * Get total response time.
         *
         * @return total RT
         */
        long rt();

        /**
         * Get the minimal RT.
         *
         * @return minimal RT
         */
        long minRt();

        /**
         * Get aggregated metric nodes of all resources.
         *
         * @return metric node list of all resources
         */
        List<MetricNode> details();

        List<MetricNode> detailsOnCondition(Predicate<Long> timePredicate);

        /**
         * Get the raw window array.
         *
         * @return window metric array
         */
        MetricBucket[] windows();

        /**
         * Add current exception count.
         *
         * @param n count to add
         */
        void addException(int n);

        /**
         * Add current block count.
         *
         * @param n count to add
         */
        void addBlock(int n);

        /**
         * Add current completed count.
         *
         * @param n count to add
         */
        void addSuccess(int n);

        /**
         * Add current pass count.
         *
         * @param n count to add
         */
        void addPass(int n);

        /**
         * Add given RT to current total RT.
         *
         * @param rt RT
         */
        void addRT(long rt);

        /**
         * Get the sliding window length in seconds.
         *
         * @return the sliding window length
         */
        double getWindowIntervalInSec();

        /**
         * Get sample count of the sliding window.
         *
         * @return sample count of the sliding window.
         */
        int getSampleCount();

        /**
         * Note: this operation will not perform refreshing, so will not generate new buckets.
         *
         * @param timeMillis valid time in ms
         * @return pass count of the bucket exactly associated to provided timestamp, or 0 if the timestamp is invalid
         * @since 1.5.0
         */
        long getWindowPass(long timeMillis);

        // Occupy-based (@since 1.5.0)

        /**
         * Add occupied pass, which represents pass requests that borrow the latter windows' token.
         *
         * @param acquireCount tokens count.
         * @since 1.5.0
         */
        void addOccupiedPass(int acquireCount);

        /**
         * Add request that occupied.
         *
         * @param futureTime   future timestamp that the acquireCount should be added on.
         * @param acquireCount tokens count.
         * @since 1.5.0
         */
        void addWaiting(long futureTime, int acquireCount);

        /**
         * Get waiting pass account
         *
         * @return waiting pass count
         * @since 1.5.0
         */
        long waiting();

        /**
         * Get occupied pass count.
         *
         * @return occupied pass count
         * @since 1.5.0
         */
        long occupiedPass();

        // Tool methods.

        long previousWindowBlock();

        long previousWindowPass();


    public class ArrayMetric implements Metric {

        private final LeapArray<MetricBucket> data;

        public ArrayMetric(int sampleCount, int intervalInMs) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        }
        ...


    public class MetricBucket {

        private final LongAdder[] counters;

        private volatile long minRt;

        public MetricBucket() {
            MetricEvent[] events = MetricEvent.values();
            this.counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                counters[event.ordinal()] = new LongAdder();
            }
            initMinRt();
        }
        ...


    public enum MetricEvent {

        /**
         * Normal pass.
         */
        PASS,
        /**
         * Normal block.
         */
        BLOCK,
        EXCEPTION,
        SUCCESS,
        RT,

        /**
         * Passed in future quota (pre-occupied, since 1.5.0).
         */
        OCCUPIED_PASS


    public class LongAdder extends Striped64 implements Serializable {

        private static final long serialVersionUID = 7249069246863182397L;

        /**
         * Version of plus for use in retryUpdate
         */
        final long fn(long v, long x) { return v + x; }

        /**
         * Creates a new adder with initial sum of zero.
         */
        public LongAdder() {
        }

        public void add(long x) {
            Cell[] as;
            long b, v;
            HashCode hc;
            Cell a;
            int n;
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                boolean uncontended = true;
                int h = (hc = threadHashCode.get()).code;
                if (as == null || (n = as.length) < 1 ||
                    (a = as[(n - 1) & h]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x))) { retryUpdate(x, hc, uncontended); }
            }
        }

        /**
         * Equivalent to {@code add(1)}.
         */
        public void increment() {
            add(1L);
        }

        /**
         * Equivalent to {@code add(-1)}.
         */
        public void decrement() {
            add(-1L);
        }
        ...


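The idea behind Striped64 is to spread contention out when contention is heavy. Striped64 maintains a base count and an array of Cells. A counting thread first tries to update base; if that succeeds, it is done. Otherwise contention is assumed to be heavy and the count is spread across the Cell array: Striped64 computes a hash for each thread, maps different threads to different indexes of the Cell array, and each thread then accumulates its count in its own Cell. With this design, the final total has to combine base with the counts scattered across the Cell array.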
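The behaviour described above can be observed from the outside with a short experiment. Note the assumption: this sketch uses the JDK's own `java.util.concurrent.atomic.LongAdder` (Java 8 and later) rather than the copy bundled with Sentinel shown earlier; the two follow the same Striped64 design, but they are different classes. `LongAdderDemo` is a name made up for this example.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderDemo {
    // Starts `threads` workers that each call increment() `incrementsPerThread` times.
    static long countConcurrently(int threads, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    adder.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // sum() combines base with every cell; it is exact here because all writers have finished.
        return adder.sum();
    }
}
```

Calling `LongAdderDemo.countConcurrently(4, 10000)` returns 40000. While writers are still running, `sum()` is only a moving snapshot, which is acceptable for statistics such as QPS but is the trade-off to keep in mind.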





    public static Entry entry(String name, int resourceType, EntryType type) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, type, 1, OBJECTS0);
    }


String resourceName = DubboUtils.getResourceName(invoker, invocation);
String interfaceName = invoker.getInterface().getName();


    public static String getResourceName(Invoker<?> invoker, Invocation invocation) {
        StringBuilder buf = new StringBuilder(64);
        buf.append(invoker.getInterface().getName())
            .append(":")
            .append(invocation.getMethodName())
            .append("(");
        boolean isFirst = true;
        for (Class<?> clazz : invocation.getParameterTypes()) {
            if (!isFirst) {
                buf.append(",");
            }
            buf.append(clazz.getName());
            isFirst = false;
        }
        buf.append(")");
        return buf.toString();
    }

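Sentinel's Dubbo integration supports resource protection at two levels: Dubbo service and Dubbo service method. Resource names are built as shown in the source above, which is why the official documentation requires flow rules to be defined with exactly these names: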

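Service interface: resourceName is the fully qualified interface name, e.g. com.alibaba.csp.sentinel.demo.dubbo.FooService
Service method: resourceName is the fully qualified interface name:method signature, e.g. com.alibaba.csp.sentinel.demo.dubbo.FooService:sayHello(java.lang.String)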
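To see what a method-level resource name looks like without a Dubbo runtime, the sketch below reproduces the string layout of getResourceName using plain `Class` objects. `ResourceNames.methodResourceName` is a hypothetical helper written for this example; it takes the interface, method name and parameter types directly instead of an Invoker and an Invocation.

```java
class ResourceNames {
    // Builds "<interface name>:<method name>(<param type>,<param type>,...)".
    static String methodResourceName(Class<?> iface, String methodName, Class<?>... parameterTypes) {
        StringBuilder buf = new StringBuilder(64);
        buf.append(iface.getName()).append(":").append(methodName).append("(");
        boolean isFirst = true;
        for (Class<?> clazz : parameterTypes) {
            if (!isFirst) {
                buf.append(",");
            }
            buf.append(clazz.getName());
            isFirst = false;
        }
        return buf.append(")").toString();
    }
}
```

For instance, `methodResourceName(java.util.Comparator.class, "compare", Object.class, Object.class)` yields `java.util.Comparator:compare(java.lang.Object,java.lang.Object)`. There is no space after the comma and the parameter types are fully qualified, so a flow rule whose resource name differs in either detail will not match.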

com.alibaba.csp.sentinel.SphU#entry(java.lang.String, int, com.alibaba.csp.sentinel.EntryType)

    public static Entry entry(String name, int resourceType, EntryType type) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, type, 1, OBJECTS0);
    }

com.alibaba.csp.sentinel.CtSph#entryWithType(java.lang.String, int, com.alibaba.csp.sentinel.EntryType, int, boolean, java.lang.Object[])

    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                               Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return entryWithPriority(resource, count, prioritized, args);
    }

com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object...)

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }


    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }


    public static ProcessorSlotChain newSlotChain() {
        if (builder != null) {
            return builder.build();
        }

        resolveSlotChainBuilder();

        if (builder == null) {
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            builder = new DefaultSlotChainBuilder();
        }
        return builder.build();
    }


    public class DefaultSlotChainBuilder implements SlotChainBuilder {

        @Override
        public ProcessorSlotChain build() {
            ProcessorSlotChain chain = new DefaultProcessorSlotChain();
            chain.addLast(new NodeSelectorSlot());
            chain.addLast(new ClusterBuilderSlot());
            chain.addLast(new LogSlot());
            chain.addLast(new StatisticSlot());
            chain.addLast(new SystemSlot());
            chain.addLast(new AuthoritySlot());
            chain.addLast(new FlowSlot());
            chain.addLast(new DegradeSlot());

            return chain;
        }



    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };

    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }


    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }


    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
    }


    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }



    public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

        /**
         * {@link DefaultNode}s of the same resource in different context.
         */
        private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
            throws Throwable {
            /*
             * It's interesting that we use context name rather resource name as the map key.
             *
             * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
             * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
             * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
             * the resource name must be same but context name may not.
             *
             * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
             * enter same resource in different context, using context name as map key can
             * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
             * of the same resource name, for every distinct context (different context name) each.
             *
             * Consider another question. One resource may have multiple {@link DefaultNode},
             * so what is the fastest way to get total statistics of the same resource?
             * The answer is all {@link DefaultNode}s with same resource name share one
             * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
             */
            DefaultNode node = map.get(context.getName());
            if (node == null) {
                synchronized (this) {
                    node = map.get(context.getName());
                    if (node == null) {
                        node = new DefaultNode(resourceWrapper, null);
                        HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                        cacheMap.putAll(map);
                        cacheMap.put(context.getName(), node);
                        map = cacheMap;
                    }
                    // Build invocation tree
                    ((DefaultNode)context.getLastNode()).addChild(node);
                }
            }

            context.setCurNode(node);
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            fireExit(context, resourceWrapper, count, args);
        }


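  1. context.getName is used as the map key so that statistics for the same resource can be obtained separately for each context
  2. To quickly aggregate the totals for the same resource across different contexts, ClusterNode and ClusterBuilderSlot are provided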
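The first point above, one node per context name for the same resource, together with the copy-on-write map update seen in the slot's entry method, can be sketched in isolation. `ContextNodeSelector` and `SketchNode` are hypothetical names for this illustration; the real slot also links the node into the invocation tree, which is left out here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one selector per resource, one node per context name.
class ContextNodeSelector {
    static final class SketchNode {
        final String resource;
        final String contextName;

        SketchNode(String resource, String contextName) {
            this.resource = resource;
            this.contextName = contextName;
        }
    }

    private final String resource;
    // Readers never lock; writers replace the whole map with a new snapshot.
    private volatile Map<String, SketchNode> map = new HashMap<>();

    ContextNodeSelector(String resource) {
        this.resource = resource;
    }

    SketchNode select(String contextName) {
        SketchNode node = map.get(contextName);
        if (node == null) {
            synchronized (this) {
                // Double check: another thread may have created the node meanwhile.
                node = map.get(contextName);
                if (node == null) {
                    node = new SketchNode(resource, contextName);
                    Map<String, SketchNode> copy = new HashMap<>(map);
                    copy.put(contextName, node);
                    map = copy; // publish the new snapshot through the volatile field
                }
            }
        }
        return node;
    }

    int nodeCount() {
        return map.size();
    }
}
```

Selecting "ctxA" twice returns the same node, while "ctxB" gets its own, so the selector ends up with two nodes for one resource. The published map is never mutated after assignment, which is what makes the unlocked `map.get` on the fast path safe with a plain HashMap.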





    public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

        /**
         * <p>
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in witch context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, boolean, Object...)},
         * the resource name must be same but context name may not.
         * </p>
         * <p>
         * To get total statistics of the same resource in different context, same resource
         * shares the same {@link ClusterNode} globally. All {@link ClusterNode}s are cached
         * in this map.
         * </p>
         * <p>
         * The longer the application runs, the more stable this mapping will
         * become. so we don't concurrent map but a lock. as this lock only happens
         * at the very beginning while concurrent map will hold the lock all the time.
         * </p>
         */
        private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

        private static final Object lock = new Object();

        private volatile ClusterNode clusterNode = null;

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args)
            throws Throwable {
            if (clusterNode == null) {
                synchronized (lock) {
                    if (clusterNode == null) {
                        // Create the cluster node.
                        clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                        HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                        newMap.putAll(clusterNodeMap);
                        newMap.put(node.getId(), clusterNode);

                        clusterNodeMap = newMap;
                    }
                }
            }
            node.setClusterNode(clusterNode);

            /*
             * if context origin is set, we should get or create a new {@link Node} of
             * the specific origin.
             */
            if (!"".equals(context.getOrigin())) {
                Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
                context.getCurEntry().setOriginNode(originNode);
            }

            fireEntry(context, resourceWrapper, node, count, prioritized, args);
        }

same resource shares the same {@link ClusterNode} globally. All {@link ClusterNode}s are cached in this map.

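With the official comments this is not hard to follow. Because the same resource always shares one slot chain, the ClusterNode held by the ClusterBuilderSlot instance in each slot chain is the single ClusterNode for that resource. The class also maintains a global static variable, clusterNodeMap, keyed by each resource's ResourceWrapper, which holds the ClusterNode of every resource in the system. There is presumably some place in the framework that relies on this; I will share it once I find it. For now, a quick look at the logic in entry: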

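  1. Check whether the clusterNode for the current resource is null
  2. If it is null, create it under a lock, set the clusterNode field, and add it to clusterNodeMap
  3. Finally, an interesting detail: the clusterNode is set on the Node passed in for the current context (a DefaultNode here). This single line is how clusterNode gains the ability to aggregate the statistics of all nodes of the same resource. Recall the analysis of DefaultNode in the earlier sections: every statistics update on a DefaultNode applies not only to the node itself but also to the clusterNode it holds, and all DefaultNodes of the same resource hold the very same clusterNode, so the data is shared. Take the increaseExceptionQps method as an example: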
    @Override
    public void increaseExceptionQps(int count) {
        super.increaseExceptionQps(count);
        this.clusterNode.increaseExceptionQps(count);
    }
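The sharing described in step 3 can be reduced to a few lines. In this hypothetical sketch, `SketchDefaultNode` and `SketchClusterNode` stand in for DefaultNode and ClusterNode, and a single `AtomicLong` replaces the sliding-window metrics; only the write-through from the per-context node to the shared node is modelled.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the resource-wide node shared by all contexts.
class SketchClusterNode {
    final AtomicLong exceptions = new AtomicLong();
}

// Hypothetical stand-in for the per-context node of a resource.
class SketchDefaultNode {
    private final AtomicLong exceptions = new AtomicLong();
    private final SketchClusterNode clusterNode;

    SketchDefaultNode(SketchClusterNode clusterNode) {
        this.clusterNode = clusterNode;
    }

    void increaseException(int count) {
        exceptions.addAndGet(count);             // statistics of this context only
        clusterNode.exceptions.addAndGet(count); // statistics of the resource as a whole
    }

    long exceptionCount() {
        return exceptions.get();
    }
}
```

If two `SketchDefaultNode`s are created with the same `SketchClusterNode` and record 2 and 3 exceptions respectively, each node reports its own count while the shared node reports 5. No aggregation pass is needed at read time, because the total is maintained on every write.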




    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }


    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }


    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }


    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }

        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }

        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }


    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

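The check is delegated to the rater held by the rule itself. The rule creates the matching rater from the attributes declared when the rule was defined. This is not our focus either, so let's just pick one rater (com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController) and look at its canPass implementation: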
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }


    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
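For the thread-count grade, canPass and avgUsedTokens together amount to comparing the current thread count plus the requested amount against the configured limit. The sketch below shows that check combined with the increment and decrement of the counter. `ThreadCountLimiter` is a hypothetical class for illustration: in Sentinel the check lives in the flow controller and the counting in StatisticSlot, and the priority and occupy logic is omitted here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of limiting by concurrent thread count.
class ThreadCountLimiter {
    private final int maxThreads;
    private final AtomicInteger curThreadNum = new AtomicInteger();

    ThreadCountLimiter(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    // Check first, then count, mirroring "flow check passes, then increaseThreadNum".
    boolean tryEnter() {
        if (curThreadNum.get() + 1 > maxThreads) {
            return false; // would exceed the limit: block the request
        }
        curThreadNum.incrementAndGet();
        return true;
    }

    // Must be called when a request that entered has finished.
    void exit() {
        curThreadNum.decrementAndGet();
    }

    int current() {
        return curThreadNum.get();
    }
}
```

With a limit of 2, the first two `tryEnter()` calls succeed, the third is rejected, and after one `exit()` a new request may enter again. The check and the increment in this sketch are two separate steps, so under heavy contention a few extra requests can slip through; the code shown in this article likewise separates the check from the counting, which suggests the limit is best read as approximate.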



