//熔断器接口,用于实现Sentinel的熔断降级功能 public interface CircuitBreaker { //Get the associated circuit breaking rule. //获取当前熔断器对应的熔断降级规则 DegradeRule getRule(); //Acquires permission of an invocation only if it is available at the time of invoking. //尝试通过熔断器 //如果熔断器处于关闭状态(CLOSED),则允许请求通过; //如果处于打开状态(OPEN),则拒绝请求; //如果处于半开状态(HALF_OPEN),则根据规则允许部分请求通过; boolean tryPass(Context context); //Get current state of the circuit breaker. //获取当前熔断器的状态(OPEN, HALF_OPEN, CLOSED) State currentState(); //Record a completed request with the context and handle state transformation of the circuit breaker. //Called when a passed invocation finished. //在请求完成后调用此方法,用于更新熔断器的统计数据 void onRequestComplete(Context context); //Circuit breaker state. enum State { //In OPEN state, all requests will be rejected until the next recovery time point. //表示熔断器处于打开状态,此时会拒绝所有请求 OPEN, //In HALF_OPEN state, the circuit breaker will allow a "probe" invocation. //If the invocation is abnormal according to the strategy (e.g. it's slow), //the circuit breaker will re-transform to the OPEN state and wait for the next recovery time point; //otherwise the resource will be regarded as "recovered" and the circuit breaker will cease cutting off requests and transform to CLOSED state. //表示熔断器处于半开状态,此时允许部分请求通过,以检测系统是否已经恢复正常 HALF_OPEN, //In CLOSED state, all requests are permitted. //When current metric value exceeds the threshold, the circuit breaker will transform to OPEN state. //表示熔断器处于关闭状态,此时允许所有请求通过 CLOSED } } public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker { ... ... } public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { ... ... }
@Spi(order = Constants.ORDER_STATISTIC_SLOT) public classStatisticSlotextendsAbstractLinkedProcessorSlot<DefaultNode> { @Overridepublicvoidentry(Contextcontext, ResourceWrapperresourceWrapper, DefaultNodenode, intcount, booleanprioritized, Object... args) throwsThrowable { ... //执行下一个ProcessorSlot,先进行规则验证等 fireEntry(context, resourceWrapper, node, count, prioritized, args); //如果通过了后面ProcessorSlot的验证 //则将处理当前资源resourceWrapper的线程数+1 以及 将对当前资源resourceWrapper的成功请求数+1 node.increaseThreadNum(); node.addPassRequest(count); ... } } //A Node used to hold statistics for specific resource name in the specific context. //Each distinct resource in each distinct Context will corresponding to a DefaultNode. //This class may have a list of sub DefaultNodes. //Child nodes will be created when calling SphU.entry() or SphO.entry() multiple times in the same Context. public class DefaultNode extends StatisticNode { //Associated cluster node. private ClusterNode clusterNode; ... //DefaultNode会统计名字相同的Context下的某个资源的调用数据,按照单机里的资源维度进行调用数据统计 //EntranceNode会统计名字相同的Context下的全部资源的调用数据,按接口维度来统计调用数据,即统计接口下所有资源的调用情况 //ClusterNode会统计某个资源在全部Context下的调用数据,按照集群中的资源维度进行调用数据统计 @Override public void addPassRequest(int count) { //增加当前资源对应的DefaultNode中的数据 super.addPassRequest(count); //增加当前资源对应的ClusterNode中的全局统计数据 this.clusterNode.addPassRequest(count); } ... } //The statistic node keep three kinds of real-time statistics metrics: //1.metrics in second level rollingCounterInSecond //2.metrics in minute level rollingCounterInMinute //3.thread count //Sentinel use sliding window to record and count the resource statistics in real-time. //The sliding window infrastructure behind the ArrayMetric is LeapArray. //case 1: When the first request comes in, //Sentinel will create a new window bucket of a specified time-span to store running statics, //such as total response time(rt), incoming request(QPS), block request(bq), etc. //And the time-span is defined by sample count. // 0 100ms // +-------+--→ Sliding Windows // ^ // | // request //Sentinel use the statics of the valid buckets to decide whether this request can be passed. //For example, if a rule defines that only 100 requests can be passed, //it will sum all qps in valid buckets, and compare it to the threshold defined in rule. //case 2: continuous requests // 0 100ms 200ms 300ms // +-------+-------+-------+-----→ Sliding Windows // ^ // | // request //case 3: requests keeps coming, and previous buckets become invalid // 0 100ms 200ms 800ms 900ms 1000ms 1300ms // +-------+-------+ ...... +-------+-------+ ...... +-------+-----→ Sliding Windows // ^ // | // request //The sliding window should become: // 300ms 800ms 900ms 1000ms 1300ms // + ...... +-------+ ...... +-------+-----→ Sliding Windows // ^ // | // request public class StatisticNode implements Node { //Holds statistics of the recent INTERVAL milliseconds. //The INTERVAL is divided into time spans by given sampleCount. //定义一个保存数据的ArrayMetric,指定了样本窗口数量默认为2(SAMPLE_COUNT),指定了时间窗口长度默认为1000ms(INTERVAL) private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); //Holds statistics of the recent 60 seconds. //The windowLengthInMs is deliberately set to 1000 milliseconds, //meaning each bucket per second, in this way we can get accurate statistics of each second. private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); ... @Override public void addPassRequest(int count) { //调用ArrayMetric.addPass()方法,根据当前请求增加计数 rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); } ... } //The basic metric class in Sentinel using a BucketLeapArray internal. public class ArrayMetric implements Metric { //用于存储统计数据 private final LeapArray<MetricBucket> data; ... @Override public void addPass(int count) { //1.通过LeapArray.currentWindow()方法获取当前时间所在的样本窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); //2.调用MetricBucket.addPass()方法将当前请求的计数量添加到样本窗口的统计数据中 wrap.value().addPass(count); } ... } //Basic data structure for statistic metrics in Sentinel. //Leap array use sliding window algorithm to count data. //Each bucket cover windowLengthInMs time span, and the total time span is intervalInMs, //so the total bucket amount is: sampleCount = intervalInMs / windowLengthInMs. public abstract classLeapArray<T> { //样本窗口的长度 protectedintwindowLengthInMs; //一个滑动窗口包含的样本窗口数量,公式 intervalInMs / windowLengthInMs,也就是滑动窗口长度 / 样本窗口长度 protected int sampleCount; //滑动窗口长度 protected int intervalInMs; //也是滑动窗口长度,只是单位为s private double intervalInSecond; //WindowWrap是样本窗口类,它是一个数组,泛型T实际类型为MetricBucket //LeapArray类似于一个样本窗口管理类,而真正的样本窗口类是WindowWrap<T> protected final AtomicReferenceArray<WindowWrap<T>> array; //The total bucket count is: sampleCount = intervalInMs / windowLengthInMs. //@param sampleCount bucket count of the sliding window //@param intervalInMs the total time interval of this LeapArray in milliseconds public LeapArray(int sampleCount, int intervalInMs) { ... this.windowLengthInMs = intervalInMs / sampleCount;//默认为500ms this.intervalInMs = intervalInMs;//默认为1000ms this.intervalInSecond = intervalInMs / 1000.0;//默认为1 this.sampleCount = sampleCount;//默认为2 this.array = new AtomicReferenceArray<>(sampleCount); } //Get the bucket at current timestamp. //获取当前时间点所在的样本窗口 public WindowWrap<T> currentWindow() { return currentWindow(TimeUtil.currentTimeMillis()); } ... } //Wrapper entity class for a period of time window. //样本窗口类,泛型T比如是MetricBucket public class WindowWrap<T> { //Time length of a single window bucket in milliseconds. //单个样本窗口的长度 private final long windowLengthInMs; //Start timestamp of the window in milliseconds. //样本窗口的起始时间戳 private long windowStart; //Statistic data. //当前样本窗口的统计数据,类型为MetricBucket private T value; ... //返回比如MetricBucket对象 public T value() { return value; } } //Represents metrics data in a period of time span. //统计数据的封装类 public class MetricBucket { //统计的数据会存放在LongAdder数组里 //使用数组而不直接使用"LongAdder+1"是因为: //由于统计的数据是多维度的,并且MetricEvent枚举类定义了这些维度类型 //因此将MetricEvent维度类型枚举值对应的序号映射成数组索引,巧妙地将多维度的数据定义在LongAdder数组中 private final LongAdder[] counters; private volatile long minRt; public MetricBucket() { MetricEvent[] events = MetricEvent.values(); this.counters = new LongAdder[events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder(); } initMinRt(); } private void initMinRt() { this.minRt = SentinelConfig.statisticMaxRt(); } public void addPass(int n) { add(MetricEvent.PASS, n); } public MetricBucket add(MetricEvent event, long n) { //统计数据并存储到counters中 counters[event.ordinal()].add(n); return this; } ... } public enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }
public abstract classLeapArray<T> { //样本窗口的长度 protectedintwindowLengthInMs; //一个滑动窗口包含的样本窗口数量,公式 intervalInMs / windowLengthInMs,也就是滑动窗口长度 / 样本窗口长度 protected int sampleCount; //滑动窗口长度 protected int intervalInMs; //也是滑动窗口长度,只是单位为s private double intervalInSecond; //WindowWrap是样本窗口类,它是一个数组,泛型T实际类型为MetricBucket //LeapArray类似于一个样本窗口管理类,而真正的样本窗口类是WindowWrap<T> protected final AtomicReferenceArray<WindowWrap<T>> array; ... //假设timeMillis = 1600,windowLengthInMs = 500,array.length = 2,那么timeId = 3,返回1 private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; //Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); } //假设timeMillis = 1600,windowLengthInMs = 500,那么返回1500 protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; } //Get bucket item at provided timestamp. public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } //计算当前时间所在的样本窗口id,也就是样本窗口的下标,即计算在数组LeapArray中的下标 int idx = calculateTimeIdx(timeMillis); //Calculate current bucket start time. //计算当前样本窗口的开始时间点 long windowStart = calculateWindowStart(timeMillis); //Get bucket item at given time from the array. //(1) Bucket is absent, then just create a new bucket and CAS update to circular array. //(2) Bucket is up-to-date, then just return the bucket. //(3) Bucket is deprecated, then reset current bucket. while (true) { //获取当前时间所在的样本窗口 WindowWrap<T> old = array.get(idx); //如果当前时间所在的样本窗口为null,则需要创建 if (old == null) { //创建一个时间窗口 // B0 B1 B2 NULL B4 // ||_______|_______|_______|_______|_______||___ // 200 400 600 800 1000 1200 timestamp // ^ // time=888 // bucket is empty, so create new and update //If the old bucket is absent, then we create a new bucket at windowStart, //then try to update circular array via a CAS operation. //Only one thread can succeed to update, while other threads yield its time slice. WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); //通过CAS将新创建的窗口放入到LeapArray中 if (array.compareAndSet(idx, null, window)) { //Successfully updated, return the created bucket. return window; } else { //Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } //如果当前样本窗口的起始时间与计算出的样本窗口起始时间相同,则说明这两个是同一个样本窗口,直接获取就行 else if (windowStart == old.windowStart()) { // B0 B1 B2 B3 B4 // ||_______|_______|_______|_______|_______||___ // 200 400 600 800 1000 1200 timestamp // ^ // time=888 // startTime of Bucket 3: 800, so it's up-to-date //If current windowStart is equal to the start timestamp of old bucket, //that means the time is within the bucket, so directly return the bucket. return old; } //如果当前样本窗口的起始时间大于计算出的样本窗口起始时间,则说明计算出来的样本窗口已经过时了,需要将原来的样本窗口替换为新的样本窗口 //数组的环形数组,不是无限长的,比如存1s,1000个样本窗口,那么下1s的1000个时间窗口会覆盖上一秒的 else if (windowStart > old.windowStart()) { // (old) // B0 B1 B2 NULL B4 // |_______||_______|_______|_______|_______|_______||___ // ... 1200 1400 1600 1800 2000 2200 timestamp // ^ // time=1676 // startTime of Bucket 2: 400, deprecated, should be reset //If the start timestamp of old bucket is behind provided time, that means the bucket is deprecated. //We have to reset the bucket to current windowStart. //Note that the reset and clean-up operations are hard to be atomic, //so we need a update lock to guarantee the correctness of bucket update. //The update lock is conditional (tiny scope) and will take effect only when bucket is deprecated, //so in most cases it won't lead to performance loss. if (updateLock.tryLock()) { try { //Successfully get the update lock, now we reset the bucket. //替换老的样本窗口 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { //Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } //如果当前样本窗口的起始时间小于计算出的样本窗口起始时间 //这种情况一般不会出现,因为时间不会倒流,除非人为修改系统时间导致时钟回拨 else if (windowStart < old.windowStart()) { //Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } ... }