//资源类 public final class ResourceTypeConstants { //默认类型 public static final int COMMON = 0; //Web类型,也就是最常见的HTTP类型 public static final int COMMON_WEB = 1; //RPC类型,如Dubbo RPC,Grpc,Thrift等 public static final int COMMON_RPC = 2; //API网关 public static final int COMMON_API_GATEWAY = 3; //数据库SQL操作 public static final int COMMON_DB_SQL = 4; }
//资源类(资源包装类) public class ResourceWrapper { //资源名称 protected final String name; //资源类型:入口流量还是出口流量 protected final EntryType entryType; //请求类型:HTTP类型、RPC类型、API网关 protected final int resourceType; public ResourceWrapper(String name, EntryType entryType, int resourceType) { this.name = name; this.entryType = entryType; this.resourceType = resourceType; } }
接着,将这个ResourceWrapper资源类放入到Entry资源访问类中:
publicclassEntry { //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段 protected final ResourceWrapper resourceWrapper; public Entry(ResourceWrapper resourceWrapper) { this.resourceWrapper = resourceWrapper; } }
类图如下:
同时,还需记录资源访问的开始时间和完成时间:
publicclassEntry { //资源访问的开始时间 private final long createTimestamp; //资源访问的完成时间 private long completeTimestamp; //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段 protected final ResourceWrapper resourceWrapper; public Entry(ResourceWrapper resourceWrapper) { this.resourceWrapper = resourceWrapper; //给开始时间赋值为当前系统时间 this.createTimestamp = TimeUtil.currentTimeMillis(); } }
//用于统计资源的各项指标数据 public interface Node { //总的请求数 long totalRequest(); //请求成功数 long totalSuccess(); ... }
于是,Entry资源访问类变成如下:
public class Entry { //资源访问的开始时间 private final long createTimestamp; //资源访问的完成时间 private long completeTimestamp; //统计资源的各项数据指标 private Node curNode; //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段 protected final ResourceWrapper resourceWrapper; public Entry(ResourceWrapper resourceWrapper) { this.resourceWrapper = resourceWrapper; //给开始时间赋值为当前系统时间 this.createTimestamp = TimeUtil.currentTimeMillis(); } }
//Each SphU.entry() will return an Entry. //This class holds information of current invocation: //createTime, the create time of this entry, using for rt statistics. //current Node, that is statistics of the resource in current context. //origin Node, that is statistics for the specific origin. //Usually the origin could be the Service Consumer's app name, see ContextUtil.enter(String name, String origin). //ResourceWrapper, that is resource name. //A invocation tree will be created if we invoke SphU.entry() multi times in the same Context, //so parent or child entry may be held by this to form the tree. //Since Context always holds the current entry in the invocation tree, //every Entry.exit() call should modify Context.setCurEntry(Entry) as parent entry of this. public abstract class Entry { //资源访问的开始时间 private final long createTimestamp; //资源访问的完成时间 private long completeTimestamp; //统计资源的各项数据指标 private Node curNode; //异常 private BlockException blockError; //封装了:名称(name)、请求类型(entryType)以及资源类型(resourceType)三个字段 protected final ResourceWrapper resourceWrapper; public Entry(ResourceWrapper resourceWrapper) { this.resourceWrapper = resourceWrapper; //给开始时间赋值为当前系统时间 this.createTimestamp = TimeUtil.currentTimeMillis(); } }
//Holds real-time statistics for resources. public interface Node extends OccupySupport, DebugSupport { ... } public class StatisticNode implements Node { ... } //A Node used to hold statistics for specific resource name in the specific context. //Each distinct resource in each distinct Context will corresponding to a DefaultNode. //This class may have a list of sub DefaultNodes. //Child nodes will be created when calling SphU.entry() multiple times in the same Context. public class DefaultNode extends StatisticNode { ... } //A Node represents the entrance of the invocation tree. //One Context will related to a EntranceNode, which represents the entrance of the invocation tree. //New EntranceNode will be created if current context does't have one. //Note that same context name will share same EntranceNode globally. public class EntranceNode extends DefaultNode { ... } //This class stores summary runtime statistics of the resource, including rt, thread count, qps and so on. //Same resource shares the same ClusterNode globally, no matter in which Context. public class ClusterNode extends StatisticNode { ... } //The ClusterNode is uniquely identified by the ResourceId. //The DefaultNode is identified by both the resource id and Context. //In other words, one resource id will generate multiple DefaultNode for each distinct context, //but only one ClusterNode.
//The fundamental SentinelAPI for recording statistics and performing rule checking for resources. public classSphU { private static final Object[] OBJECTS0 = new Object[0]; ... //Record statistics and perform rule checking for the given resource. //@param name the unique name of the protected resource public static Entry entry(String name) throws BlockException { //调用CtSph.entry()方法创建一个Entry资源访问对象,默认的请求类型为OUT return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); } ... } //SentinelEnv. Thisclass will trigger all initialization for Sentinel. public classEnv { //创建一个CtSph对象 public static final Sph sph = new CtSph(); static { //If init fails, the process will exit. InitExecutor.doInit(); } } public classCtSph implements Sph { ... //Record statistics and perform rule checking for the given resource. //@param name the unique name for the protected resource //@param type the traffic type (inbound, outboundorinternal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule //@param count the amount of calls within the invocation (e.g. batchCount=2 meansrequestfor 2 tokens) //@param args args for parameter flow control or customized slots @Override public Entry entry(Stringname, EntryTypetype, intcount, Object... args) throws BlockException { //StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMONStringResourceWrapperresource = newStringResourceWrapper(name, type); returnentry(resource, count, args); } //Do all {@linkRule}s checking about the resource. public Entry entry(ResourceWrapperresourceWrapper, intcount, Object... args) throws BlockException { //调用CtSph.entryWithPriority()方法,执行如下处理: //初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中 returnentryWithPriority(resourceWrapper, count, false, args); } private Entry entryWithPriority(ResourceWrapperresourceWrapper, intcount, booleanprioritized, Object... args) throws BlockException { //从当前线程中获取ContextContextcontext = ContextUtil.getContext(); if (contextinstanceofNullContext) { returnnewCtEntry(resourceWrapper, null, context); } //如果没获取到Context if (context == null) { //Usingdefaultcontext. //创建一个名为sentinel_default_context的Context,并且与当前线程绑定 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } //Global switch is close, no rule checking will do. if (!Constants.ON) { returnnewCtEntry(resourceWrapper, null, context); } //调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条) ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); if (chain == null) { returnnewCtEntry(resourceWrapper, null, context); } //创建出一个Entry资源访问对象,将处理链(处理器插槽链条)、Context与Entry资源访问对象绑定 //其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法 Entry e = new CtEntry(resourceWrapper, chain, context); try { //处理链(处理器插槽链条)入口,负责采集数据,规则验证 //调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证) chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockExceptione1) { //规则验证失败,比如:被流控、被熔断降级、触发黑白名单等 e.exit(count, args); throwe1; } catch (Throwablee1) { RecordLog.info("Sentinelunexpectedexception", e1); } return e; } ... private final static class InternalContextUtil extends ContextUtil { staticContextinternalEnter(Stringname) { //调用ContextUtil.trueEnter()方法创建一个Context对象 returntrueEnter(name, ""); } static Context internalEnter(Stringname, Stringorigin) { returntrueEnter(name, origin); } } } public class StringResourceWrapper extends ResourceWrapper { publicStringResourceWrapper(Stringname, EntryTypee) { //调用父类构造方法,且默认资源类型为COMMONsuper(name, e, ResourceTypeConstants.COMMON); } ... }
publicclassCtSph implements Sph { ... //Record statistics and perform rule checking for the given resource. //@param name the unique name for the protected resource //@param type the traffic type (inbound, outboundorinternal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule //@param count the amount of calls within the invocation (e.g. batchCount=2 meansrequestfor 2 tokens) //@param args args for parameter flow control or customized slots @Override public Entry entry(Stringname, EntryTypetype, intcount, Object... args) throws BlockException { //StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMONStringResourceWrapperresource = newStringResourceWrapper(name, type); returnentry(resource, count, args); } //Do all {@linkRule}s checking about the resource. public Entry entry(ResourceWrapperresourceWrapper, intcount, Object... args) throws BlockException { //调用CtSph.entryWithPriority()方法,执行如下处理: //初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中 returnentryWithPriority(resourceWrapper, count, false, args); } private Entry entryWithPriority(ResourceWrapperresourceWrapper, intcount, booleanprioritized, Object... args) throws BlockException { //从当前线程中获取ContextContextcontext = ContextUtil.getContext(); if (contextinstanceofNullContext) { returnnewCtEntry(resourceWrapper, null, context); } //如果没获取到Context if (context == null) { //Usingdefaultcontext. //创建一个名为sentinel_default_context的Context,并且与当前线程绑定 context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } //Global switch is close, no rule checking will do. if (!Constants.ON) { returnnewCtEntry(resourceWrapper, null, context); } //调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条) ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); if (chain == null) { returnnewCtEntry(resourceWrapper, null, context); } //创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定 //其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法 Entry e = new CtEntry(resourceWrapper, chain, context); try { //处理链(处理器插槽链条)入口,负责采集数据,规则验证 //调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证) chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockExceptione1) { //规则验证失败,比如:被流控、被熔断降级、触发黑白名单等 e.exit(count, args); throwe1; } catch (Throwablee1) { RecordLog.info("Sentinelunexpectedexception", e1); } return e; } ... private final static class InternalContextUtil extends ContextUtil { staticContextinternalEnter(Stringname) { //调用ContextUtil.trueEnter()方法创建一个Context对象 returntrueEnter(name, ""); } static Context internalEnter(Stringname, Stringorigin) { returntrueEnter(name, origin); } } }
publicclass CtSph implements Sph { //Same resource will share the same ProcessorSlotChain}, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(); private static final Object LOCK = new Object(); ... //Get ProcessorSlotChain of the resource. //new ProcessorSlotChain will be created if the resource doesn't relate one. //Same resource will share the same ProcessorSlotChain globally, no matter in which Context. //Same resource is that ResourceWrapper#equals(Object). //Note that total ProcessorSlot count must not exceed Constants.MAX_SLOT_CHAIN_SIZE, otherwise null will return. ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { //操作chainMap时才加锁 synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) {//Double Check //Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } //初始化处理链(处理器插槽链条) chain = SlotChainProvider.newSlotChain(); //写时复制 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; } ... } //A provider for creating slot chains via resolved slot chain builder SPI. public final class SlotChainProvider { private static volatile SlotChainBuilder slotChainBuilder = null; //The load and pick process is not thread-safe, //but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock. public static ProcessorSlotChain newSlotChain() { //如果存在,则直接返回 if (slotChainBuilder != null) { return slotChainBuilder.build(); } //Resolve the slot chain builder SPI. //第一次使用SPI: 通过SPI机制初始化SlotChainBuilder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); if (slotChainBuilder == null) { //Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } return slotChainBuilder.build(); } private SlotChainProvider() {} } public final class SpiLoader<S> { //Cache the SpiLoader instances, key: classname of Service, value: SpiLoader instance private static final ConcurrentHashMap<String, SpiLoader> SPI_LOADER_MAP = new ConcurrentHashMap<>(); //Cache the classes of Provider private final List<Class<? extends S>> classList = Collections.synchronizedList(new ArrayList<Class<? extends S>>()); //Cache the sorted classes of Provider private final List<Class<? extends S>> sortedClassList = Collections.synchronizedList(new ArrayList<Class<? extends S>>()); ... //Create SpiLoader instance via Service class Cached by className, and load from cache first public static <T> SpiLoader<T> of(Class<T> service) { AssertUtil.notNull(service, "SPI class cannot be null"); AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class"); String className = service.getName(); SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className); if (spiLoader == null) { synchronized (SpiLoader.class) { spiLoader = SPI_LOADER_MAP.get(className); if (spiLoader == null) {//Double Check SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service)); spiLoader = SPI_LOADER_MAP.get(className); } } } return spiLoader; } //Load the first-found Provider instance,if not found, return default Provider instance public S loadFirstInstanceOrDefault() { //SPI机制加载Class,然后将加载的Class放到classList数组里 load(); //循环遍历,根据classList里的Class来初始化对应的实例 for (Class<? extends S> clazz : classList) { if (defaultClass == null || clazz != defaultClass) { return createInstance(clazz); } } //初始化默认的DefaultSlotChainBuilder return loadDefaultInstance(); } //Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation public List<S> loadInstanceListSorted() { //比如读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件里的Class名字 //然后根据这些Class名字加载Class //接着将Class放到sortedClassList集合中 load(); //实例化sortedClassList集合里的每个Class return createInstanceList(sortedClassList); } ... } //Builder for a default {@link ProcessorSlotChain}. @Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); //通过SPI机制加载责任链的节点ProcessorSlot实现类 //然后按照@Spi注解的order属性进行排序并进行实例化 //最后将ProcessorSlot实例放到sortedSlotList中 List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); //遍历已排好序的ProcessorSlot集合 for (ProcessorSlot slot : sortedSlotList) { //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //调用DefaultProcessorSlotChain.addLast()方法构建单向链表 //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中 chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } //返回单向链表 return chain; } }