解锁新姿势 | 如何用配置中心实现全局动态流控?
為什么80%的碼農都做不了架構師?>>> ??
摘要:?當資源成為瓶頸時,服務框架需要對消費者做限流,啟動流控保護機制。流量控制有多種策略,比較常用的有:針對訪問速率的靜態流控、針對資源占用的動態流控、針對消費者并發連接數的連接控制和針對并行訪問數的并發控制。在分布式架構中,應用和應用之間的調用類型分為以下兩種,流控方式也略有不同。
?
點此查看原文:https://yq.aliyun.com/articles/380180?spm=a2c41.11181499.0.0
當資源成為瓶頸時,服務框架需要對消費者做限流,啟動流控保護機制。流量控制有多種策略,比較常用的有:針對訪問速率的靜態流控、針對資源占用的動態流控、針對消費者并發連接數的連接控制和針對并行訪問數的并發控制。在實踐中,各種流量控制策略需要綜合使用才能起到較好的效果。
在分布式架構中,應用和應用之間的調用類型分為以下兩種,流控方式也略有不同。
同步RPC類調用,比如RESTful,Dubbo,HSF等都屬于該類。對于該類同步調用,通常限流方式為兩種:針對服務提供者的并發全局流控,或針對服務消費者的并發局部流控。兩種的控制手段類似,都是通過限制服務端或客服端并發調用數來進行限制。
異步MQ類調用,典型如RocketMQ, Kafka,等。對于該類異步調用,通常限流方式是在訂閱端限流。限流方式為兩種:針對消息訂閱者的并發流控,或針對消息訂閱者的消費延時流控。
針對消息訂閱者的消費延時流控基本原理是,在每次客戶端消費時,可以增加一個延時來控制消費速度,這樣理論消費并發最快速度為:
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
比如如果消息并發消費線程為20,延時為100ms,則理論上可以將并發消費控制在200以下。具體公式如下:
200 = 1 / 0.1 * 20
相比并發線程數流控,消費延時流控優點在于實現相對簡單,對MQ類客戶端包依賴較少,不需要客戶端提供控制并發線程數的動態調整接口。
以上各種流量控制方法,在分布式架構下,如果要做到全局動態控制,一個簡單的技術方法是依賴配置中心,即通過配置中心來進行流控參數的下發。
下面章節詳細介紹如何基于配置中心來實現異步消息消費的全局動態流控。使用的例子為阿里云上的 MQ (消息隊列)和 ACM (應用配置管理)兩款產品。
注:之所以用MQ為示例是因為在本文撰寫之時,正好MQ Consumer Client SDK并不支持動態調整現成并發數,因此通過基于ACM來動態調整消費延遲的方法正好可以解決MQ消費流控動態的問題。
基于消費延時流控的基本原理
基本原理如下。其中,管理員或應用程序通過ACM控制臺發布消費延時配置(RCV_INTERVAL_TIME),所有MQ消費程序訂閱該配置。理論上,該配置從發布到下發所有客戶端,可以在1秒內完成(取決于網絡延時)。
代碼示例
該章節基于配置中心來實現異步消息消費的全局動態流控的代碼示例。使用的例子為阿里云上的MQ(消息隊列)和ACM(應用配置管理)兩款產品,基于Java語言。關于SDK的詳細介紹,可參見兩款產品的官方文檔。
在ACM上創建消費延時的參數,截屏如下。
設置全局消費延時變量
首先,設置消費接收延時的全局變量, 如下。
// 初始化消息接收延時參數,單位為millisecondstatic int RCV_INTERVAL_TIME = 10000;// 初始化配置服務,控制臺通過示例代碼自動獲取下面參數ConfigService.init("acm.aliyun.com", /*租戶ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy"); // 主動獲取配置String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);Properties p = new Properties();try {p.load(new StringReader(content));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}其次,設置ACM listener,確保當配置被修改時,即使更新 RCV_INTERVAL_TIME 參數, 如下。
// 初始化的時候,給配置添加監聽,配置變更會回調通知ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {public void receiveConfigInfo(String configInfo) {Properties p = new Properties();try {p.load(new StringReader(configInfo));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}}});設置 MQ 消費延時邏輯
完整實例如下。
注:這里 RCV_INTERVAL_TIME 參數的訪問是故意沒有加鎖的,讀者可以自行思考原因。Aliyun ONS Client不提供動態線程并發數,默認并發為20。因此這里正好使用消費延時參數來動態調節QoS。
//以下代碼可直接貼在Main()函數里Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");properties.put(PropertyKeyConst.AccessKey,"xxx");properties.put(PropertyKeyConst.SecretKey, "yyy");properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");// 設置 TCP 接入域名(此處以公共云生產環境為例)properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener() {public Action consume(Message message, ConsumeContext context) {// MQ Subscribe QoS logical start, // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.// Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value. // RCV_INTERVAL_TIME <= 0 means no sleeping.int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;while (rcvIntervalTimeLeft > 0) {if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {rcvIntervalTimeLeft = RCV_INTERVAL_TIME;}try {if (rcvIntervalTimeLeft >= 100) {rcvIntervalTimeLeft -= 100;Thread.sleep(100);} else {Thread.sleep(rcvIntervalTimeLeft);rcvIntervalTimeLeft = 0;}} catch (InterruptedException e) {e.printStackTrace();}}// MQ Subscribe interval logical endsSystem.out.println("Receive: " + message);/** Put your business logic here.*/doSomething();return Action.CommitMessage;}});consumer.start();運行結果
單機運行consumer進行消費,假設queue內的消息無限多,不存在消費萬的情況,分三段測試,分別運行約5分鐘,通過ACM配置推送來達到以下效果。
RCV_INTERVAL_TIME = 100 ms
RCV_INTERVAL_TIME = 5000 ms
RCV_INTERVAL_TIME = 1000 ms
結果如下,在單MQ消費業務處理耗時約100ms情況下的,單機并發20線程的測試結果。
RCV_INTERVAL_TIME = 100 ms:平均消費性能約為 9000 tpm 左右
RCV_INTERVAL_TIME = 5000 ms:平均消費性能被限制到了 200 tpm 左右
RCV_INTERVAL_TIME = 1000 ms:平均消費性能回升到到了 1100 tpm 左右
以上結果基本達到消費和 tpm 成反比的預期,最關鍵的是整個過程中,應用不中斷,流控推送結果秒級生效到分布式集群。單機性能結果如下所示。
轉載于:https://my.oschina.net/yunqi/blog/1612514
總結
以上是生活随笔為你收集整理的解锁新姿势 | 如何用配置中心实现全局动态流控?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云CentOS 7.4 配置Ngin
- 下一篇: 命令行以及git基础使用