public classRateLimiterControllerimplementsTrafficShapingController { //排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间 privatefinalintmaxQueueingTimeMs; //流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制 private final double count; //最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间 //注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //Pass when acquire count is less or equal than 0. //acquireCount代表每次从桶底流出多少个请求 //如果acquireCount小于等于0,则表示无需限流直接通过,不过acquireCount一般默认是1 if (acquireCount <= 0) { return true; } //Reject when count is less or equal than 0. //Otherwise, the costTime will be max of long and waitTime will overflow in some cases. //如果限流规则的count(即限制QPS的阈值)小于等于0,则直接拒绝,相当于一个请求也不能放行 if (count <= 0) { return false; } //1.首先获取系统的当前时间 long currentTime = TimeUtil.currentTimeMillis(); //Calculate the interval between every two requests. //2.然后计算,在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小间隔时间(假设请求是单线程处理的) long costTime = Math.round(1.0 * (acquireCount) / count * 1000); //Expected pass time of this request. //3.接着计算当前请求最早的预期通过时间 = 满足QPS阈值下的两个请求的最小时间间隔 + 上次请求的通过时间 long expectedTime = costTime + latestPassedTime.get(); //4.最后判断当前请求最早的预期通过时间是否比系统当前时间小 if (expectedTime <= currentTime) {//等价于没有超出QPS阈值 //当前请求最早的预期通过时间比系统当前时间小 //如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要晚 //也就是当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值,返回true允许通过 //Contention may exist here, but it's okay. //latestPassedTime并不会影响costTime,也就是说,多个线程可以并发执行到这里而不受阈值的影响 //这意味着,Sentinel流量控制的漏桶算法,只能限制在costTime时间内的流量激增,限制不了costTime时间外的流量激增 //比如系统启动完的那一瞬间就涌入超出QPS阈值的并发请求,此时的这种流量激增是限制不了的 //又比如系统正常运行时处理完了正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超出QPS阈值的并发请求,此时也限制不了 //只能限制住这样的一种情况:系统正常运行处理完正常流量的最后一个请求,隔了costTime-的时间,突然涌入超出QPS阈值的并发请求 latestPassedTime.set(currentTime); return true; } else {//等价于超出了QPS阈值 //当前请求最早的预期通过时间比系统当前时间大 //如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要早 //也就是当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小 //所以此时必然会超QPS阈值,因此返回进行等待或者返回false不允许通过 //而等待的最小时间,就是最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间 ... } } }
public class TokenBucket { //令牌桶的容量 private final int capacity; //令牌生成速度,也就是每秒产生多少个令牌 private final int tokensPerSecond; //当前桶内的令牌数量 private final AtomicInteger currentTokens; //最后一次令牌生成时间 private final AtomicLong lastRefillTime; //初始化 public TokenBucket(int capacity, int tokensPerSecond) { this.capacity = capacity; this.tokensPerSecond = tokensPerSecond; this.currentTokens = new AtomicInteger(capacity); this.lastRefillTime = new AtomicLong(System.currentTimeMillis()); } //true:放行;false:限流 public synchronized boolean tryAcquire(int tokens) { //填充令牌 refill(); //规则判断 return currentTokens.addAndGet(-tokens) >= 0; } //填充令牌 private void refill() { //获取当前系统时间 long currentTime = System.currentTimeMillis(); //用当前系统时间 - 上一次令牌生成时间 得出两次生成令牌需要间隔多久ms long timeSinceLastRefill = currentTime - lastRefillTime.get(); //得出上一次令牌生成时间到现在这段时间内,应该生成多少令牌 int tokensToAdd = (int) (timeSinceLastRefill * tokensPerSecond / 1000); if (tokensToAdd > 0) { //更新当前令牌数 int newTokenCount = Math.min(capacity, currentTokens.get() + tokensToAdd); currentTokens.set(newTokenCount); //更新上一次令牌生成时间为当前系统时间 lastRefillTime.set(currentTime); } } }
(2)WarmUpController使用的预热模型
Sentinel中的令牌桶算法,参考了Guava的RateLimiter。
//The principle idea comes from Guava. //However, the calculation of Guava is rate-based, which means that we need to translate rate to QPS. //这个原理来自于Guava; //然而,Guava的计算是基于速率的,这意味着我们需要将速率转换为QPS; //Requests arriving at the pulse may drag down long idle systems even though it has a much larger handling capability in stable period. //It usually happens in scenarios that require extra time for initialization, //e.g. DB establishes a connection, connects to a remote service, and so on. //That's why we need "warm up". //突发式的流量可能会拖累一个长期空闲的系统,即使这个系统在稳定阶段具有更大的流量处理能力; //这通常发生在需要额外时间进行初始化的场景中,比如DB建立连接、连接到远程服务等; //这就是为什么我们需要对系统进行"预热"; //Sentinel's "warm-up" implementation is based on the Guava's algorithm. //However, Guava’s implementation focuses on adjusting the request interval, which is similar to leaky bucket. //Sentinel pays more attention to controlling the count of incoming requests per second without calculating its interval, //which resembles token bucket algorithm. //Sentinel的"预热"实现是基于Guava的算法的; //然而,Guava的实现侧重于调整请求间隔,这类似于漏桶; //而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法; //The remaining tokens in the bucket is used to measure the system utility. //Suppose a system can handle b requests per second. //Every second b tokens will be added into the bucket until the bucket is full. //And when system processes a request, it takes a token from the bucket. //The more tokens left in the bucket, the lower the utilization of the system; //when the token in the token bucket is above a certain threshold, //we call it in a "saturation" state. //桶中存储的令牌是用来测量系统的实用程序的; //假设一个系统每秒可以处理b个请求; //那么每秒就有b个令牌被添加到桶中,直到桶满为止; //当系统处理一个请求时,就会从桶中获取一个令牌; //桶中存储的令牌剩余得越多,那么就说明系统的利用率就越低; //当令牌桶中的令牌数高于某个阈值时,我们称之为"饱和"状态; //Base on Guava’s theory, there is a linear equation we can write this in the form //y = m * x + b where y (a.k.a y(x)), or qps(q)), //is our expected QPS given a saturated period (e.g. 3 minutes in), //m is the rate of change from our cold (minimum) rate to our stable (maximum) rate, //x (or q) is the occupied token. //根据Guava的理论,有一个线性方程,我们可以把它写成y = m * x + b; //这是在给定饱和周期(例如3分钟)的情况下预期的QPS; //m是从我们的冷(最小)速率到我们的稳定(最大)速率的变化率; //x(或q)就是需要被占用的令牌数; public class WarmUpController implements TrafficShapingController { ... ... }
//The principle idea comes from Guava. //However, the calculation of Guava is rate-based, which means that we need to translate rate to QPS. //这个原理来自于Guava; //然而,Guava的计算是基于速率的,这意味着我们需要将速率转换为QPS; //Requests arriving at the pulse may drag down long idle systems even though it has a much larger handling capability in stable period. //It usually happens in scenarios that require extra time for initialization, //e.g. DB establishes a connection, connects to a remote service, and so on. //That's why we need "warm up". //突发式的流量可能会拖累一个长期空闲的系统,即使这个系统在稳定阶段具有更大的流量处理能力; //这通常发生在需要额外时间进行初始化的场景中,比如DB建立连接、连接到远程服务等; //这就是为什么我们需要对系统进行"预热"; //Sentinel's "warm-up" implementation is based on the Guava's algorithm. //However, Guava’s implementation focuses on adjusting the request interval, which is similar to leaky bucket. //Sentinel pays more attention to controlling the count of incoming requests per second without calculating its interval, //which resembles token bucket algorithm. //Sentinel的"预热"实现是基于Guava的算法的; //然而,Guava的实现侧重于调整请求间隔,这类似于漏桶; //而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法; //The remaining tokens in the bucket is used to measure the system utility. //Suppose a system can handle b requests per second. //Every second b tokens will be added into the bucket until the bucket is full. //And when system processes a request, it takes a token from the bucket. //The more tokens left in the bucket, the lower the utilization of the system; //when the token in the token bucket is above a certain threshold, //we call it in a "saturation" state. //桶中存储的令牌是用来测量系统的实用程序的; //假设一个系统每秒可以处理b个请求; //那么每秒就有b个令牌被添加到桶中,直到桶满为止; //当系统处理一个请求时,就会从桶中获取一个令牌; //桶中存储的令牌剩余得越多,那么就说明系统的利用率就越低; //当令牌桶中的令牌数高于某个阈值时,我们称之为"饱和"状态; //Base on Guava’s theory, there is a linear equation we can write this in the form //y = m * x + b where y (a.k.a y(x)), or qps(q)), //is our expected QPS given a saturated period (e.g. 3 minutes in), //m is the rate of change from our cold (minimum) rate to our stable (maximum) rate, //x (or q) is the occupied token. //根据Guava的理论,有一个线性方程,我们可以把它写成y = m * x + b; //这是在给定饱和周期(例如3分钟)的情况下预期的QPS; //m是从我们的冷(最小)速率到我们的稳定(最大)速率的变化率; //x(或q)就是需要被占用的令牌数; public class WarmUpController implements TrafficShapingController { ... @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { //获取当前1s的QPS long passQps = (long) node.passQps(); //获取上一窗口通过的QPS long previousQps = (long) node.previousPassQps(); //1.生成令牌并同步到令牌桶内 syncToken(previousQps); //获取令牌桶内剩余的令牌数 long restToken = storedTokens.get(); //2.如果令牌桶中的令牌数量大于告警值,说明还处于预热阶段,此时需要判断令牌的生成速度和消费速度 if (restToken >= warningToken) { //获取桶内剩余令牌数超过告警值的令牌个数 long aboveToken = restToken - warningToken; //当前令牌的生成间隔 = 稳定阶段的生成间隔 + 桶内超出告警值部分的已存储令牌数 * slope //其中,稳定阶段的生成间隔是1/count,桶内超出告警值部分的已存储令牌数是aboveToken //注意:预热阶段生成令牌的速率会越来越慢,也就是生成令牌的间隔越来越大; //当桶内已存储的令牌超过告警值后,令牌越多,那1秒可允许的QPS越小; //下面代码计算的是: //当前1s内的时间窗口能够生成的令牌数量,即当前时间窗口生成的令牌可满足的QPS = 1 / 当前令牌的生成间隔 double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); //如果当前消费令牌的速度(passQps + acquireCount) <= 当前生成令牌的速度(warningQps),则允许通过 //如果当前时间窗口通过的QPS + 客户端申请的令牌数 小于等于 当前预热阶段的告警QPS,则代表允许通过 if (passQps + acquireCount <= warningQps) { return true; } } //3.如果令牌桶中的令牌数量小于告警值,说明预热结束,进入稳定阶段 else { //如果当前消费令牌的速度(passQps + acquireCount) <= 当前生成令牌的速度(count),则允许通过 if (passQps + acquireCount <= count) { return true; } } return false; } //生成令牌并同步到令牌桶内 //入参passQps是前一个时间窗口的QPS,也就是上一秒通过的QPS数 //syncToken()方法的逻辑是: //1.首先验证当前时间与最后更新令牌桶的时间,避免在同一个时间窗口重复添加令牌; //2.其次通过WarmUpController.coolDownTokens()方法获取最新的令牌数; //3.接着利用CAS来保证更新令牌桶的线程安全性; //4.最后将桶内已存储的令牌数,减去上一秒通过的QPS数,得到目前令牌桶剩余的令牌数; protected void syncToken(long passQps) { //获取当前时间ms long currentTime = TimeUtil.currentTimeMillis(); //将当前时间ms转换为s currentTime = currentTime - currentTime % 1000; //获取上一次更新令牌桶已存储的令牌数量的时间 long oldLastFillTime = lastFilledTime.get(); //如果上一次更新令牌桶已存储的令牌数量的时间和当前时间一样,或发生了时钟回拨等情况导致比当前时间还小 //那么就无需更新,直接return即可 if (currentTime <= oldLastFillTime) { return; } //先获取目前令牌桶已存储的令牌数 long oldValue = storedTokens.get(); //调用WarmUpController.coolDownTokens()方法得到最新的令牌数 long newValue = coolDownTokens(currentTime, passQps); //通过CAS更新令牌桶已存储的令牌数 //注意:系统初始化完毕,第一个请求进来调用WarmUpController.canPass()方法时,storedTokens = maxToken if (storedTokens.compareAndSet(oldValue, newValue)) { //设置令牌桶内已存储的最新令牌数 = 当前令牌数 - 上一个时间窗口通过的请求数 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } //更新最后一次添加令牌的时间戳 lastFilledTime.set(currentTime); } } //根据当前时间和上一个时间窗口通过的QPS计算更新后的令牌数 private long coolDownTokens(long currentTime, long passQps) { //获取当前令牌桶已存储的令牌数 long oldValue = storedTokens.get(); long newValue = oldValue; //如果令牌桶中已存储的令牌数小于告警值,说明系统已结束冷启动,即退出预热阶段进入稳定阶段 //也就是桶内已存储的令牌数没有达到进入预热阶段的阈值,此时需要较快地向令牌桶中添加令牌 if (oldValue < warningToken) { //在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间(以秒为单位)乘以令牌生成速率(QPS阈值count) newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count /1000); } //如果令牌桶中已存储的令牌数大于告警值,说明系统处于预热阶段,还在进行冷启动 else if (oldValue > warningToken) { //如果上一个时间窗口通过的QPS,小于系统最冷时允许通过的QPS(1 / (1 / count * coldFactor)) //那么就说明当前系统的负载比较低,可以向令牌桶中添加令牌 if (passQps < (int)count / coldFactor) { //在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间(以秒为单位)乘以令牌生成速率(QPS阈值count) newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } //确保令牌桶更新后的令牌数不超过最大令牌数(maxToken) //系统初始化完毕,第一个请求进来调用WarmUpController.canPass()方法时, //oldValue = 0,lastFilledTime = 0,此时返回maxToken return Math.min(newValue, maxToken); } }