python服务端多进程压测工具
本文描述一個python實現的多進程壓測工具,這個壓測工具的特點如下:
多進程
在大多數情況下,壓測一般適用于IO密集型場景(如訪問接口并等待返回),在這種場景下多線程多進程的區分并不明顯(詳情請參見GIL相關)。不過一旦出現詞表參數加密、返回內容校驗等事情的話,多進程對發送效率的提升還是很明顯的。
可以指定發送QPS
可以指定發壓的QPS,根據并行度和請求相應時間,可以估算出可發送QPS峰值。例如并行度是10,響應時間是100ms,那么QPS峰值應該是(1s/100ms * 10)=100,此工具可以將QPS穩定的維持在小于峰值的一個量上。
便于擴展
為什么要DIY壓測工具了?一般的服務端壓測工具,例如http_load和jmeter,不是http協議的,就是需要通過代碼進行擴展。例如在壓測thrift接口的時候,即使通過jmeter擴展java程序也很麻煩。但是當涉及到場景化壓測,或者是奇怪的SDK,例如本文要壓測的接口是通過java代碼自動生成的python消息類SDK,并且涉及到場景化的壓測,很難通過一般的服務端壓測工具搞定。
1、發壓代碼
解耦
下面是壓測代碼的實現,可以看到,我這里使用abc包,做了一個抽象類。
業務測試代碼,例如自動化case,只要繼承了這個抽象類,就獲得壓測的能力,做到壓測和自動化測試的解耦。
這里有兩個抽象方法
-
vocab() - 構造詞表
-
press() - 發壓邏輯
是被@abc.abstractmethod裝飾器裝飾,在子類中,是一定要被實現的。
run()方法是壓測執行的方法,實現子類的詞表方法和發壓邏輯之后,直接調用run()方法就可以壓測了。
固定QPS
固定QPS是通過管理進程實現的。可以看到有兩種進程:
一種是worker_process進程,調用了press()發壓邏輯函數,并且這個進程可以指定并發度concurrent,是實際的發壓進程,值得注意的是在worker_process中使用了time.sleep(),是為了控制發送速度。
另一種是manager_process進程,這個進程每隔一段時間計算實際的qps,并和設置的qps比較,然后調整worker_process中的sleep時間,例如實際qps小于設定qps,那么就少睡一會兒。
這里不得不提到的是,多進程如何共享變量?
這里使用的是multiprocessing中的Manager包,這個包提供了多進程共享變量的能力,我這里用到的是Namespace數據結構來存儲多進程的計數。在使用過程中我懷疑Manager Namespace是通過讀寫文件的形式進行進程間共享變量的,這個我沒有深入的研究。
# -*- coding:utf-8 -*- import abc import time from multiprocessing import Lock, Process, Managerclass Press(object):__metaclass__ = abc.ABCMetadef __init__(self, qps=100, concurrent=10):self.qps = qpsself.concurrent = concurrentself.mutex = Lock()self.local = Manager().Namespace()self.local.count = 0self.local.sleep = 0.1self.manager_gap = 0.5self.precision = 0.1self.vocab_list = list()self.vocab()def manager_process(self):while True:with self.mutex:current_qps = self.local.count / self.manager_gapself.local.count = 0print self.local.sleep, current_qpsif current_qps < self.qps:self.local.sleep = self.local.sleep * (1.0 - self.precision)else:self.local.sleep = self.local.sleep * (1.0 + self.precision)time.sleep(self.manager_gap)def worker_process(self):while True:with self.mutex:self.local.count += 1time.sleep(self.local.sleep)self.press()@abc.abstractmethoddef vocab(self):return@abc.abstractmethoddef press(self):returndef run(self):processes = [Process(target=self.worker_process) for index in range(self.concurrent)]processes.append(Process(target=self.manager_process))for process in processes:process.start()for process in processes:process.join()2、實際壓測
給出一個發壓的例子。分三步~
QueryVmPress繼承了Press類,獲得了發壓能力。
然后實現了vocab方法,構造了詞表。
實現了press方法,這里是發壓邏輯,可以看到QueryVmScenario.press_vm(vocab),QueryVmScenario放的是自動化case。發壓只是調用了其中的一個接口。這個接口的編寫很復雜,也是為什么要自己做一個壓測工具的原因。
''' 遇到問題沒人解答?小編創建了一個Python學習交流QQ群:778463939 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' # -*- coding:utf-8 -*- import random from query.query_vm_scenario import QueryVmScenario from db.vm_dao import Dao as vm_dao from db.account_dao import Dao as account_dao from press import Press from lib import common from vocab import Vocabclass QueryVmVocab(Vocab):def __init__(self):Vocab.__init__(self)class QueryVmPress(Press):def __init__(self, qps=100, concurrent=10):Press.__init__(self, qps, concurrent)def vocab(self):for account in account_dao.query_all_account(limit=10):account_name = account[1]account_password = account[2]res = common.login_by_account(account_name, account_password)for item in vm_dao.query_vm_by_account(account_name, limit=100):vm_uuid = item[1]vocab = QueryVmVocab()vocab.add('session_uuid', res.inventory.uuid)vocab.add('vm_uuid', vm_uuid)self.vocab_list.append(vocab)return self.vocab_listdef press(self):vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)]QueryVmScenario.press_vm(vocab)if __name__ == '__main__':QueryVmPress(qps=100, concurrent=10).run()QueryVmPress(qps=100, concurrent=10).run(),就按照100QPS進行壓測了。
0.1 20.0 0.09 40.0 0.081 60.0 0.0729 80.0 0.06561 60.0 0.059049 80.0 0.0531441 60.0 0.04782969 80.0 0.043046721 80.0 0.0387420489 80.0 0.03486784401 80.0 0.031381059609 100.0 0.0345191655699 80.0 0.0310672490129 88.0 0.0279605241116 92.0 0.0251644717005 100.0 0.0276809188705 80.0 0.0249128269835 100.0 0.0274041096818 100.0 0.03014452065 80.0 0.027130068585 100.0 0.0298430754435 80.0 0.0268587678991 100.0 0.029544644689 92.0第一列是sleep時間,第二列是實際QPS,可以看到,qps會被動態的穩定在設置的值上。
3、混壓
當要做多個接口混壓的時候,可以這樣做。
先寫好單壓的python類,在單壓的代碼里,可以看到我實現了QueryVmVocab類,表名了詞表的類型,這個類集成自Vocab,Vocab就是一個字典的封裝。
混壓的時候,先將詞表匯總,并且shuffle,然后彈出詞表的時候,使用isinstance判斷詞表的類型,調用不同的發壓函數進行壓測。
vocab的實現
''' 遇到問題沒人解答?小編創建了一個Python學習交流QQ群:778463939 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' # -*- coding:utf-8 -*- import abcclass Vocab(object):__metaclass__ = abc.ABCMetadef __init__(self):self.vocab = dict()def add(self, key, value):self.vocab[key] = valuedef get(self, key):return self.vocab.get(key)def remove(self, key):del self.vocab[key]混壓的實現
# -*- coding:utf-8 -*- import randomfrom press import Press from query_eip_press import QueryEipPress, QueryEipVocab from query_image_press import QueryImagePress, QueryImageVocab from query_snapshot_press import QuerySnapshotPress, QuerySnapshotVocab from query_vm_press import QueryVmPress, QueryVmVocabfrom query.query_eip_scenario import QueryEipScenario from query.query_image_scenario import QueryImageScenario from query.query_snapshot_scenario import QuerySnapshotScenario from query.query_vm_scenario import QueryVmScenarioclass MixedPress(Press):def __init__(self, qps=100, concurrent=10):Press.__init__(self, qps, concurrent)def vocab(self):self.vocab_list.extend(QueryEipPress().vocab())self.vocab_list.extend(QueryImagePress().vocab())self.vocab_list.extend(QuerySnapshotPress().vocab())self.vocab_list.extend(QueryVmPress().vocab())def press(self):vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)]if isinstance(vocab, QueryEipVocab):QueryEipScenario.press_eip(vocab)elif isinstance(vocab, QueryImageVocab):QueryImageScenario.press_image(vocab)elif isinstance(vocab, QuerySnapshotVocab):QuerySnapshotScenario.press_snapshot(vocab)elif isinstance(vocab, QueryVmVocab):QueryVmScenario.press_vm(vocab)if __name__ == '__main__':MixedPress(200, 50).run()后記
這只是一個很小的功能實現,提供給大家參考。如果有不對的地方,希望得到大家指正。
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的python服务端多进程压测工具的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python装饰器实现对异常代码出现进行
- 下一篇: Python 定时调度