《优化接口设计的思路》系列:第七篇—接口限流策略
一、前言
大家好!我是sum墨,一個一線的底層碼農(nóng),平時喜歡研究和思考一些技術(shù)相關(guān)的問題并整理成文,限于本人水平,如果文章和代碼有表述不當之處,還請不吝賜教。
作為一名從業(yè)已達六年的老碼農(nóng),我的工作主要是開發(fā)后端Java業(yè)務(wù)系統(tǒng),包括各種管理后臺和小程序等。在這些項目中,我設(shè)計過單/多租戶體系系統(tǒng),對接過許多開放平臺,也搞過消息中心這類較為復(fù)雜的應(yīng)用,但幸運的是,我至今還沒有遇到過線上系統(tǒng)由于代碼崩潰導(dǎo)致資損的情況。這其中的原因有三點:一是業(yè)務(wù)系統(tǒng)本身并不復(fù)雜;二是我一直遵循某大廠代碼規(guī)約,在開發(fā)過程中盡可能按規(guī)約編寫代碼;三是經(jīng)過多年的開發(fā)經(jīng)驗積累,我成為了一名熟練工,掌握了一些實用的技巧。
好像一提到防抖,接下來就會提到限流,我在第六篇文章寫了一些接口防抖的策略,那么這篇正好講講接口如何限流。不知道從哪里看到的,“防抖是回城,限流是攻擊”,感覺真的很形象,我來簡要描述一下:
王者榮耀大家都玩過吧,里面的英雄都有一個攻擊間隔,當我們連續(xù)的點擊普通攻擊的時候,英雄的攻速并不會隨著我們點擊的越快而更快的攻擊。這個就是限流,英雄會按照自身攻速的系數(shù)執(zhí)行攻擊,我們點的再快也沒用。
而防抖在王者榮耀中就是回城,在游戲中經(jīng)常會遇到連續(xù)回城嘲諷對手的玩家,它們每點擊一次回城,后一次的回城都會打斷前一次的回城,只有最后一次點擊的回城會被觸發(fā),從而保證回城只執(zhí)行一次,這就是防抖的概念。
本文參考項目源碼地址:summo-springboot-interface-demo
二、業(yè)務(wù)場景
1. API速率限制
對外提供的API接口可能需要限制每個用戶或每個IP地址在單位時間內(nèi)的訪問次數(shù),以防止濫用或過載。
2. 網(wǎng)站流量控制
對于高流量的網(wǎng)站,為了防止瞬時訪問量過大導(dǎo)致服務(wù)器壓力過重,可以通過限流保護系統(tǒng)穩(wěn)定運行。
3. 秒殺活動
電商平臺在進行秒殺活動時,可能會遭遇大量用戶同時搶購,通過計數(shù)器限流算法可以有效地控制訪問量,避免系統(tǒng)崩潰。
4. 微服務(wù)架構(gòu)
在微服務(wù)架構(gòu)中,限流可以防止某個服務(wù)因為突發(fā)的高流量而成為瓶頸,進而影響到整個系統(tǒng)的穩(wěn)定性。
5. 分布式系統(tǒng)的互斥操作
在進行諸如分布式鎖的操作時,限流算法可以避免過多的請求同時競爭資源,保證系統(tǒng)的公平性和效率。
6. 基礎(chǔ)設(shè)施保護
對于數(shù)據(jù)庫、緩存等基礎(chǔ)設(shè)施服務(wù),通過計數(shù)器限流可以避免過多的并發(fā)請求導(dǎo)致服務(wù)不可用。
7. 網(wǎng)絡(luò)帶寬控制
對于帶寬有限的網(wǎng)絡(luò)服務(wù),限流算法可以用來確保帶寬的合理分配,防止網(wǎng)絡(luò)擁堵。
三、限流策略
1. 計數(shù)器
(1)代碼
CounterRateLimit.java
package com.summo.demo.config.limitstrategy.counter;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CounterRateLimit {
/**
* 請求的數(shù)量
*
* @return
*/
int requests();
/**
* 時間窗口,單位為秒
*
* @return
*/
int timeWindow();
}
CounterRateLimitAspect.java
package com.summo.demo.config.limitstrategy.counter;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
@Order(5)
public class CounterRateLimitAspect {
/**
* 用來存儲每個方法請求計數(shù)的映射
*/
private final ConcurrentHashMap<String, AtomicInteger> REQUEST_COUNT = new ConcurrentHashMap<>();
/**
* 來存儲每個方法的時間戳的映射
*/
private final ConcurrentHashMap<String, Long> TIMESTAMP = new ConcurrentHashMap<>();
@Around("@annotation(com.summo.demo.config.limitstrategy.counter.CounterRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
//獲取注解信息
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
CounterRateLimit counterRateLimit = method.getAnnotation(CounterRateLimit.class);
//獲取注解上配置的參數(shù)
int maxRequests = counterRateLimit.requests();
long windowSizeInMillis = TimeUnit.SECONDS.toMillis(counterRateLimit.timeWindow());
// 獲取方法的字符串表示,用作鍵值
String methodName = method.toString();
// 初始化計數(shù)器和時間戳
AtomicInteger count = REQUEST_COUNT.computeIfAbsent(methodName, k -> new AtomicInteger(0));
long startTime = TIMESTAMP.computeIfAbsent(methodName, k -> System.currentTimeMillis());
// 獲取當前時間
long currentTimeMillis = System.currentTimeMillis();
// 如果當前時間超出時間窗口,則重置計數(shù)器和時間戳
if (currentTimeMillis - startTime > windowSizeInMillis) {
// 原子地重置時間戳和計數(shù)器
TIMESTAMP.put(methodName, currentTimeMillis);
count.set(0);
}
// 原子地增加計數(shù)器并檢查其值
if (count.incrementAndGet() > maxRequests) {
// 如果超出最大請求次數(shù),遞減計數(shù)器,并報錯
count.decrementAndGet();
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
// 執(zhí)行原方法
return joinPoint.proceed();
}
}
使用方式
/**
* 計數(shù)器算法限流
*
* @return
*/
@GetMapping("/counter")
@CounterRateLimit(requests = 2, timeWindow = 2)
public ResponseEntity counter() {
return ResponseEntity.ok("counter test ok!");
}
(3)原理說明
在示例中,有一個使用了 @CounterRateLimit 注解的 counter 方法。根據(jù)注解的參數(shù),這個方法在2秒鐘的時間窗口內(nèi)只能被調(diào)用2次。 如果在 2 秒內(nèi)有更多的調(diào)用嘗試,那么這些額外的調(diào)用將被限流,并返回錯誤信息。
(4)流程圖
(5)缺點
無法處理“臨界問題”。
(6)臨界問題
假設(shè)1min一個時間段,每個時間段內(nèi)最多100個請求。有一種極端情況,當10:00:58這個時刻100個請求一起過來,到達閾值;當10:01:02這個時刻100個請求又一起過來,到達閾值。這種情況就會導(dǎo)致在短短的4s內(nèi)已經(jīng)處理完了200個請求,而其他所有的時間都在限流中。
2. 滑動窗口
(1)代碼
SlidingWindowRateLimit.java
package com.summo.demo.config.limitstrategy.slidingwindow;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SlidingWindowRateLimit {
/**
* 請求的數(shù)量
*
* @return
*/
int requests();
/**
* 時間窗口,單位為秒
*
* @return
*/
int timeWindow();
}
SlidingWindowRateLimitAspect.java
package com.summo.demo.config.limitstrategy.slidingwindow;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@Slf4j
@Aspect
@Component
@Order(5)
public class SlidingWindowRateLimitAspect {
/**
* 使用 ConcurrentHashMap 保存每個方法的請求時間戳隊列
*/
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Long>> REQUEST_TIMES_MAP = new ConcurrentHashMap<>();
@Around("@annotation(com.summo.demo.config.limitstrategy.slidingwindow.SlidingWindowRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
SlidingWindowRateLimit rateLimit = method.getAnnotation(SlidingWindowRateLimit.class);
// 允許的最大請求數(shù)
int requests = rateLimit.requests();
// 滑動窗口的大小(秒)
int timeWindow = rateLimit.timeWindow();
// 獲取方法名稱字符串
String methodName = method.toString();
// 如果不存在當前方法的請求時間戳隊列,則初始化一個新的隊列
ConcurrentLinkedQueue<Long> requestTimes = REQUEST_TIMES_MAP.computeIfAbsent(methodName,
k -> new ConcurrentLinkedQueue<>());
// 當前時間
long currentTime = System.currentTimeMillis();
// 計算時間窗口的開始時間戳
long thresholdTime = currentTime - TimeUnit.SECONDS.toMillis(timeWindow);
// 這一段代碼是滑動窗口限流算法中的關(guān)鍵部分,其功能是移除當前滑動窗口之前的請求時間戳。這樣做是為了確保窗口內(nèi)只保留最近時間段內(nèi)的請求記錄。
// requestTimes.isEmpty() 是檢查隊列是否為空的條件。如果隊列為空,則意味著沒有任何請求記錄,不需要進行移除操作。
// requestTimes.peek() < thresholdTime 是檢查隊列頭部的時間戳是否早于滑動窗口的開始時間。如果是,說明這個時間戳已經(jīng)不在當前的時間窗口內(nèi),應(yīng)當被移除。
while (!requestTimes.isEmpty() && requestTimes.peek() < thresholdTime) {
// 移除隊列頭部的過期時間戳
requestTimes.poll();
}
// 檢查當前時間窗口內(nèi)的請求次數(shù)是否超過限制
if (requestTimes.size() < requests) {
// 未超過限制,記錄當前請求時間
requestTimes.add(currentTime);
return joinPoint.proceed();
} else {
// 超過限制,拋出限流異常
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
}
}
使用方式
/**
* 滑動窗口算法限流
*
* @return
*/
@GetMapping("/slidingWindow")
@SlidingWindowRateLimit(requests = 2, timeWindow = 2)
public ResponseEntity slidingWindow() {
return ResponseEntity.ok("slidingWindow test ok!");
}
(3)原理說明
從圖上可以看到時間創(chuàng)建是一種滑動的方式前進, 滑動窗口限流策略能夠顯著減少臨界問題的影響,但并不能完全消除它。滑動窗口通過跟蹤和限制在一個連續(xù)的時間窗口內(nèi)的請求來工作。與簡單的計數(shù)器方法不同,它不是在窗口結(jié)束時突然重置計數(shù)器,而是根據(jù)時間的推移逐漸地移除窗口中的舊請求,添加新的請求。
舉個例子:假設(shè)時間窗口為10s,請求限制為3,第一次請求在10:00:00發(fā)起,第二次在10:00:05發(fā)起,第三次10:00:11發(fā)起,那么計數(shù)器策略的下一個窗口開始時間是10:00:11,而滑動窗口是10:00:05。所以這也是滑動窗口為什么可以減少臨界問題的影響,但并不能完全消除它的原因。
(4)流程圖
3. 令牌桶
(1)代碼
該算法我就不造*了,直接使用Guava自帶的RateLimiter實現(xiàn)。
pom.xml
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.1-jre</version>
</dependency>
TokenBucketRateLimit.java
package com.summo.demo.config.limitstrategy.tokenbucket;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TokenBucketRateLimit {
/**
* 產(chǎn)生令牌的速率(xx 個/秒)
*
* @return
*/
double permitsPerSecond();
}
TokenBucketRateLimitAspect.java
package com.summo.demo.config.limitstrategy.tokenbucket;
import com.google.common.util.concurrent.RateLimiter;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Aspect
@Component
@Order(5)
public class TokenBucketRateLimitAspect {
private final ConcurrentHashMap<String, RateLimiter> limiters = new ConcurrentHashMap<>();
@Around("@annotation(com.summo.demo.config.limitstrategy.tokenbucket.TokenBucketRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
TokenBucketRateLimit rateLimit = method.getAnnotation(TokenBucketRateLimit.class);
double permitsPerSecond = rateLimit.permitsPerSecond();
String methodName = method.toString();
RateLimiter rateLimiter = limiters.computeIfAbsent(methodName, k -> RateLimiter.create(permitsPerSecond));
if (rateLimiter.tryAcquire()) {
return joinPoint.proceed();
} else {
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
}
}
使用方式
/**
* 令牌桶算法限流
*
* @return
*/
@GetMapping("/tokenBucket")
@TokenBucketRateLimit(permitsPerSecond = 0.5)
public ResponseEntity tokenBucket() {
return ResponseEntity.ok("tokenBucket test ok!");
}
(3)原理說明
令牌桶算法是一種流量控制機制,非常適合于處理突發(fā)流量,同時保證一定程度的平滑流動。它的工作原理類似于一個實際的水桶,其中水桶代表令牌桶,水流代表令牌。令牌以恒定的速率填充到桶中,直到達到桶的容量上限,多余的> 令牌會被丟棄。當請求(比如網(wǎng)絡(luò)數(shù)據(jù)包或者游戲中的攻擊動作)到達時,它需要消耗一個令牌才能被處理。如果桶中沒有令牌,請求就會被延遲或丟棄,直到桶中再次有令牌為止。
以王者榮耀和LOL中的英雄攻速為例,英雄的攻擊動作可以類比為請求,而英雄的攻速屬性則確定了令牌生成的速度,即攻擊的最大頻率。如果英雄的攻擊動作必須消耗一個令牌才能執(zhí)行,那么即使玩家手速再快,也不能超過攻速設(shè)定> 的最大限制。這樣,英雄的攻擊將會保持一個恒定和平滑的節(jié)奏,而不會出現(xiàn)一會兒快速連續(xù)攻擊,一會兒又突然停止的現(xiàn)象。
這種算法的優(yōu)勢在于其能夠限制請求的峰值速率,同時允許一定程度的突發(fā)請求。在實際應(yīng)用中,令牌桶算法可以平滑流量,減少擁塞,確保系統(tǒng)的穩(wěn)定性。相較于僅僅依靠計數(shù)器或者滑動窗口算法,令牌桶算法提供了更加靈活和平滑> 的流量控制方式,非常適合需要處理突發(fā)性高流量和保持服務(wù)質(zhì)量的場景。
(4)流程圖
4. 漏桶
(1)代碼
LeakyBucketRateLimit.java
package com.summo.demo.config.limitstrategy.leakybucket;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LeakyBucketRateLimit {
/**
* 桶的容量
*
* @return
*/
int capacity();
/**
* 漏斗的速率,單位通常是秒
*
* @return
*/
int leakRate();
}
LeakyBucketLimiter.java
package com.summo.demo.config.limitstrategy.leakybucket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class LeakyBucketLimiter {
/**
* 桶的容量
*/
private final int capacity;
/**
* 漏桶的漏出速率,單位時間內(nèi)漏出水的數(shù)量
*/
private final int leakRate;
/**
* 當前桶中的水量
*/
private volatile int water = 0;
/**
* 上次漏水的時間
*/
private volatile long lastLeakTime = System.currentTimeMillis();
/**
* 漏桶容器
*/
private static final ConcurrentHashMap<String, LeakyBucketLimiter> LIMITER_MAP = new ConcurrentHashMap<>();
/**
* 靜態(tài)工廠方法,確保相同的方法使用相同的漏桶實例
*
* @param methodKey 方法名
* @param capacity
* @param leakRate
* @return
*/
public static LeakyBucketLimiter createLimiter(String methodKey, int capacity, int leakRate) {
return LIMITER_MAP.computeIfAbsent(methodKey, k -> new LeakyBucketLimiter(capacity, leakRate));
}
private LeakyBucketLimiter(int capacity, int leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
}
/**
* 嘗試獲取許可(try to acquire a permit),如果獲取成功返回true,否則返回false
*
* @return
*/
public boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
synchronized (this) {
// 計算上次漏水到當前時間的時間間隔
long leakDuration = currentTime - lastLeakTime;
// 如果時間間隔大于等于1秒,表示漏桶已經(jīng)漏出一定數(shù)量的水
if (leakDuration >= TimeUnit.SECONDS.toMillis(1)) {
// 計算漏出的水量
long leakQuantity = leakDuration / TimeUnit.SECONDS.toMillis(1) * leakRate;
// 漏桶漏出水后,更新桶中的水量,但不能低于0
water = (int)Math.max(0, water - leakQuantity);
lastLeakTime = currentTime;
}
// 判斷桶中的水量是否小于容量,如果是則可以繼續(xù)添加水(相當于獲取到令牌)
if (water < capacity) {
water++;
return true;
}
}
// 如果桶滿,則獲取令牌失敗
return false;
}
}
LeakyBucketRateLimitAspect.java
package com.summo.demo.config.limitstrategy.leakybucket;
import java.lang.reflect.Method;
import com.summo.demo.exception.biz.BizException;
import com.summo.demo.model.response.ResponseCodeEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
@Order(5)
public class LeakyBucketRateLimitAspect {
@Around("@annotation(com.summo.demo.config.limitstrategy.leakybucket.LeakyBucketRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Method method = signature.getMethod();
LeakyBucketRateLimit leakyBucketRateLimit = method.getAnnotation(LeakyBucketRateLimit.class);
int capacity = leakyBucketRateLimit.capacity();
int leakRate = leakyBucketRateLimit.leakRate();
// 方法簽名作為唯一標識
String methodKey = method.toString();
LeakyBucketLimiter limiter = LeakyBucketLimiter.createLimiter(methodKey, capacity, leakRate);
if (!limiter.tryAcquire()) {
// 超過限制,拋出限流異常
throw new BizException(ResponseCodeEnum.LIMIT_EXCEPTION, "Too many requests, please try again later.");
}
return joinPoint.proceed();
}
}
使用方式
/**
* 漏桶算法限流
*
* @return
*/
@GetMapping("/leakyBucket")
@LeakyBucketRateLimit(capacity = 100, leakRate = 20)
public ResponseEntity leakyBucket() {
return ResponseEntity.ok("leakyBucket test ok!");
}
(3)原理說明
在Leaky Bucket算法中,容器有一個固定的容量,類似于漏桶的容量。數(shù)據(jù)以固定的速率進入容器,如果容器滿了,則多余的數(shù)據(jù)會溢出。容器中的數(shù)據(jù)會以恒定的速率從底部流出,類似于漏桶中的水滴。如果容器中的數(shù)據(jù)不足以滿足流出速率,則會等待直到有足夠的數(shù)據(jù)可供流出。這樣就實現(xiàn)了對數(shù)據(jù)流的平滑控制。
(4)流程圖
四、小結(jié)一下
如果面試中被問到如何進行接口優(yōu)化,大家第一印象會想到什么?放到3年前的我來看,第一反應(yīng)肯定是優(yōu)化接口性能,然后說一個本來耗時好幾秒的接口優(yōu)化到毫秒的案例。而現(xiàn)在的我會說:上下文、權(quán)限控制、防抖/限流等等,當然也會說性能優(yōu)化。感謝一直在學(xué)習的自己!
在寫這篇文章之前,我看了很多大佬寫的相關(guān)文章,才開始動筆,不是看不懂也不是寫不來,只是因為我沒怎么在真實業(yè)務(wù)中用到限流策略,我不敢亂寫。怕給人埋坑。
回顧前面的6篇,加上這一篇一共7篇,共計耗時3個月,但第6篇到第7篇則整整花了一個月,不是在偷懶,理論型的東西好寫,但沒有實際業(yè)務(wù)支持那就是高談闊論。不過沉淀了一個月,不寫點東西我怕馬上就忘記了,還是寫一篇,拙文共賞吧!如果有哪位同學(xué)在真實業(yè)務(wù)中使用過類似的限流策略,可以和我們分享一下經(jīng)驗哈,謝謝啦!
總結(jié)
以上是生活随笔為你收集整理的《优化接口设计的思路》系列:第七篇—接口限流策略的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: P1990-覆盖墙壁
- 下一篇: 一文了解Vprix容器流媒体平台和传统云