Service Rate Limiting
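For remote calls, rate limiting is very important because it is how a service protects itself. To keep an online system reasonably elastic and stable during business peaks, the most effective measure is service degradation, and rate limiting is one of the most commonly used degradation techniques.
Rate limiting means restricting traffic; a fancier name for it is traffic shaping. Its purpose is to keep the traffic rate within a range the system can reasonably handle when traffic peaks or surges suddenly (traffic spikes), so that the system is not brought down by heavy traffic.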
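There are several common ways to limit traffic:
1) Limit the number of calls per unit of time
2) Limit the number of concurrent calls to the system
3) Use the Leaky Bucket algorithm
4) Use the Token Bucket algorithm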
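We will implement the first and simplest approach: limiting the number of calls per unit of time.
For the underlying theory, see the rate-limiting article at:
http://www.dididaijiatech.com/wordpress/ (worth studying; I borrowed from it too~)
The hardest part of this kind of rate limiting is counting the number of calls within each unit of time.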
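Let's look at how to do this in Java. The first step is to decide how long the unit time window should be. It must not be too long, or the limiting becomes insufficiently "sensitive": once limiting kicks in and the policy is to refuse further calls, the service is unavailable for the rest of that window. If the window were 1 hour and the service reached its threshold at minute 29, it would be unavailable for the remaining 31 minutes, which is really bad. If the window is too short, the threshold gets harder to set the shorter the window becomes. Take 1 second: the traffic in one second at peak and in one second off-peak can differ by a factor of a hundred or even a thousand. A very short window also places higher demands on the rate-limiting code itself, which must be very stable and efficient.

The best window length is the one that makes the threshold easiest to set. For example, our monitoring system records each service's calls per minute, so 1 minute is a natural choice: we can easily estimate each service's per-minute call volume at peak and off-peak, and we can judge service quality over different windows from the average latency and error count per minute. This gives a solid basis for setting the threshold.

Once the window and the threshold are fixed, the next question is how to implement the counter. The first thing that comes to mind is AtomicLong: for each service call, AtomicLong#incrementAndGet() adds 1 to the counter and returns the new value, and comparing that value with the threshold tells us whether the service has exceeded the threshold in the current window. The key is how to design the counter. With a 1-minute window, we can build a ring-shaped counter with three slots: the previous minute, the current minute and the next minute.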
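We can implement it directly with an array: new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)}. The current-minute AtomicLong holds the number of calls to the service in the current window, and the previous-minute AtomicLong holds the data for the previous window. The previous-minute slot exists for statistics: once we are in the current window, counting for the previous minute is finished, so its call count can be logged or sent to a server for analysis. As mentioned, setting the threshold is a process of continuous tuning, so these statistics are often useful.

While the current window is being counted, the next minute's AtomicLong must be reset to zero. This step is critical, and there are two ways to do it. The first is to start a separate thread (via Executors.newSingleThreadScheduledExecutor) that clears the next minute's AtomicLong every, say, 50 seconds (the period must of course be shorter than the window). The second is to check the next minute's AtomicLong every time the current minute's AtomicLong is incremented, and set it to 0 if it is not already 0. The second approach has a bug: if the service is not called at all during a whole window, the next minute's AtomicLong is never cleared before it is used. To fix that, the second approach needs an extra field recording when the slot was last cleared; before using the current minute's AtomicLong, check that field, and if more than one window has passed, clear the counter first. Of the two, the first approach is simpler to implement.

To reach the AtomicLong for the current, previous or next minute, take the current minute number modulo the array length; for example, the index of the current minute is (System.currentTimeMillis() / 60000) % 3.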
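The second clearing approach can be made robust by tagging each slot with the minute it belongs to, so a stale slot is detected and replaced on first use rather than by a background thread. Below is a minimal sketch under that assumption; `MinuteRingCounter` and its methods are hypothetical names, not part of laopopo-rpc, and the clock is injected as a `LongSupplier` so the behavior can be tested without waiting. It uses `Math.floorMod` for the index, which stays non-negative even for minute `-1`.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.LongSupplier;

/**
 * Hypothetical three-slot ring of per-minute counters. Each slot remembers which
 * minute it belongs to, so a stale slot is replaced lazily instead of by a timer.
 */
final class MinuteRingCounter {
    private static final long UNIT_MILLIS = 60_000L;
    private static final int SLOTS = 3;

    /** A minute number paired with the call counter for that minute. */
    private static final class Bucket {
        final long minute;
        final AtomicLong count = new AtomicLong();

        Bucket(long minute) {
            this.minute = minute;
        }
    }

    private final AtomicReferenceArray<Bucket> ring = new AtomicReferenceArray<>(SLOTS);
    private final LongSupplier clock; // e.g. System::currentTimeMillis

    MinuteRingCounter(LongSupplier clock) {
        this.clock = clock;
    }

    private long currentMinute() {
        return clock.getAsLong() / UNIT_MILLIS;
    }

    private static int indexOf(long minute) {
        return (int) Math.floorMod(minute, SLOTS); // never negative, unlike %
    }

    /** Returns the bucket for the given minute, replacing an older one in that slot. */
    private Bucket bucketFor(long minute) {
        int index = indexOf(minute);
        while (true) {
            Bucket existing = ring.get(index);
            if (existing != null && existing.minute >= minute) {
                return existing; // the current bucket (a newer one only for a badly delayed caller)
            }
            Bucket fresh = new Bucket(minute);
            if (ring.compareAndSet(index, existing, fresh)) {
                return fresh;
            }
            // Another thread replaced the slot first: re-read it and retry.
        }
    }

    /** Counts one call in the current minute and returns the new total. */
    long incrementAndGet() {
        return bucketFor(currentMinute()).count.incrementAndGet();
    }

    /** Calls recorded for the given minute; 0 if its slot holds another minute's data. */
    long countAt(long minute) {
        Bucket b = ring.get(indexOf(minute));
        return (b != null && b.minute == minute) ? b.count.get() : 0L;
    }

    long currentCount() {
        return countAt(currentMinute());
    }

    long lastMinuteCount() {
        return countAt(currentMinute() - 1);
    }
}
```

Because a slot is trusted only when its tag matches the requested minute, a minute with no calls reads as 0, which is exactly the case flagged above as a bug in the naive version of the second approach, and no separate clearing thread is needed.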
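Limiting the number of calls per unit of time is simple to implement and fits most scenarios. If the threshold can be configured dynamically from a server, it can even double as a business switch. It does have a limitation, though: the threshold is set by analyzing call volume per window, so if a traffic spike uses up the whole quota in the first few seconds of a window, the service "refuses service" for the rest of that window. We can call this "spike consumption". Fortunately, it is not common.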
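(The text above is copied from Didi's tech blog, 滴滴藏經閣.)
Now let's write the rate-limiting code based on this theory, starting with a direct implementation of the design above: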
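To make "spike consumption" concrete, here is a minimal sketch of a single fixed-window limiter that, like the approach described above, counts every call and allows it while the window's count stays within the threshold. `FixedWindowLimiter` is a hypothetical class; it uses a `synchronized` method for the window reset purely to keep the sketch obviously correct, and the clock is injected so a burst can be simulated.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical fixed-window limiter: every call is counted, and a call is allowed
 * while the count for the current window is within the limit.
 */
final class FixedWindowLimiter {
    private final long limit;
    private final long windowMillis;
    private final LongSupplier clock;
    private long currentWindow = Long.MIN_VALUE; // no window seen yet
    private long count;

    FixedWindowLimiter(long limit, long windowMillis, LongSupplier clock) {
        if (limit <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("limit and windowMillis must be positive");
        }
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    /** synchronized keeps the window reset and the increment atomic together. */
    synchronized boolean tryAcquire() {
        long window = clock.getAsLong() / windowMillis;
        if (window != currentWindow) { // a new window has started: count from 0 again
            currentWindow = window;
            count = 0;
        }
        count++; // rejected calls are counted too, so the statistics show real demand
        return count <= limit;
    }
}
```

With a limit of 40 per minute, a burst of 50 calls in the first second lets 40 through, and every later call in that minute is rejected until the next window starts.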
```java
// Nested in PerMinuteFlowController; uses java.util.concurrent.atomic.AtomicLong and laopopo's SystemClock.
public static class FlowController {

    // Three per-minute slots used as a ring: previous, current and next minute.
    private AtomicLong[] metricses = new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)};

    // Adds one call to the current minute's slot.
    public void incrementAtCurrentMinute() {
        long currentTime = SystemClock.millisClock().now();
        int index = (int) ((currentTime / 60000) % 3);
        AtomicLong atomicLong = metricses[index];
        atomicLong.incrementAndGet();
    }

    // Returns the number of calls made during the previous minute.
    public long getLastCallCountAtLastMinute() {
        long currentTime = SystemClock.millisClock().now();
        int index = (int) (((currentTime / 60000) - 1) % 3);
        AtomicLong atomicLong = metricses[index];
        return atomicLong.get();
    }

    // Returns the number of calls made so far in the current minute.
    public long getCurrentCallCount() {
        long currentTime = SystemClock.millisClock().now();
        int index = (int) ((currentTime / 60000) % 3);
        AtomicLong atomicLong = metricses[index];
        return atomicLong.get();
    }

    // Returns the value in the next minute's slot (0 once it has been cleared).
    public long getNextMinuteCallCount() {
        long currentTime = SystemClock.millisClock().now();
        int index = (int) (((currentTime / 60000) + 1) % 3);
        AtomicLong atomicLong = metricses[index];
        return atomicLong.get();
    }

    // Resets the next minute's slot so it starts from 0 when that minute begins.
    public void clearNextMinuteCallCount() {
        System.out.println("Clearing started");
        long currentTime = SystemClock.millisClock().now();
        int index = (int) (((currentTime / 60000) + 1) % 3);
        AtomicLong atomicLong = metricses[index];
        atomicLong.set(0);
    }

    public AtomicLong[] getMetricses() {
        return metricses;
    }

    public void setMetricses(AtomicLong[] metricses) {
        this.metricses = metricses;
    }
}
```
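Now we plug it into our own rate-limiting scenario, where each service gets its own rate limiter:
```java
// Concurrent map: the caller thread writes to it while the scheduler and scanner threads read it.
private static final Map<String, FlowController> serviceFlowController = new ConcurrentHashMap<String, FlowController>();
```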
Let's simulate the calling scenario:
```java
for (int i = 0; i < 10000; i++) {
    callXXXXService("TEST.SERVICE");
    Thread.sleep(20L);
}
```
The loop calls the service once every 20 milliseconds. callXXXXService looks up the service's controller, creating it on first use, and adds one call to the current minute:
```java
private static void callXXXXService(String service) {
    // Each service name gets its own FlowController, created on first use.
    FlowController controller = serviceFlowController.get(service);
    if (null == controller) {
        controller = new FlowController();
        serviceFlowController.put(service, controller);
    }
    controller.incrementAtCurrentMinute();
}
```
We also need a scheduled task that periodically clears the count already stored in the next slot:
```java
private static final ScheduledExecutorService scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("provider-timer"));
```
We clear the next time slot every 45 seconds, which is shorter than the 1-minute window:
```java
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        // Catch everything: an uncaught exception would cancel all later runs of a fixed-rate task.
        try {
            System.out.println("Clearing the next time slot");
            clearNextMinuteCallCount();
        } catch (Exception e) {
            logger.warn("failed to clear next-minute slot [{}]", e.getMessage());
        }
    }
}, 3, 45, TimeUnit.SECONDS);
```
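Why must the clearing period be shorter than the window? The next slot has to be cleared at least once during every minute. One quick way to check a schedule is to enumerate the run times of a fixed-rate task. `ClearScheduleCheck` is a hypothetical helper for that arithmetic, not part of the demo, and it ignores scheduling jitter.

```java
/** Hypothetical helper for checking a fixed-rate clearing schedule against 1-minute windows. */
final class ClearScheduleCheck {
    private ClearScheduleCheck() {
    }

    /**
     * Returns true if a task first run at initialDelaySec and then every periodSec
     * runs at least once in every whole minute from its first run's minute up to
     * (but not including) minute horizonMinutes.
     */
    static boolean coversEveryMinute(long initialDelaySec, long periodSec, int horizonMinutes) {
        if (periodSec <= 0) {
            throw new IllegalArgumentException("periodSec must be positive");
        }
        boolean[] hit = new boolean[horizonMinutes];
        for (long t = initialDelaySec; t < horizonMinutes * 60L; t += periodSec) {
            hit[(int) (t / 60)] = true; // the minute in which this run happens
        }
        for (int m = (int) (initialDelaySec / 60); m < horizonMinutes; m++) {
            if (!hit[m]) {
                return false;
            }
        }
        return true;
    }
}
```

For the demo's schedule (initial delay 3 s, period 45 s) every minute is covered, since consecutive runs are less than 60 s apart. With a 75 s period, runs land at 3, 78, 153, 228 and 303 s, so minute 4 (240 to 299 s) is never cleared.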
This completes the basic per-minute counting the rate limiter needs. The full code example:
```java
package org.laopopo.example.flow.controller;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.laopopo.common.utils.NamedThreadFactory;
import org.laopopo.common.utils.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PerMinuteFlowController {

    private static final Logger logger = LoggerFactory.getLogger(PerMinuteFlowController.class);

    // One FlowController per service name; concurrent because three threads use this map.
    private static final Map<String, FlowController> serviceFlowController = new ConcurrentHashMap<String, FlowController>();

    private static final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("provider-timer"));

    public static void main(String[] args) throws InterruptedException {

        // Background thread that logs the counters every 5 seconds.
        Thread t = new Thread(new MetricsScanner(), "timeout.scanner");
        t.setDaemon(true);
        t.start();

        // Clear the next minute's slot every 45 s (must be shorter than the 1-minute window).
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("Clearing the next time slot");
                    clearNextMinuteCallCount();
                } catch (Exception e) {
                    logger.warn("failed to clear next-minute slot [{}]", e.getMessage());
                }
            }
        }, 3, 45, TimeUnit.SECONDS);

        // Simulate one call every 20 ms.
        for (int i = 0; i < 10000; i++) {
            callXXXXService("TEST.SERVICE");
            Thread.sleep(20L);
        }
        scheduledExecutorService.shutdown();
    }

    private static void clearNextMinuteCallCount() {
        for (FlowController flowController : serviceFlowController.values()) {
            flowController.clearNextMinuteCallCount();
        }
    }

    private static void callXXXXService(String service) {
        FlowController controller = serviceFlowController.get(service);
        if (null == controller) {
            controller = new FlowController();
            serviceFlowController.put(service, controller);
        }
        controller.incrementAtCurrentMinute();
    }

    public static class FlowController {

        // Ring of three per-minute slots: previous, current and next minute.
        private AtomicLong[] metricses = new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)};

        public void incrementAtCurrentMinute() {
            long currentTime = SystemClock.millisClock().now();
            int index = (int) ((currentTime / 60000) % 3);
            metricses[index].incrementAndGet();
        }

        public long getLastCallCountAtLastMinute() {
            long currentTime = SystemClock.millisClock().now();
            int index = (int) (((currentTime / 60000) - 1) % 3);
            return metricses[index].get();
        }

        public long getCurrentCallCount() {
            long currentTime = SystemClock.millisClock().now();
            int index = (int) ((currentTime / 60000) % 3);
            return metricses[index].get();
        }

        public long getNextMinuteCallCount() {
            long currentTime = SystemClock.millisClock().now();
            int index = (int) (((currentTime / 60000) + 1) % 3);
            return metricses[index].get();
        }

        public void clearNextMinuteCallCount() {
            System.out.println("Clearing started");
            long currentTime = SystemClock.millisClock().now();
            int index = (int) (((currentTime / 60000) + 1) % 3);
            metricses[index].set(0);
        }

        public AtomicLong[] getMetricses() {
            return metricses;
        }

        public void setMetricses(AtomicLong[] metricses) {
            this.metricses = metricses;
        }
    }

    public static class MetricsScanner implements Runnable {

        @Override
        public void run() {
            for (;;) {
                logger.info("collecting statistics");
                try {
                    Thread.sleep(5000);
                    for (FlowController flowController : serviceFlowController.values()) {
                        logger.info("calls in the previous minute: [{}]", flowController.getLastCallCountAtLastMinute());
                        logger.info("calls in the current minute: [{}]", flowController.getCurrentCallCount());
                        logger.info("value in the next minute's slot: [{}]", flowController.getNextMinuteCallCount());
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop scanning.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```
[Screenshots: the program running, and the log printed after a few dozen seconds]
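The log output shows that it works, so the experiment basically succeeded: we can count the calls per minute and read the previous minute's call count at any time~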
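Next we need to move this test code into a real usage scenario. First, let's sort out the logic:
1) When weaving the service, use an annotation to declare the maximum number of calls per unit of time (one minute) for the service instance
2) During weaving, record this limit in a global variable; each serviceName maps one-to-one to a maximum call count
3) Each time a consumer invokes the service, add 1 to that service's rate-limiter count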
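The three steps above can be sketched in plain Java with a runtime-retained annotation and reflection. Everything here is hypothetical: `@MaxCallsPerMinute`, `FlowLimitRegistry` and `DemoGreetingService` stand in for laopopo-rpc's `@RPCService(maxCallCountInMinute = ...)` and its weaving code, and the per-minute window reset is omitted to keep the focus on registration and checking.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Step 1: hypothetical stand-in for the framework's service annotation. */
@Retention(RetentionPolicy.RUNTIME) // must survive to runtime so reflection can read it
@Target(ElementType.METHOD)
@interface MaxCallsPerMinute {
    String serviceName();

    long value();
}

/** Hypothetical service implementation, shaped like HelloServiceFlowControllerImpl. */
class DemoGreetingService {
    @MaxCallsPerMinute(serviceName = "LAOPOPO.TEST.SAYHELLO", value = 40)
    public String sayHello(String str) {
        return "hello " + str;
    }
}

/** Hypothetical registry: serviceName -> limit, plus one counter per service. */
final class FlowLimitRegistry {
    private final Map<String, Long> limits = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Step 2: while "weaving" a service object, record each annotated method's limit. */
    void register(Object serviceImpl) {
        for (Method method : serviceImpl.getClass().getMethods()) {
            MaxCallsPerMinute limit = method.getAnnotation(MaxCallsPerMinute.class);
            if (limit == null) {
                continue;
            }
            if (limit.value() <= 0) {
                throw new IllegalArgumentException("max call count must be greater than zero");
            }
            limits.put(limit.serviceName(), limit.value());
            counters.put(limit.serviceName(), new AtomicLong());
        }
    }

    /** Step 3: count the call, then compare with the limit (window reset omitted). */
    boolean isAllow(String serviceName) {
        Long max = limits.get(serviceName);
        if (max == null) {
            return false; // unregistered service: rejected, as in the manager shown in this post
        }
        return counters.get(serviceName).incrementAndGet() <= max;
    }
}
```

Registering `new DemoGreetingService()` lets the first 40 calls to `LAOPOPO.TEST.SAYHELLO` through and rejects the 41st; a real implementation would back each service with a per-minute counter like the `FlowController` above instead of a single `AtomicLong`.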
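Now let's embed the rate-limiting code into the RPC demo.
Recall that in the section on weaving, our custom annotation specifies the maximum number of calls per unit of time for a given service. It can be configured like this: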
```java
package org.laopopo.example.generic.test_5;

import org.laopopo.client.annotation.RPCService;
import org.laopopo.example.demo.service.HelloSerivce;

public class HelloServiceFlowControllerImpl implements HelloSerivce {

    @Override
    @RPCService(responsibilityName = "xiaoy", serviceName = "LAOPOPO.TEST.SAYHELLO", maxCallCountInMinute = 40)
    public String sayHello(String str) {
        return "hello " + str;
    }
}
```
When weaving the service, record the limit in the global rate-limit manager:
```java
// Reject a non-positive limit at weaving time.
if (maxCallCount <= 0) {
    throw new RpcWrapperException("max call count per unit time must be greater than zero");
}
ServiceFlowControllerManager serviceFlowControllerManager = providerController.getServiceFlowControllerManager();
serviceFlowControllerManager.setServiceLimitVal(serviceName, maxCallCount);
```
ServiceFlowControllerManager.java
```java
package org.laopopo.client.provider.flow.control;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.laopopo.common.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceFlowControllerManager {

    private static final Logger logger = LoggerFactory.getLogger(ServiceFlowControllerManager.class);

    // serviceName -> (max calls per minute, per-minute call counter)
    private static final ConcurrentMap<String, Pair<Long, ServiceFlowController>> globalFlowControllerMap =
            new ConcurrentHashMap<String, Pair<Long, ServiceFlowController>>();

    // Registers the per-minute limit for a service (called during weaving).
    public void setServiceLimitVal(String serviceName, Long maxCallCount) {
        Pair<Long, ServiceFlowController> pair = new Pair<Long, ServiceFlowController>();
        pair.setKey(maxCallCount);
        pair.setValue(new ServiceFlowController());
        globalFlowControllerMap.put(serviceName, pair);
    }

    // Counts one call without checking the limit.
    public void incrementCallCount(String serviceName) {
        Pair<Long, ServiceFlowController> pair = globalFlowControllerMap.get(serviceName);
        if (null == pair) {
            logger.warn("serviceName [{}] matched no flowController", serviceName);
            return;
        }
        ServiceFlowController serviceFlowController = pair.getValue();
        serviceFlowController.incrementAtCurrentMinute();
    }

    // Counts this call and returns whether it is within the per-minute limit.
    public boolean isAllow(String serviceName) {
        Pair<Long, ServiceFlowController> pair = globalFlowControllerMap.get(serviceName);
        if (null == pair) {
            logger.warn("serviceName [{}] matched no flowController", serviceName);
            return false;
        }
        ServiceFlowController serviceFlowController = pair.getValue();
        Long maxCallCount = pair.getKey();
        // Increment exactly once per call, then compare the new total with the limit.
        long hasCallCount = serviceFlowController.incrementAtCurrentMinute();
        return hasCallCount <= maxCallCount;
    }

    // Returns the number of calls the service received in the previous minute.
    public Long getLastMinuteCallCount(String serviceName) {
        Pair<Long, ServiceFlowController> pair = globalFlowControllerMap.get(serviceName);
        if (null == pair) {
            logger.warn("serviceName [{}] matched no flowController", serviceName);
            return 0L;
        }
        ServiceFlowController serviceFlowController = pair.getValue();
        return serviceFlowController.getLastCallCountAtLastMinute();
    }

    // Called periodically to reset every service's next-minute slot.
    public void clearAllServiceNextMinuteCallCount() {
        for (String service : globalFlowControllerMap.keySet()) {
            Pair<Long, ServiceFlowController> pair = globalFlowControllerMap.get(service);
            if (null == pair) {
                logger.warn("serviceName [{}] matched no flowController", service);
                continue;
            }
            ServiceFlowController serviceFlowController = pair.getValue();
            serviceFlowController.clearNextMinuteCallCount();
        }
    }
}
```
Then add a simple check to the remote invocation path:
```java
ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderController().getServiceFlowControllerManager();
// Over the per-minute limit: reject the request with APP_FLOW_CONTROL instead of invoking the service.
if (!serviceFlowControllerManager.isAllow(serviceName)) {
    rejected(APP_FLOW_CONTROL, channel, request, serviceName);
    return;
}
```
With the code above, simple rate limiting is in place.
Source code of PerMinuteFlowController.java:
https://github.com/BazingaLyn/laopopo-rpc/blob/master/laopopo-example/src/main/java/org/laopopo/example/flow/controller/PerMinuteFlowController.java
Rate-limiting test code:
https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-example/src/main/java/org/laopopo/example/generic/test_5