python分布式框架_高性能分布式执行框架——Ray
Ray是UC Berkeley RISELab新推出的高性能分布式執(zhí)行框架,它使用了和傳統(tǒng)分布式計(jì)算系統(tǒng)不一樣的架構(gòu)和對(duì)分布式計(jì)算的抽象方式,具有比Spark更優(yōu)異的計(jì)算性能。
Ray目前還處于實(shí)驗(yàn)室階段,最新版本為0.2.2版本。雖然Ray自稱是面向AI應(yīng)用的分布式計(jì)算框架,但是它的架構(gòu)具有通用的分布式計(jì)算抽象。本文對(duì)Ray進(jìn)行簡(jiǎn)單的介紹,幫助大家更快地了解Ray是什么,如有描述不當(dāng)?shù)牡胤?#xff0c;歡迎不吝指正。
一、簡(jiǎn)單開(kāi)始
首先來(lái)看一下最簡(jiǎn)單的Ray程序是如何編寫(xiě)的。
# 導(dǎo)入ray,并初始化執(zhí)行環(huán)境
import ray
ray.init()
# 定義ray remote函數(shù)
@ray.remote
def hello():
return "Hello world !"
# 異步執(zhí)行remote函數(shù),返回結(jié)果id
object_id = hello.remote()
# 同步獲取計(jì)算結(jié)果
hello = ray.get(object_id)
# 輸出計(jì)算結(jié)果
print hello
在Ray里,通過(guò)Python注解@ray.remote定義remote函數(shù)。使用此注解聲明的函數(shù)都會(huì)自帶一個(gè)默認(rèn)的方法remote,通過(guò)此方法發(fā)起的函數(shù)調(diào)用都是以提交分布式任務(wù)的方式異步執(zhí)行的,函數(shù)的返回值是一個(gè)對(duì)象id,使用ray.get內(nèi)置操作可以同步獲取該id對(duì)應(yīng)的對(duì)象。熟悉Java里的Future機(jī)制的話對(duì)此應(yīng)該并不陌生,或許會(huì)有人疑惑這和普通的異步函數(shù)調(diào)用沒(méi)什么大的區(qū)別,但是這里最大的差異是,函數(shù)hello是分布式異步執(zhí)行的。
remote函數(shù)是Ray分布式計(jì)算抽象中的核心概念,通過(guò)它開(kāi)發(fā)者擁有了動(dòng)態(tài)定制計(jì)算依賴(任務(wù)DAG)的能力。比如:
@ray.remote
def A():
return "A"
@ray.remote
def B():
return "B"
@ray.remote
def C(a, b):
return "C"
a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)
例子代碼中,對(duì)函數(shù)A、B的調(diào)用是完全并行執(zhí)行的,但是對(duì)函數(shù)C的調(diào)用依賴于A、B函數(shù)的返回結(jié)果。Ray可以保證函數(shù)C需要等待A、B函數(shù)的結(jié)果真正計(jì)算出來(lái)后才會(huì)執(zhí)行。如果將函數(shù)A、B、C類比為DAG的節(jié)點(diǎn)的話,那么DAG的邊就是函數(shù)C參數(shù)對(duì)函數(shù)A、B計(jì)算結(jié)果的依賴,自由的函數(shù)調(diào)用方式允許Ray可以自由地定制DAG的結(jié)構(gòu)和計(jì)算依賴關(guān)系。另外,提及一點(diǎn)的是Python的函數(shù)可以定義函數(shù)具有多個(gè)返回值,這也使得Python的函數(shù)更天然具備了DAG節(jié)點(diǎn)多入和多出的特點(diǎn)。
二、系統(tǒng)架構(gòu)
Ray是使用什么樣的架構(gòu)對(duì)分布式計(jì)算做出如上抽象的呢,一下給出了Ray的系統(tǒng)架構(gòu)(來(lái)自Ray論文,參考文獻(xiàn)1)。
作為分布式計(jì)算系統(tǒng),Ray仍舊遵循了典型的Master-Slave的設(shè)計(jì):Master負(fù)責(zé)全局協(xié)調(diào)和狀態(tài)維護(hù),Slave執(zhí)行分布式計(jì)算任務(wù)。不過(guò)和傳統(tǒng)的分布式計(jì)算系統(tǒng)不同的是,Ray使用了混合任務(wù)調(diào)度的思路。在集群部署模式下,Ray啟動(dòng)了以下關(guān)鍵組件:
GlobalScheduler:Master上啟動(dòng)了一個(gè)全局調(diào)度器,用于接收本地調(diào)度器提交的任務(wù),并將任務(wù)分發(fā)給合適的本地任務(wù)調(diào)度器執(zhí)行。
RedisServer:Master上啟動(dòng)了一到多個(gè)RedisServer用于保存分布式任務(wù)的狀態(tài)信息(ControlState),包括對(duì)象機(jī)器的映射、任務(wù)描述、任務(wù)debug信息等。
LocalScheduler:每個(gè)Slave上啟動(dòng)了一個(gè)本地調(diào)度器,用于提交任務(wù)到全局調(diào)度器,以及分配任務(wù)給當(dāng)前機(jī)器的Worker進(jìn)程。
Worker:每個(gè)Slave上可以啟動(dòng)多個(gè)Worker進(jìn)程執(zhí)行分布式任務(wù),并將計(jì)算結(jié)果存儲(chǔ)到ObjectStore。
ObjectStore:每個(gè)Slave上啟動(dòng)了一個(gè)ObjectStore存儲(chǔ)只讀數(shù)據(jù)對(duì)象,Worker可以通過(guò)共享內(nèi)存的方式訪問(wèn)這些對(duì)象數(shù)據(jù),這樣可以有效地減少內(nèi)存拷貝和對(duì)象序列化成本。ObjectStore底層由Apache Arrow實(shí)現(xiàn)。
Plasma:每個(gè)Slave上的ObjectStore都由一個(gè)名為Plasma的對(duì)象管理器進(jìn)行管理,它可以在Worker訪問(wèn)本地ObjectStore上不存在的遠(yuǎn)程數(shù)據(jù)對(duì)象時(shí),主動(dòng)拉取其它Slave上的對(duì)象數(shù)據(jù)到當(dāng)前機(jī)器。
需要說(shuō)明的是,Ray的論文中提及,全局調(diào)度器可以啟動(dòng)一到多個(gè),而目前Ray的實(shí)現(xiàn)文檔里討論的內(nèi)容都是基于一個(gè)全局調(diào)度器的情況。我猜測(cè)可能是Ray尚在建設(shè)中,一些機(jī)制還未完善,后續(xù)讀者可以留意此處的細(xì)節(jié)變化。
Ray的任務(wù)也是通過(guò)類似Spark中Driver的概念的方式進(jìn)行提交的,有所不同的是:
Spark的Driver提交的是任務(wù)DAG,一旦提交則不可更改。
而Ray提交的是更細(xì)粒度的remote function,任務(wù)DAG依賴關(guān)系由函數(shù)依賴關(guān)系自由定制。
論文給出的架構(gòu)圖里并未畫(huà)出Driver的概念,因此我在其基礎(chǔ)上做了一些修改和擴(kuò)充。
Ray的Driver節(jié)點(diǎn)和和Slave節(jié)點(diǎn)啟動(dòng)的組件幾乎相同,不過(guò)卻有以下區(qū)別:
Driver上的工作進(jìn)程DriverProcess一般只有一個(gè),即用戶啟動(dòng)的PythonShell。Slave可以根據(jù)需要?jiǎng)?chuàng)建多個(gè)WorkerProcess。
Driver只能提交任務(wù),卻不能接收來(lái)自全局調(diào)度器分配的任務(wù)。Slave可以提交任務(wù),也可以接收全局調(diào)度器分配的任務(wù)。
Driver可以主動(dòng)繞過(guò)全局調(diào)度器給Slave發(fā)送Actor調(diào)用任務(wù)(此處設(shè)計(jì)是否合理尚不討論)。Slave只能接收全局調(diào)度器分配的計(jì)算任務(wù)。
三、核心操作
基于以上架構(gòu),我們簡(jiǎn)單討論一下Ray中關(guān)鍵的操作和流程。
1. ray.init()
在PythonShell中,使用ray.init()可以在本地啟動(dòng)ray,包括Driver、HeadNode(Master)和若干Slave。
import ray
ray.init()
如果是直連已有的Ray集群,只需要指定RedisServer的地址即可。
ray.init(redis_address="")
本地啟動(dòng)Ray得到的輸出如下:
>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs
======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================
{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>>
本地啟動(dòng)Ray時(shí),可以看到Ray的WebUI的訪問(wèn)地址。
2. ray.put()
使用ray.put()可以將Python對(duì)象存入本地ObjectStore,并且異步返回一個(gè)唯一的ObjectID。通過(guò)該ID,Ray可以訪問(wèn)集群中任一個(gè)節(jié)點(diǎn)上的對(duì)象(遠(yuǎn)程對(duì)象通過(guò)查閱Master的對(duì)象表獲得)。
對(duì)象一旦存入ObjectStore便不可更改,Ray的remote函數(shù)可以將直接將該對(duì)象的ID作為參數(shù)傳入。使用ObjectID作為remote函數(shù)參數(shù),可以有效地減少函數(shù)參數(shù)的寫(xiě)ObjectStore的次數(shù)。
@ray.remote
def f(x):
pass
x = "hello"
# 對(duì)象x往ObjectStore拷貝里10次
[f.remote(x) for _ in range(10)]
# 對(duì)象x僅往ObjectStore拷貝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]
3. ray.get()
使用ray.get()可以通過(guò)ObjectID獲取ObjectStore內(nèi)的對(duì)象并將之轉(zhuǎn)換為Python對(duì)象。對(duì)于數(shù)組類型的對(duì)象,Ray使用共享內(nèi)存機(jī)制減少數(shù)據(jù)的拷貝成本。而對(duì)于其它對(duì)象則需要將數(shù)據(jù)從ObjectStore拷貝到進(jìn)程的堆內(nèi)存中。
如果調(diào)用ray.get()操作時(shí),對(duì)象尚未創(chuàng)建好,則get操作會(huì)阻塞,直到對(duì)象創(chuàng)建完成后返回。get操作的關(guān)鍵流程如下:
Driver或者Worker進(jìn)程首先到ObjectStore內(nèi)請(qǐng)求ObjectID對(duì)應(yīng)的對(duì)象數(shù)據(jù)。
如果本地ObjectStore沒(méi)有對(duì)應(yīng)的對(duì)象數(shù)據(jù),本地對(duì)象管理器Plasma會(huì)檢查Master上的對(duì)象表查看對(duì)象是否存儲(chǔ)其它節(jié)點(diǎn)的ObjectStore。
如果對(duì)象數(shù)據(jù)在其它節(jié)點(diǎn)的ObjectStore內(nèi),Plasma會(huì)發(fā)送網(wǎng)絡(luò)請(qǐng)求將對(duì)象數(shù)據(jù)拉到本地ObjectStore。
如果對(duì)象數(shù)據(jù)還沒(méi)有創(chuàng)建好,Master會(huì)在對(duì)象創(chuàng)建完成后通知請(qǐng)求的Plasma讀取。
如果對(duì)象數(shù)據(jù)已經(jīng)被所有的ObjectStore移除(被LRU策略刪除),本地調(diào)度器會(huì)根據(jù)任務(wù)血緣關(guān)系執(zhí)行對(duì)象的重新創(chuàng)建工作。
一旦對(duì)象數(shù)據(jù)在本地ObjectStore可用,Driver或者Worker進(jìn)程會(huì)通過(guò)共享內(nèi)存的方式直接將對(duì)象內(nèi)存區(qū)域映射到自己的進(jìn)程地址空間中,并反序列化為Python對(duì)象。
另外,ray.get()可以一次性讀取多個(gè)對(duì)象的數(shù)據(jù):
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
4. @ray.remote
Ray中使用注解@ray.remote可以聲明一個(gè)remote function。remote函數(shù)時(shí)Ray的基本任務(wù)調(diào)度單元,remote函數(shù)定義后會(huì)立即被序列化存儲(chǔ)到RedisServer中,并且分配了一個(gè)唯一的ID,這樣就保證了集群的所有節(jié)點(diǎn)都可以看到這個(gè)函數(shù)的定義。
不過(guò),這樣對(duì)remote函數(shù)定義有了一個(gè)潛在的要求,即remote函數(shù)內(nèi)如果調(diào)用了其它的用戶函數(shù),則必須提前定義,否則remote函數(shù)無(wú)法找到對(duì)應(yīng)的函數(shù)定義內(nèi)容。
remote函數(shù)內(nèi)也可以調(diào)用其它的remote函數(shù),Driver和Slave每次調(diào)用remote函數(shù)時(shí),其實(shí)都是向集群提交了一個(gè)計(jì)算任務(wù),從這里也可以看到Ray的分布式計(jì)算的自由性。
Ray中調(diào)用remote函數(shù)的關(guān)鍵流程如下:
調(diào)用remote函數(shù)時(shí),首先會(huì)創(chuàng)建一個(gè)任務(wù)對(duì)象,它包含了函數(shù)的ID、參數(shù)的ID或者值(Python的基本對(duì)象直接傳值,復(fù)雜對(duì)象會(huì)先通過(guò)ray.put()操作存入ObjectStore然后返回ObjectID)、函數(shù)返回值對(duì)象的ID。
任務(wù)對(duì)象被發(fā)送到本地調(diào)度器。
本地調(diào)度器決定任務(wù)對(duì)象是在本地調(diào)度還是發(fā)送給全局調(diào)度器。如果任務(wù)對(duì)象的依賴(參數(shù))在本地的ObejctStore已經(jīng)存在且本地的CPU和GPU計(jì)算資源充足,那么本地調(diào)度器將任務(wù)分配給本地的WorkerProcess執(zhí)行。否則,任務(wù)對(duì)象被發(fā)送給全局調(diào)度器并存儲(chǔ)到任務(wù)表(TaskTable)中,全局調(diào)度器根據(jù)當(dāng)前的任務(wù)狀態(tài)信息決定將任務(wù)發(fā)給集群中的某一個(gè)本地調(diào)度器。
本地調(diào)度器收到任務(wù)對(duì)象后(來(lái)自本地的任務(wù)或者全局調(diào)度分配的任務(wù)),會(huì)將其放入一個(gè)任務(wù)隊(duì)列中,等待計(jì)算資源和本地依賴滿足后分配給WorkerProcess執(zhí)行。
Worker收到任務(wù)對(duì)象后執(zhí)行該任務(wù),并將函數(shù)返回值存入ObjectStore,并更新Master的對(duì)象表(ObjectTable)信息。
@ray.remote注解有一個(gè)參數(shù)num_return_vals用于聲明remote函數(shù)的返回值個(gè)數(shù),基于此實(shí)現(xiàn)remote函數(shù)的多返回值機(jī)制。
@ray.remote(num_return_vals=2)
def f():
return 1, 2
x_id, y_id = f.remote()
ray.get(x_id) # 1
ray.get(y_id) # 2
@ray.remote注解的另一個(gè)參數(shù)num_gpus可以為任務(wù)指定GPU的資源。使用內(nèi)置函數(shù)ray.get_gpu_ids()可以獲取當(dāng)前任務(wù)可以使用的GPU信息。
@ray.remote(num_gpus=1)
def gpu_method():
return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())
5. ray.wait()
ray.wait()操作支持批量的任務(wù)等待,基于此可以實(shí)現(xiàn)一次性獲取多個(gè)ObjectID對(duì)應(yīng)的數(shù)據(jù)。
# 啟動(dòng)5個(gè)remote函數(shù)調(diào)用任務(wù)
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個(gè)任務(wù)完成,超時(shí)時(shí)間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
上述例子中,results包含了5個(gè)ObjectID,使用ray.wait操作可以一直等待有4個(gè)任務(wù)完成后返回,并將完成的數(shù)據(jù)對(duì)象放在第一個(gè)list類型返回值內(nèi),未完成的ObjectID放在第二個(gè)list返回值內(nèi)。如果設(shè)置了超時(shí)時(shí)間,那么在超時(shí)時(shí)間結(jié)束后仍未等到預(yù)期的返回值個(gè)數(shù),則已超時(shí)完成時(shí)的返回值為準(zhǔn)。
6. ray.error_info()
使用ray.error_info()可以獲取任務(wù)執(zhí)行時(shí)產(chǎn)生的錯(cuò)誤信息。
>>> import time
>>> @ray.remote
>>> def f():
>>> time.sleep(5)
>>> raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:
Traceback (most recent call last):
File "", line 4, in f
Exception: This task failed!!
You can inspect errors by running
ray.error_info()
If this driver is hanging, start a new one with
ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]
7. Actor
Ray的remote函數(shù)只能處理無(wú)狀態(tài)的計(jì)算需求,有狀態(tài)的計(jì)算需求需要使用Ray的Actor實(shí)現(xiàn)。在Python的class定義前使用@ray.remote可以聲明Actor。
@ray.remote
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
使用如下方式創(chuàng)建Actor對(duì)象。
a1 = Counter.remote()
a2 = Counter.remote()
Ray創(chuàng)建Actor的流程為:
Master選取一個(gè)Slave,并將Actor創(chuàng)建任務(wù)分發(fā)給它的本地調(diào)度器。
創(chuàng)建Actor對(duì)象,并執(zhí)行它的構(gòu)造函數(shù)。
從流程可以看出,Actor對(duì)象的創(chuàng)建時(shí)并行的。
通過(guò)調(diào)用Actor對(duì)象的方法使用Actor。
a1.increment.remote() # ray.get returns 1
a2.increment.remote() # ray.get returns 1
調(diào)用Actor對(duì)象的方法的流程為:
首先創(chuàng)建一個(gè)任務(wù)。
該任務(wù)被Driver直接分配到創(chuàng)建該Actor對(duì)應(yīng)的本地執(zhí)行器執(zhí)行,這個(gè)操作繞開(kāi)了全局調(diào)度器(Worker是否也可以使用Actor直接分配任務(wù)尚存疑問(wèn))。
返回Actor方法調(diào)用結(jié)果的ObjectID。
為了保證Actor狀態(tài)的一致性,對(duì)同一個(gè)Actor的方法調(diào)用是串行執(zhí)行的。
四、安裝Ray
如果只是使用Ray,可以使用如下命令直接安裝。
pip intall ray
如果需要編譯Ray的最新源碼進(jìn)行安裝,按照如下步驟進(jìn)行(MaxOS):
# 更新編譯依賴包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下載源碼編譯安裝
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 測(cè)試
python test/runtest.py
# 安裝WebUI需要的庫(kù)[可選]
pip install jupyter ipywidgets bokeh
# 編譯Ray文檔[可選]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html
我在MacOS上安裝jupyter時(shí),遇到了Python的setuptools庫(kù)無(wú)法升級(jí)的情況,原因是MacOS的安全性設(shè)置問(wèn)題,可以使用如下方式解決:
重啟電腦,啟動(dòng)時(shí)按住Command+R進(jìn)入Mac保護(hù)模式。
打開(kāi)命令行,輸入命令csrutils disable關(guān)閉系統(tǒng)安全策略。
重啟電腦,繼續(xù)安裝jupyter。
安裝完成后,重復(fù)如上的方式執(zhí)行csrutils enable,再次重啟即可。
進(jìn)入PythonShell,輸入代碼本地啟動(dòng)Ray:
import ray
ray.init()
瀏覽器內(nèi)打開(kāi)WebUI界面如下:
參考資料
總結(jié)
以上是生活随笔為你收集整理的python分布式框架_高性能分布式执行框架——Ray的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: unzip命令常用参数
- 下一篇: 性能优化系列(四)电量性能优化