python实现分布式_Python如何快速实现分布式任务
深入讀了讀python的官方文檔,發覺Python自帶的multiprocessing模塊有很多預制的接口可以方便的實現多個主機之間的通訊,進而實現典型的生產者-消費者模式的分布式任務架構。
之前,為了在Python中實現生產者-消費者模式,往往就會選擇一個額外的隊列系統,比如rabbitMQ之類。此外,你有可能還要設計一套任務對象的序列化方式以便塞入隊列。如果沒有隊列的支持,那不排除有些同學不得不從socket服務器做起,直接跟TCP/IP打起交道來。
其實multiprocessing.managers中有個BaseManager就為開發者提供了這樣一個快速接口。
我們假定的場景是1個生產者(producer.py)+8個消費者(worker.py)的系統,還有一個中央節點負責協調(server.py)實現如下:
server.py
from multiprocessing.managers import BaseManager
import Queue
queue = Queue.Queue() #初始化一個Q,用于消息傳遞
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue', callable=lambda:queue) # 在系統中發布get_queue這個業務
if __name__ == '__main__':
m = QueueManager(address=('10.239.85.193', 50000),authkey='abr' )
# 監聽所有10.239.85.193的50000口
s = m.get_server()
s.serve_forever()
worker.py
from multiprocessing.managers import BaseManager
from multiprocessing import Pool
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
def feb(i): #經典的'山羊增殖'
if i < 2: return 1
if i < 5 : return feb(i-1) + feb(i-2)
return feb(i-1) + feb(i-2) - feb(i-5)
def worker(i):
m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
#連接server
m.connect()
while True:
queue = m.get_queue()
# 獲取Q
c = queue.get()
print feb(c)
if __name__ == '__main__':
p = Pool(8) # 分進程啟動8個worker
p.map(worker, range(8))
producer.py
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
if __name__ == '__main__':
m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
m.connect()
i = 0
while True:
queue = m.get_queue()
queue.put(48)
i+=1
系統會直接將Queue() 對象中的數據直接封裝后通過TCP 50000端口在主機之間傳遞。不過需要注意的是,由于authkey的緣故,各個節點要求python的版本一致。
您可能感興趣的文章:在Python程序中實現分布式進程的教程用python + hadoop streaming 分布式編程(一) — 原理介紹,樣例程序與本地調試win10下python3.5.2和tensorflow安裝環境搭建教程Pipenv一鍵搭建python虛擬環境的方法Linux下搭建Spark 的 Python 編程環境的方法Python搭建Spark分布式集群環境
總結
以上是生活随笔為你收集整理的python实现分布式_Python如何快速实现分布式任务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信微粒贷综合评估未通过
- 下一篇: 同花顺是啥意思