分布式限速器
限流算法
漏桶算法
漏桶算法思路很簡單,水(也就是請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出,然后就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率。 示意圖(來源網絡)如下:
令牌桶算法
令牌桶算法和漏桶算法效果一樣但方向相反的算法,更加容易理解。隨著時間流逝,系統會按恒定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入令牌(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了。新請求來臨時,會各自拿走一個令牌,如果沒有令牌可拿了就阻塞或者拒絕服務。示意圖(來源網絡)如下:
單機實現
類似guava的ratelimit
/*** 使用計數器限速,guava的RateLimiter沒用過研究后再說* <p>* 單機1秒限速1000*/ public class RateLimiterutils {private final static long DELTA_MILLIS = 1000;private final static long PERMITS_PER_SECOND = 888;private static int counter = 0;private static long timestamp = System.currentTimeMillis();public synchronized static void tryAcquire(int delta) {if (delta > PERMITS_PER_SECOND) {throw new IllegalArgumentException(String.format("delta gather than permitsPerSecond, delta:%d, permitsPerSecond:%d", delta, PERMITS_PER_SECOND));}long now = System.currentTimeMillis();if (now - timestamp < DELTA_MILLIS) {if (counter + delta <= PERMITS_PER_SECOND) {counter += delta;} else {try {Thread.sleep(timestamp + PERMITS_PER_SECOND - now);counter = delta;timestamp = timestamp + PERMITS_PER_SECOND;} catch (InterruptedException e) {e.printStackTrace();}}} else {counter = delta;timestamp = now;}} }分布式限速器
要解決的問題
實現方案
完整的工程代碼實現:https://github.com/singgel/SpringBoot-Templates
ratelimit模塊
這個案例是一個項目,講不使用redis制作分布式限速器服務,提供基礎服務
參見:https://www.mailgun.com/blog/gubernator-cloud-native-distributed-rate-limiting-microservices/
直接想法就是使用負載均衡的想法
舉個例子,我們有兩臺服務器實例,對應的是同一個應用程序(Application.name相同),程序中設置的QPS為100,將應用程序與同一個控制臺程序進行連接,控制臺端依據應用的實例數量將QPS進行均分,動態設置每個實例的QPS為50,若是遇到兩個服務器的配置并不相同,在負載均衡層的就已經根據服務器的優劣對流量進行分配,例如一臺分配70%流量,另一臺分配30%的流量。面對這種情況,控制臺也可以對其實行加權分配QPS的策略。
目的是因為如果單純的限速器會出現redis熱key問題,在訪問層封一層組批
這種方案的思想是建立在Redis令牌桶方案的基礎之上的。如何解決每次取令牌都伴隨一次網絡開銷,該方案的解決方法是建立一層控制端,利用該控制端與Redis令牌桶進行交互,只有當客戶端的剩余令牌數不足時,客戶端才向該控制層取令牌并且每次取一批。
這種思想類似于Java集合框架的數組擴容,設置一個閾值,只有當超過該臨界值時,才會觸發異步調用。其余存取令牌的操作與本地限流無二。雖然該方案依舊存在誤差,但誤差最大也就一批次令牌數而已。
代碼實現
step1:定義lua腳本
lua腳本的注釋不允許使用中文,這點要注意,不然報錯
令牌桶初始化,因為令牌的控制腳本需要用到前置參數
令牌桶控制腳本
-- curr_timestamp: redis current timestamp(Unit of second) redis.replicate_commands() local time = redis.call('time') local curr_timestamp = tonumber(time[1]) local curr_microseconds = tonumber(time[2]) local require_permits = tonumber(ARGV[1]) local result = {} -- result[1]: minimum time from next refresh(Unit of microsecond) result[1] = (1000000 - curr_microseconds) / 1000 local ratelimit_info = redis.call("HMGET", KEYS[1], "last_second", "curr_permits", "max_burst") local last_second = tonumber(ratelimit_info[1]) local curr_permits = tonumber(ratelimit_info[2]) local max_burst = tonumber(ratelimit_info[3]) -- If the last time has passed, update the token bucket if (curr_timestamp > last_second) thencurr_permits = max_burstredis.call("HMSET", KEYS[1], "last_second", curr_timestamp) end -- Update the last permit amount, base on the curr_permits change if (curr_permits > require_permits) thenredis.call("HMSET", KEYS[1], "curr_permits", curr_permits - require_permits)result[2] = require_permitsreturn result end if (curr_permits > 0) thenredis.call("HMSET", KEYS[1], "curr_permits", 0) end result[2] = curr_permits return resultstep2:java的spring注入
先初始化
@Bean("ratelimitLua") public DefaultRedisScript getRedisScript() {DefaultRedisScript redisScript = new DefaultRedisScript();redisScript.setLocation(new ClassPathResource("limit/ratelimit.lua"));redisScript.setResultType(java.util.List.class);return redisScript; }@Bean("ratelimitInitLua") public DefaultRedisScript getInitRedisScript() {DefaultRedisScript redisScript = new DefaultRedisScript();redisScript.setLocation(new ClassPathResource("limit/ratelimitInit.lua"));redisScript.setResultType(java.lang.Long.class);return redisScript; }再定義ratelimit方法控制入口
/*** 利用Redis進行限流,解決分布式、高TPS的問題*/ @Service public class RateLimitClient {private final static long PERMITS_PER_SECOND = 3250;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Qualifier("getRedisScript")@Resourceprivate RedisScript<List> ratelimitLua;@Qualifier("getInitRedisScript")@Resourceprivate RedisScript<Long> ratelimitInitLua;/*** 初始化令牌桶!!!** @param key* @return*/public Token initToken(String key) {Token token = Token.SUCCESS;if (stringRedisTemplate.hasKey(getKey(key))) {return token;}Long acquire = stringRedisTemplate.execute(ratelimitInitLua,Collections.singletonList(getKey(key)), String.valueOf(PERMITS_PER_SECOND), String.valueOf(PERMITS_PER_SECOND));if (acquire == 1) {token = Token.SUCCESS;} else if (acquire == 0) {token = Token.SUCCESS;} else {token = Token.FAILED;}return token;}/*** 根據請求值去令牌桶獲取,之所以用Long是因為lua腳本返回值用Integer接收有問題* lua要不直接返回json然后轉map,為了省事直接返回list** list.get(0) 距離下一次刷新的最小時間間隔,單位:微秒* list.get(1) 獲取到的令牌數** @param key* @param permits* @return*/public List<Long> acquireIntervalAndToken(String key, Integer permits){List intervalAndToken = stringRedisTemplate.execute(ratelimitLua,Collections.singletonList(getKey(key)), permits.toString());return intervalAndToken;}public String getKey(String key) {return Constants.RATE_LIMIT_KEY + key;} }step3:業務代碼中使用
在多線程環境下驗證
@Autowired private RateLimitClient rateLimitClient;@Test public void redisLuaScriptTest() throws Exception {Random random = new Random();for (int i = 0; i < 100; i++) {// 令牌桶的線程安全驗證consumerExecutor.execute(new Runnable() {@Overridepublic void run() {int requireToken = random.nextInt(1000);LOGGER.info("requireToken:{}", requireToken);List<Long> intervalAndToken = rateLimitClient.acquireIntervalAndToken(MessageProto.Platform.HUAWEI.name(), requireToken);LOGGER.info("requireToken:{}, acquireToken:{} ", requireToken, intervalAndToken);}});} }總結
- 上一篇: 通向Golang的捷径【20. 使用 G
- 下一篇: 阜阳市计算机学校助学金申请书,计算机专业