python分布式进程(windows下)
分布式進程:
在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由于managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。
舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由于處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現?
原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。
我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:
下面的代碼是在windows下運行的,所以出現了各種問題:
# coding=utf-8 import random, time, Queue from multiprocessing.managers import BaseManager# 發送任務的隊列: task_queue =Queue.Queue() # 接收結果的隊列: result_queue = Queue.Queue()# 從BaseManager繼承的QueueManager: class QueueManager(BaseManager):pass# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 綁定端口5000, 設置驗證碼'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網絡訪問的Queue對象: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n) # 從result隊列讀取結果: print('Try get results...') for i in range(10):r = result.get(timeout=10)print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.')在windows命令行終端的運行結果:
由錯誤信息改代碼:
# coding=utf-8import random,time, Queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_supporttask_queue = Queue.Queue() # 發送任務的隊列: result_queue = Queue.Queue() # 接收結果的隊列: class QueueManager(BaseManager): # 從BaseManager繼承的QueueManager:pass # windows下運行 def return_task_queue():global task_queuereturn task_queue # 返回發送任務隊列 def return_result_queue ():global result_queuereturn result_queue # 返回接收結果隊列def test():# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象,它們用來進行進程間通信,交換對象#QueueManager.register('get_task_queue', callable=lambda: task_queue)#QueueManager.register('get_result_queue', callable=lambda: result_queue)QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue)# 綁定端口5000, 設置驗證碼'abc':#manager = QueueManager(address=('', 5000), authkey=b'abc')# windows需要寫ip地址manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')manager.start() # 啟動Queue: # 獲得通過網絡訪問的Queue對象:task = manager.get_task_queue() result = manager.get_result_queue()for i in range(10): # 放幾個任務進去:n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)# 從result隊列讀取結果:print('Try get results...') for i in range(10):# 這里加了異常捕獲try:r = result.get(timeout=5)print('Result: %s' % r)except Queue.Empty:print('result queue is empty.')# 關閉: manager.shutdown() print('master exit.') if __name__=='__main__':freeze_support()print('start!')test()運行結果:
對比上段代碼改變的地方有:
# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象 QueueManager.register('get_task_queue',callable=return_task_queue) QueueManager.register('get_result_queue',callable=return_result_queue)其中task_queue和result_queue是兩個隊列,分別存放任務和結果。它們用來進行進程間通信,交換對象。
官網上有如下例子。
# coding=utf-8 from multiprocessing import Process, Queue def f(queue):queue.put([42, None, 'hello'])if __name__ == '__main__': q = Queue() # 創建隊列qp = Process(target=f, args=(q,)) # 創建一個進程p.start()print(q.get()) # 打印列表[42, None, 'hello']p.join()其中列表[42, None, ‘hello’]從新建p進程傳到了主進程中。
因為是分布式的環境,放入queue中的數據需要等待Workers機器運算處理后再進行讀取,這樣就需要對queue用QueueManager進行封裝放到網絡中。這是通過下面這句
QueueManager.register('get_task_queue',callable=return_task_queue)實現的,我們給return_task_queue的網絡調用接口取了一個名get_task_queue,而return_result_queue的名字是get_result_queue,方便區分對哪個queue進行操作。
task.put(n)即是對task_queue進行寫入數據,相當于分配任務。而result.get()即是等待workers處理后返回的結果
# windows需要寫ip地址manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')這點不同于linux操作系統,必須寫ip地址
if __name__=='__main__':freeze_support()print('start!')test()windows必須有 if name==’main‘: 這點從報錯的信息可以看出
中間加入了捕獲異常,使代碼運行完整,運行結果更容易看懂,在運行的時候最好用cmd終端。
下面是Worker的代碼:
# coding=utf-8 import time, sys,Queue from multiprocessing.managers import BaseManager# 創建類似的QueueManager: class QueueManager(BaseManager):pass# 由于這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue')# 連接到服務器,也就是運行task_master.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和驗證碼注意保持與task_master.py設置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 從網絡連接: try:m.connect() except:print('請先啟動task_master.py!')#sys.exit("sorry, goodbye!"); # 獲取Queue的對象: task = m.get_task_queue() result = m.get_result_queue() # 從task隊列取任務,并把結果寫入result隊列: for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n*n)time.sleep(1)result.put(r)except Queue.Empty:print('task queue is empty.') # 處理結束: print('worker exit.')這個簡單的Master/Worker模型有什么用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的異步發送。
Queue對象存儲在哪?注意到task_worker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在task_master.py進程中:
task_worker這里的QueueManager注冊的名字必須和task_manager中的一樣。對比上面的例子,可以看出Queue對象從另一個進程通過網絡傳遞了過來。只不過這里的傳遞和網絡通信由QueueManager完成。
運行結果:
運行task_master.py
運行task_worker.py
此運行是在同一臺電腦上
參考:
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000
總結
以上是生活随笔為你收集整理的python分布式进程(windows下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python中的ThreadLocal变
- 下一篇: 李严(说一说李严的简介)