API 调用次数限制实现
我們在開發接口服務器的過程中,為了防止客戶端對于接口的濫用,保護服務器的資源, 通常來說我們會對于服務器上的各種接口進行調用次數的限制。
?
比如對于某個 用戶,他在一個時間段(interval)內,比如 1 分鐘,調用服務器接口的次數不能夠大于一個上限(limit),比如說 100 次。如果用戶調用接口的次數超過上限的話,就直接拒絕用戶的請求,返回錯誤信息。
?
最初的想法
?
對于實現方法的第一印象,大概是,給每個用戶一個配額,次數為 Q, 這個配額在用戶第一次調用接口的時候分配給用戶。然后在接下去的 P 時間段內,如果用戶訪問 API 的次數大于Q,就開始拒絕用戶的調用請求。然后,這個 配額,在 P 時間之后,配額會被重置回 Q。
?
邏輯的偽代碼見如下:
?
can_access(identity):
limit_counter = get(identity)
if limiter_counter exists and
limiter_counter.timestamp - CURRENT_TIME < LIMIT_INTERVAL and
limit_counter >= limit:
return false
else
if limiter_counter is nil or
(limit_counter exists and limiter_counter.timestamp - CURRENT_TIME > LIMT_INTERVAL):
put(identity, new LimiterCounter(1, CURRENT_TIME))
else
put(identity, limiter_counter.increment())
end
return true
end
?
Redis 的 INCR 命令可以比較簡單地實現這種方法,在 INCR 的文檔頁下面介紹了如何使用 INCR 命令實現 Rate Limtier。
?
這種實現方法,仔細想來存在一個缺陷,就是用戶可以在一個時間段的末尾發起 Q 次請求,然后在下一個時間段的開始又 發起 Q 次請求,這樣,一個用戶可以在很短的時間之內發起 2Q 次請求。
?
可能普通用戶不會刻意這么去做,但如果真的出現這種情況的時候,服務器會承受正常情況下兩倍的負載, 這并不是我們所希望看見的。而且如果服務器被攻擊的話,這種的缺陷,還是很可能會被利用的。
?
Token Bucket 算法
?
作為一個程序員,當我對于一個問題沒有頭緒時,可以這樣:
?
?
或者可以:
簡單的 Google 之后,在 StackOverflow 上面了解到 Token Bucket 算法, 這個算法通常用在計算機的網絡設備上面,一般用于限制流量,很符合我所需要解決的問題。
?
在這個算法當中:
簡單地來看,可以將這個算法類比成有個水龍頭在往水桶中放水,然后不斷地有水瓢到這個水桶中打水去澆花,如果水桶的水滿了,那么水就從水桶中溢出了。
?
?
在我們的問題領域,要將流量換成一個請求。當一個請求到達服務器的時候,首先需要根據請求的各種信息,確定其需要獲取哪個 bucket 的 Token,因為服務器一般會有多種限制流量的策略進行組合。
?
舉一些例子:
- 對于每個登錄過的用戶,服務器規定 10 秒內,用戶的請求次數不能超過 200 次;而且,1 小時內,用戶的請求次數不能超過 5000 次;并且,1 天內, 用戶的請求次數不能超過 20000 次。這樣,對于每個用戶都需要設置三個 bucket。
- 另外服務器還規定服務器所有的接口在 10 秒內,請求次數不能超過 100000 次,這種情況下,所有用戶會共享一個全服務器的 token bucket。
- 還有可能根據 IP 進行限制,這樣 bucket 就需要根據 IP 地址進行創建。
接著,這個請求去對應的 token bucket 獲取允許通行的 token,如果沒有獲取到 token,服務器最好的做法是直接返回流量超過限制的響應(429)。 如果獲取到相應的 token,那么就對于請求給予放行。
?
初步的實現的想法
?
初步實現的想法很簡單,首先,對于每個 Bucket 設置一個定時器,每過一個間隔,就往這個 Bucket 當中加入一些 Token,然后用戶 獲取一個 Token 之后,就將 Bucket 中的 Token 數量減一。這個實現,是在看到這個算法的時候,比較容易想到的一個方法。然而, 稍微仔細地考慮一下,就知道這個實現手段在現實當中基本上是屬于沒法用的實現。原因在于,這種實現算法需要給每個 Bucket 添加 一個定時器,而一個定時器就是一條線程。那么在你的服務器上,光是分配給定時器的線程就需要和你的用戶數量是一個量級的, 幾萬幾十萬條線程在服務器上運行,是完全是脫離了實際情況的。
?
所以這個簡單的實現方法,在稍微考慮之后,就可以排除了。
?
另一種的實現的辦法
?
于是又開始尋找另外的實現方法,搜索資料的時候,發現 Guava 庫當中也有一個 RateLimiter,其作用也是 用來進行限流,于是閱讀了 RateLimiter 的源代碼,查看一些 Google 的人是如何實現 Token Bucket 算法的。
?
RateLimiter 和 SmoothRateLimiter
// com.google.common.util.concurrent.SmoothRateLimiter private void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); nextFreeTicketMicros = nowMicros; } }
在 resync 方法中的這句代碼 storedPermits = min(maxPermits, storedPermits+ (nowMicros - nextFreeTicketMicros)/stableIntervalMicros); 就是 RateLimiter 中計算 Token 數量的方法。沒有使用計時器,而是使用時間戳的方式計算。這個做法給足了 信息。
?
我們可以在 Bucket 中存放現在的 Token 數量,然后存儲上一次補充 Token 的時間戳,當用戶下一次請求獲取一個 Token 的時候, 根據此時的時間戳,計算從上一個時間戳開始,到現在的這個時間點所補充的所有 Token 數量,加入到 Bucket 當中。
?
這種實現方法有幾個優勢:
- 首先, 避免了給每一個 Bucket 設置一個定時器這種笨辦法,
- 第二,數據結構需要的內存量很小,只需要儲存 Bucket 中剩余的 Token 量以及上次補充 Token 的時間戳就可以了;
- 第三,只有在用戶訪問的時候,才會計算 Token 補充量,對于系統的計算資源占用量也較小。
確定和實現方法之后,就可以開始實現這個算法了。首先要考慮的是 Bucket 存放在哪里?雖然 Bucket 占用內存的數量很小,假設一個 Bucket 的大小為 20 個字節,如果需要儲存一百萬個 Bucket 就需要使用 20M 的內存。
?
而且,Bucket 從 一定意義上屬于緩存數據,因為如果用戶長期不使用這個 Bucket 的話,應該能夠自動失效。從上面的分析,自然地,我想到 將 Bucket 放在 Redis 當中,每個 Bucket 使用一個 Hash 存放(HSET), 并且支持在一段時間之后,使 Bucket 失效(TTL)。
?
于是有了第一版的代碼, RateLimiter
public boolean access(String userId) { String key = genKey(userId); try (Jedis jedis = jedisPool.getResource()) { Map<String, String> counter = jedis.hgetAll(key); if (counter.size() == 0) { TokenBucket tokenBucket = new TokenBucket(System.currentTimeMillis(), limit - 1); jedis.hmset(key, tokenBucket.toHash()); return true; } else { TokenBucket tokenBucket = TokenBucket.fromHash(counter); long lastRefillTime = tokenBucket.getLastRefillTime(); long refillTime = System.currentTimeMillis(); long intervalSinceLast = refillTime - lastRefillTime; long currentTokensRemaining; if (intervalSinceLast > intervalInMills) { currentTokensRemaining = limit; } else { long grantedTokens = (long) (intervalSinceLast / intervalPerPermit); System.out.println(grantedTokens); currentTokensRemaining = Math.min(grantedTokens + tokenBucket.getTokensRemaining(), limit); } tokenBucket.setLastRefillTime(refillTime); assert currentTokensRemaining >= 0; if (currentTokensRemaining == 0) { tokenBucket.setTokensRemaining(currentTokensRemaining); jedis.hmset(key, tokenBucket.toHash()); return false; } else { tokenBucket.setTokensRemaining(currentTokensRemaining - 1); jedis.hmset(key, tokenBucket.toHash()); return true; } } } }
上面的方法是最初的實現方法,對于每一個 Token Bucket,在 Redis 上面,使用一個 Hash 進行表示,一個 Token Bucket 有 lastRefillTime 表示最后一次補充 Token 的時間,tokensRemaining 則表示 Bucket 中的剩余 Token 數量,access() 方法大致的步驟為:
這個方法在單線程的條件下面,可以比較好地滿足需求,但是在多線程的條件下面,是會出現 race condition,如下面的圖所示。
?
?
更好一點的實現方法
?
上面的 race condition 出現的原因是多個線程對于 Token Bucket 進行寫操作,當遇到 race condition 的時候, 我們通常使用鎖的方式解決這個沖突。
?
對于上面這個情況,我們需要使用鎖保護的資源就是相應的 Token Bucket。如果使用鎖的方式實現,需要對每一個 Token Bucket 附加一個鎖,當多個線程并發地讀寫 Token Bucket 的時候,需要先獲取這個鎖的控制權,然后 對于 Token Bucket 進行修改,然后更新到 Redis 中。
?
雖然使用在業務代碼當中可以實現這個邏輯,但 Redis 提供了一種我個人感覺更好的方法來實現同樣的上鎖機制。 EVAL 和 EVALSHA, 使用 lua 腳本的方式執行命令,這個腳本整個的操作會被當成一個原子操作在 Redis 上面執行
我們可以將原本的 Java 實現轉寫成 lua 腳本,將要本要做的 Token Bucket 的讀寫操作放在這個腳本當中, 讓 Redis 去保證這個操作的原子性。 大致的實現放在這里rate_limiter.lua 和LuaRateLimiter.java。
?
查漏補缺
?
在上面一個版本的 LuaRateLimiter 當中,我自己發現的問題有兩個:
經過改版之后的代碼在這里
?
對于第一個問題,我現在做的改動比較有限,在 RateLimitPolicy 當中添加一個 maxBurstTime,然后計算 Bucket 激活的時候 初始的 Token 容量。程序創建一個 RateLimitPolicy 的時候,需要指定這個 maxBurstTime。關于這個初始容量的設置與計算 可以進一步參考 Guava 的 SmoothRateLimiter 中的文檔和代碼。
?
第二問題就是比較簡單的重構,在實現當中,RateLimitPolicy 是一個抽象類,有一個 PerUserRateLimitPolicy,這個 規則通過 genBucketKey() 方法,對于每個用戶都返回一個不同的 bucket key,從而使不同的用戶使用不同的 Token Bucket。 另外,如果有別的限制策略,可以通過實現不同的 RateLimitPolicy 來完成(genBucketKey() 方法的定義還可以進一步地優化)。
?
現在這個版本的實現,不算特別成熟,但如果要求不是特別高的話,也是一個可用的實現方案,所以暫時就先寫成這樣, 如果實際應用中發現問題,可以進一步地優化。
總結
以上是生活随笔為你收集整理的API 调用次数限制实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Harshanie Lakshika J
- 下一篇: 双色球4+1中多少钱?