javascript
Spring Cloud Gateway(限流)
在高并發的應用中,限流是一個繞不開的話題。限流可以保障我們的 API 服務對所有用戶的可用性,也可以防止網絡攻擊。
一般開發高并發系統常見的限流有:限制總并發數(比如數據庫連接池、線程池)、限制瞬時并發數(如 nginx 的 limit_conn 模塊,用來限制瞬時并發連接數)、限制時間窗口內的平均速率(如 Guava 的 RateLimiter、nginx 的 limit_req 模塊,限制每秒的平均速率);其他還有如限制遠程接口調用速率、限制 MQ 的消費速率。另外還可以根據網絡連接數、網絡流量、CPU 或內存負載等來限流。
本文詳細探討在 Spring Cloud Gateway 中如何實現限流。
限流算法
做限流 (Rate Limiting/Throttling) 的時候,除了簡單的控制并發,如果要準確的控制 TPS,簡單的做法是維護一個單位時間內的 Counter,如判斷單位時間已經過去,則將 Counter 重置零。此做法被認為沒有很好的處理單位時間的邊界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都觸發了最大的請求數,也就是在兩毫秒內發生了兩倍的 TPS。
常用的更平滑的限流算法有兩種:漏桶算法和令牌桶算法。很多傳統的服務提供商如華為中興都有類似的專利,參考采用令牌漏桶進行報文限流的方法。
漏桶算法
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然后就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率。
可見這里有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率。因此,漏桶算法對于存在突發特性的流量來說缺乏效率。
令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解。隨著時間流逝,系統會按恒定 1/QPS 時間間隔(如果 QPS=100,則間隔是 10ms)往桶里加入 Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了。新請求來臨時,會各自拿走一個 Token,如果沒有 Token 可拿了就阻塞或者拒絕服務。
令牌桶的另外一個好處是可以方便的改變速度。一旦需要提高速率,則按需提高放入桶中的令牌的速率。一般會定時(比如 100 毫秒)往桶中增加一定數量的令牌,有些變種算法則實時的計算應該增加的令牌的數量。
Guava 中的 RateLimiter 采用了令牌桶的算法,設計思路參見?How is the RateLimiter designed, and why?,詳細的算法實現參見源碼。
Leakly Bucket vs Token Bucket
| 依賴 token | 否 | 是 | ? |
| 立即執行 | 是 | 否 | 有足夠的 token 才能執行 |
| 堆積 token | 否 | 是 | ? |
| 速率恒定 | 是 | 否 | 可以大于設定的 QPS |
限流實現
在 Gateway 上實現限流是個不錯的選擇,只需要編寫一個過濾器就可以了。有了前邊過濾器的基礎,寫起來很輕松。(如果你對 Spring Cloud Gateway 的過濾器還不了解,請先看這里)
我們這里采用令牌桶算法,Google Guava 的RateLimiter、Bucket4j、RateLimitJ?都是一些基于此算法的實現,只是他們支持的 back-ends(JCache、Hazelcast、Redis 等)不同罷了,你可以根據自己的技術棧選擇相應的實現。
這里我們使用 Bucket4j,引入它的依賴坐標,為了方便順便引入 Lombok
<dependency><groupId>com.github.vladimir-bukhtoyarov</groupId><artifactId>bucket4j-core</artifactId><version>4.0.0</version> </dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.20</version><scope>provided</scope> </dependency>我們來實現具體的過濾器
@CommonsLog @Builder @Data @AllArgsConstructor @NoArgsConstructor public class RateLimitByIpGatewayFilter implements GatewayFilter,Ordered {int capacity;int refillTokens;Duration refillDuration;private static final Map<String,Bucket> CACHE = new ConcurrentHashMap<>();private Bucket createNewBucket() {Refill refill = Refill.of(refillTokens,refillDuration);Bandwidth limit = Bandwidth.classic(capacity,refill);return Bucket4j.builder().addLimit(limit).build();}@Overridepublic Mono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain) {// if (!enableRateLimit){// return chain.filter(exchange);// }String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();Bucket bucket = CACHE.computeIfAbsent(ip,k -> createNewBucket());log.debug("IP: " + ip + ",TokenBucket Available Tokens: " + bucket.getAvailableTokens());if (bucket.tryConsume(1)) {return chain.filter(exchange);} else {exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);return exchange.getResponse().setComplete();}}@Overridepublic int getOrder() {return -1000;}}通過對令牌桶算法的了解,我們知道需要定義三個變量:
- capacity:桶的最大容量,即能裝載 Token 的最大數量
- refillTokens:每次 Token 補充量
- refillDuration:補充 Token 的時間間隔
在這個實現中,我們使用了 IP 來進行限制,當達到最大流量就返回429錯誤。這里我們簡單使用一個 Map 來存儲 bucket,所以也決定了它只能單點使用,如果是分布式的話,可以采用 Hazelcast 或 Redis 等解決方案。
在 Route 中我們添加這個過濾器,這里指定了 bucket 的容量為 10 且每一秒會補充 1 個 Token。
.route(r -> r.path("/throttle/customer/**").filters(f -> f.stripPrefix(2).filter(new RateLimitByIpGatewayFilter(10,1,Duration.ofSeconds(1)))).uri("lb://CONSUMER").order(0).id("throttle_customer_service") )啟動服務并多次快速刷新改接口,就會看到 Tokens 的數量在不斷減小,等一會又會增加上來
2018-05-09 15:42:08.601 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 2 2018-05-09 15:42:08.958 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 1 2018-05-09 15:42:09.039 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 0 2018-05-09 15:42:10.201 DEBUG 96278 --- [ctor-http-nio-2] com.windmt.filter.RateLimitByIpGatewayFilter : IP: 0:0:0:0:0:0:0:1,TokenBucket Available Tokens: 1RequestRateLimiter
剛剛我們通過過濾器實現了限流的功能,你可能在想為什么不直接創建一個過濾器工廠呢,那樣多方便。這是因為 Spring Cloud Gateway 已經內置了一個RequestRateLimiterGatewayFilterFactory,我們可以直接使用(這里有坑,后邊詳說)。
目前RequestRateLimiterGatewayFilterFactory的實現依賴于 Redis,所以我們還要引入spring-boot-starter-data-redis-reactive
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>因為這里有坑,所以把 application.yml 的配置再全部貼一遍,新增的部分我已經用# ---標出來了
spring:application:name: cloud-gatewaycloud:gateway:discovery:locator:enabled: trueroutes:- id: service_customeruri: lb://CONSUMERorder: 0predicates:- Path=/customer/**filters:- StripPrefix=1# -------- name: RequestRateLimiterargs:key-resolver: "#{@remoteAddrKeyResolver}"redis-rate-limiter.replenishRate: 1redis-rate-limiter.burstCapacity: 5# -------- AddResponseHeader=X-Response-Default-Foo,Default-Bardefault-filters:- Elapsed=true# -------redis:host: localhostport: 6379database: 0# ------- server:port: 10000 eureka:client:service-url:defaultZone: http://localhost:7000/eureka/ logging:level:org.springframework.cloud.gateway: debugcom.windmt.filter: debug默認情況下,是基于令牌桶算法實現的限流,有個三個參數需要配置:
- burstCapacity,令牌桶容量。
- replenishRate,令牌桶每秒填充平均速率。
- key-resolver,用于限流的鍵的解析器的 Bean 對象名字(有些繞,看代碼吧)。它使用 SpEL 表達式根據#{@beanName}從 Spring 容器中獲取 Bean 對象。默認情況下,使用PrincipalNameKeyResolver,以請求認證的java.security.Principal作為限流鍵。
關于filters的那段配置格式,參考這里
我們實現一個使用請求 IP 作為限流鍵的KeyResolver
public class RemoteAddrKeyResolver implements KeyResolver {public static final String BEAN_NAME = "remoteAddrKeyResolver";@Overridepublic Mono<String> resolve(ServerWebExchange exchange) {return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());}}配置RemoteAddrKeyResolver?Bean 對象
@Bean(name = RemoteAddrKeyResolver.BEAN_NAME) public RemoteAddrKeyResolver remoteAddrKeyResolver() {return new RemoteAddrKeyResolver(); }以上就是代碼部分,我們還差一個 Redis,我就本地用 docker 來快速啟動了
docker run --name redis -p 6379:6379 -d redis萬事俱備,只欠測試了。以上的代碼的和配置都是 OK 的,可以自行測試。下面來說一下這里邊的坑。
遇到的坑
配置不生效
參考這個?issue
No Configuration found for route
這個異常信息如下:
java.lang.IllegalArgumentException: No Configuration found for route service_customerat org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter.isAllowed(RedisRateLimiter.java:93) ~[spring-cloud-gateway-core-2.0.0.RC1.jar:2.0.0.RC1]出現在將 RequestRateLimiter 配置為 defaultFilters 的情況下,比如像這樣
default-filters:- name: RequestRateLimiterargs:key-resolver: "#{@remoteAddrKeyResolver}"redis-rate-limiter.replenishRate: 1redis-rate-limiter.burstCapacity: 5這時候就會導致這個異常。我通過分析源碼,發現了一些端倪,感覺像是一個 bug,已經提交了?issue
我們從異常入手來看,?RedisRateLimiter#isAllowed?這個方法要獲取 routeId 對應的 routerConfig,如果獲取不到就拋出剛才我們看到的那個異常。
public Mono<Response> isAllowed(String routeId,String id) {if (!this.initialized.get()) {throw new IllegalStateException("RedisRateLimiter is not initialized");}// 只為 defaultFilters 配置 RequestRateLimiter 的時候// config map 里邊的 key 只有 "defaultFilters"// 但是我們實際請求的 routeId 為 "customer_service"Config routeConfig = getConfig().get(routeId);if (routeConfig == null) {if (defaultConfig == null) {throw new IllegalArgumentException("No Configuration found for route " + routeId);}routeConfig = defaultConfig;}// 省略若干代碼... }既然這里要 get,那必然有個地方要 put。put 的相關代碼在?AbstractRateLimiter#onApplicationEvent?這個方法。
@Override public void onApplicationEvent(FilterArgsEvent event) {Map<String,Object> args = event.getArgs();// hasRelevantKey 檢查 args 是否包含 configurationPropertyName// 只有 defaultFilters 包含if (args.isEmpty() || !hasRelevantKey(args)) {return;}String routeId = event.getRouteId();C routeConfig = newConfig();ConfigurationUtils.bind(routeConfig,args,configurationPropertyName,configurationPropertyName,validator);getConfig().put(routeId,routeConfig); }private boolean hasRelevantKey(Map<String,Object> args) {return args.keySet().stream().anyMatch(key -> key.startsWith(configurationPropertyName + ".")); }上邊的 args 里是是配置參數的鍵值對,比如我們之前自定義的過濾器工廠Elapsed,有個參數withParams,這里就是withParams=true。關鍵代碼在第 7 行,hasRelevantKey方法用于檢測 args 里邊是否包含configurationPropertyName.,具體到本例就是是否包含redis-rate-limiter.。悲劇就發生在這里,因為我們只為 defaultFilters 配置了相關 args,注定其他的 route 到這里就直接 return 了。
現在不清楚這是 bug 還是設計者有意為之,等答復吧。
基于系統負載的動態限流
在實際工作中,我們可能還需要根據網絡連接數、網絡流量、CPU 或內存負載等來進行動態限流。在這里我們以 CPU 為栗子。
我們需要借助 Spring Boot Actuator 提供的 Metrics 能力進行實現基于 CPU 的限流——當 CPU 使用率高于某個閾值就開啟限流,否則不開啟限流。
我們在項目中引入 Actuator 的依賴坐標
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>因為 Spring Boot 2.x 之后,Actuator 被重新設計了,和 1.x 的區別還是挺大的(參考這里)。我們先在配置中設置management.endpoints.web.exposure.include=*來觀察一下新的 Metrics 的能力
http://localhost:10000/actuator/metrics
{"names": ["jvm.buffer.memory.used","jvm.memory.used","jvm.buffer.count","jvm.gc.memory.allocated","logback.events","process.uptime","jvm.memory.committed","system.load.average.1m","jvm.gc.pause","jvm.gc.max.data.size","jvm.buffer.total.capacity","jvm.memory.max","system.cpu.count","system.cpu.usage","process.files.max","jvm.threads.daemon","http.server.requests","jvm.threads.live","process.start.time","jvm.classes.loaded","jvm.classes.unloaded","jvm.threads.peak","jvm.gc.live.data.size","jvm.gc.memory.promoted","process.files.open","process.cpu.usage"] }我們可以利用里邊的系統 CPU 使用率system.cpu.usage
http://localhost:10000/actuator/metrics/system.cpu.usage
{"name": "system.cpu.usage","measurements": [{"statistic": "VALUE","value": 0.5189003436426117}],"availableTags": [] }最近一分鐘內的平均負載system.load.average.1m也是一樣的
http://localhost:10000/actuator/metrics/system.load.average.1m
{"name": "system.load.average.1m","measurements": [{"statistic": "VALUE","value": 5.33203125}],"availableTags": [] }知道了 Metrics 提供的指標,我們就來看在代碼里具體怎么實現吧。Actuator 2.x 里邊已經沒有了之前 1.x 里邊提供的SystemPublicMetrics,但是經過閱讀源碼可以發現MetricsEndpoint這個類可以提供類似的功能。就用它來擼代碼吧
@CommonsLog @Component public class RateLimitByCpuGatewayFilter implements GatewayFilter, Ordered {@Autowiredprivate MetricsEndpoint metricsEndpoint;private static final String METRIC_NAME = "system.cpu.usage";private static final double MAX_USAGE = 0.50D;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// if (!enableRateLimit){// return chain.filter(exchange);// }Double systemCpuUsage = metricsEndpoint.metric(METRIC_NAME, null).getMeasurements().stream().filter(Objects::nonNull).findFirst().map(MetricsEndpoint.Sample::getValue).filter(Double::isFinite).orElse(0.0D);boolean ok = systemCpuUsage < MAX_USAGE;log.debug("system.cpu.usage: " + systemCpuUsage + " ok: " + ok);if (!ok) {exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);return exchange.getResponse().setComplete();} else {return chain.filter(exchange);}}@Overridepublic int getOrder() {return 0;}}配置 Route
@Autowired private RateLimitByCpuGatewayFilter rateLimitByCpuGatewayFilter;@Bean public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {// @formatter:offreturn builder.routes().route(r -> r.path("/throttle/customer/**").filters(f -> f.stripPrefix(2).filter(rateLimitByCpuGatewayFilter)).uri("lb://CONSUMER").order(0).id("throttle_customer_service")).build();// @formatter:on }至于效果嘛,自己試試吧。因為 CPU 的使用率一般波動較大,測試效果還是挺明顯的,實際使用就得慎重了。
示例代碼可以從 Github 獲取:https://github.com/zhaoyibo/spring-cloud-study
改進與提升
實際項目中,除以上實現的限流方式,還可能會:一、在上文的基礎上,增加配置項,控制每個路由的限流指標,并實現動態刷新,從而實現更加靈活的管理。二、實現不同維度的限流,例如:
- 對請求的目標 URL 進行限流(例如:某個 URL 每分鐘只允許調用多少次)
- 對客戶端的訪問 IP 進行限流(例如:某個 IP 每分鐘只允許請求多少次)
- 對某些特定用戶或者用戶組進行限流(例如:非 VIP 用戶限制每分鐘只允許調用 100 次某個 API 等)
- 多維度混合的限流。此時,就需要實現一些限流規則的編排機制(與、或、非等關系)
參考
Token bucket
RequestRateLimiter GatewayFilter Factory
Scaling your API with rate limiters
Scaling your API with rate limiters 譯文
Spring Boot Actuator Web API Documentation
https://github.com/nereuschen/blog/issues/37
http://www.itmuch.com/spring-cloud-sum/spring-cloud-ratelimit
- 本文作者:?Yibo
- 本文鏈接:?https://windmt.com/2018/05/09/spring-cloud-15-spring-cloud-gateway-ratelimiter/
- 版權聲明:?本博客所有文章除特別聲明外,均采用?CC BY-NC-SA 4.0?許可協議。轉載請注明出處!
總結
以上是生活随笔為你收集整理的Spring Cloud Gateway(限流)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Cloud Gateway
- 下一篇: Spring Cloud Gateway