Python 学习笔记 多进程 multiprocessing
Python 解釋器有一個全局解釋器鎖(PIL),導致每個 Python 進程中最多同時運行一個線程,因此 Python 多線程程序并不能改善程序性能,不能發(fā)揮多核系統(tǒng)的優(yōu)勢,可以通過這篇文章了解。
但是多進程程序不受此影響, Python 2.6 引入了 multiprocessing 來解決這個問題。這里介紹 multiprocessing 模塊下的進程,進程同步,進程間通信和進程管理四個方面的內(nèi)容。 這里主要講解多進程的典型使用,multiprocessing 的 API 幾乎是完復制了 threading 的API, 因此只需花少量的時間就可以熟悉 threading 編程了。
Process
先來看一段代碼
| 1234567891011 | from multiprocessing import Process, current_processdef func():time.sleep(1)proc = current_process()proc.name, proc.pidsub_proc = Process(target=func, args=())sub_proc.start()sub_proc.join()proc = current_process()proc.name, proc.pid |
這是在主進程中創(chuàng)建子進程,然后啟動(start) 子進程,等待(join) 子進程執(zhí)行完,再繼續(xù)執(zhí)行主進程的整個的執(zhí)行流程。
那么,一個進程應該是用來做什么的,它應該保存一些什么狀態(tài),它的生命周期是什么樣的呢?
一個進程需要處理一些不同任務,或者處理不同的對象。創(chuàng)建進程需要一個 function 和相關參數(shù),參數(shù)可以是dictProcess(target=func, args=(), kwargs = {}),name?可以用來標識進程。
控制子進程進入不同階段的是?start(),?join(),?is_alive(),?terminate(),?exitcode?方法。這些方法只能在創(chuàng)建子進程的進程中執(zhí)行。
進程同步
Lock
鎖是為了確保數(shù)據(jù)一致性,比如讀寫鎖,每個進程給一個變量增加 1 ,但是如果在一個進程讀取但還沒有寫入的時候,另外的進程也同時讀取了,并寫入該值,則最后寫入的值是錯誤的,這時候就需要鎖。
| 123456789 | def func(lock):lock.acquire() # do mysql query select update ...lock.release() lock = Lock()for i in xrange(4):proc = Process(target=func, args=(lock))proc.start() |
Lock 同時也實現(xiàn)了 ContextManager API, 可以結合 with 語句使用, 關于 ContextManager, 請移步?Python 學習實踐筆記 裝飾器 與 context?查看。
Semaphore
Semaphore 和 Lock 稍有不同,Semaphore 相當于 N 把鎖,獲取其中一把就可以執(zhí)行了。 信號量的總數(shù) N 在構造時傳入,s = Semaphore(N)。 和 Lock 一樣,如果信號量為0,則進程堵塞,直到信號大于0。
Pipes
Pipe 是在兩個進程之間通信的工具,Pipe Constructor 會返回兩個端
| 1 | conn1, conn2 = Pipe(True) |
如果是全雙工的(構造函數(shù)參數(shù)為True),則雙端口都可接收發(fā)送,否則前面的端口用于接收,后面的端口用于發(fā)送。
| 1234567891011 | def proc1(pipe): for i in xrange(10000):pipe.send(i)def proc2(pipe): while True: print "proc2 rev:", pipe.recv()pipe = Pipe()Process(target=proc1, args=(pipe[0],)).start()Process(target=proc2, args=(pipe[1],)).start() |
Pipe 的每個端口同時最多一個進程讀寫,否則數(shù)據(jù)會出各種問題
Queues
multiprocessing.Queue 與 Queue.Queue 非常相似。其 API 列表如下
- qsize()
- empty()
- full()
- put()
- put_nowait()
- get()
- get_nowait()
- close()
- join_thread()
- cancel_join_thread()
當 Queue 為 Queue.Full 狀態(tài)時,再 put() 會堵塞,當狀態(tài)為 Queue.Empty 時,再 get() 也是。當 put() 或 get() 設置了超時參數(shù),而超時的時候,會拋出異常。
Queue 主要用于多個進程產(chǎn)生和消費,一般使用情況如下
| 123456789101112 | def producer(q): for i in xrange(10):q.put(i)def consumer(q): while True: print "consumer", q.get()q = Queue(40)for i in xrange(10):Process(target=producer, args=(q,)).start()Process(target=consumer, args=(q,)).start() |
十個生產(chǎn)者進程,一個消費者進程,共用同一個隊列進行同步。
有一個簡化版本的 multiprocessing.queues.SimpleQueue, 只支持3個方法 empty(), get(), put()。
也有一個強化版本的 JoinableQueue, 新增兩個方法 task_done() 和 join()。 task_done() 是給消費者使用的,每完成隊列中的一個任務,調(diào)用一次該方法。當所有的 tasks 都完成之后,交給調(diào)用 join() 的進程執(zhí)行。
| 123456789101112131415 | def consumer(q): while True: print "consumer", q.get()q.task_done()jobs = JoinableQueue()for i in xrange(10):jobs.put(i)for i in xrange(10):p = Process(target=consumer, args=(jobs,))p.daemon = Truep.start()jobs.join() |
這個 join 函數(shù)等待 JoinableQueue 為空的時候,等待就結束,外面的進程可以繼續(xù)執(zhí)行了,但是那10個進程干嘛去了呢,他們還在等待呀,上面是設置了?p.daemon = True, 子進程才隨著主進程結束的,如果沒有設置,它們還是會一直等待的呢。
Lock、Pipe、Queue 和 Pipe 需要注意的是:盡量避免使用 Process.terminate 來終止程序,否則將會導致很多問題, 詳情請移步python 官方文檔查看。
進程間數(shù)據(jù)共享
前一節(jié)中, Pipe、Queue 都有一定數(shù)據(jù)共享的功能,但是他們會堵塞進程, 這里介紹的兩種數(shù)據(jù)共享方式都不會堵塞進程, 而且都是多進程安全的。
共享內(nèi)存
共享內(nèi)存有兩個結構,一個是?Value, 一個是?Array,這兩個結構內(nèi)部都實現(xiàn)了鎖機制,因此是多進程安全的。 用法如下:
| 1234567891011 | def func(n, a):n.value = 50 for i in range(len(a)):a[i] += 10num = Value('d', 0.0)ints= Array('i', range(10))p = Process(target=func, args=(num, ints))p.start()p.join() |
Value 和 Array 都需要設置其中存放值的類型,d 是 double 類型,i 是 int 類型,具體的對應關系在Python 標準庫的 sharedctypes 模塊中查看。
服務進程 Manager
上面的共享內(nèi)存支持兩種結構 Value 和 Array, 這些值在主進程中管理,很分散。 Python 中還有一統(tǒng)天下,無所不能的 Server process,專門用來做數(shù)據(jù)共享。 其支持的類型非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array 用法如下:
| 123456789101112131415 | from multiprocessing import Process, Managerdef func(dct, lst):dct[88] = 88lst.reverse()manager = Manager()dct = manager.dict()lst = manager.list(range(5,10))p = Process(target=func, args=(dct, lst))p.start()p.join()print dct, '|', lstOut: {88: 88} | [9, 8, 7, 6, 5] |
一個 Manager 對象是一個服務進程,推薦多進程程序中,數(shù)據(jù)共享就用一個 manager 管理。
進程管理
如果有50個任務要執(zhí)行, 但是 CPU 只有4核, 你可以創(chuàng)建50個進程來做這個事情。但是大可不必,徒增管理開銷。如果你只想創(chuàng)建4個進程,讓他們輪流替你完成任務,不用自己去管理具體的進程的創(chuàng)建銷毀,那 Pool 是非常有用的。
Pool 是進程池,進程池能夠管理一定的進程,當有空閑進程時,則利用空閑進程完成任務,直到所有任務完成為止,用法如下
| 12345 | def func(x): return x*xpool = Pool(processes=4)print pool.map(func, range(8)) |
Pool 進程池創(chuàng)建4個進程,不管有沒有任務,都一直在進程池中等候,等到有數(shù)據(jù)的時候就開始執(zhí)行。
Pool 的 API 列表如下:
- apply(func[, args[, kwds]])
- apply_async(func[, args[, kwds[, callback]]])
- map(func, iterable[, chunksize])
- map_async(func, iterable[, chunksize[, callback]])
- imap(func, iterable[, chunksize])
- imap_unordered(func, iterable[, chunksize])
- close()
- terminate()
- join()
異步執(zhí)行
apply_async 和 map_async 執(zhí)行之后立即返回,然后異步返回結果。 使用方法如下
| 123456789101112 | def func(x): return x*xdef callback(x): print x, 'in callback' pool = Pool(processes=4)result = pool.map_async(func, range(8), 8, callback)print result.get(), 'in main'Out:[0, 1, 4, 9, 16, 25, 36, 49] in callback[0, 1, 4, 9, 16, 25, 36, 49] in main |
有兩個值得提到的,一個是 callback,另外一個是 multiprocessing.pool.AsyncResult。 callback 是在結果返回之前,調(diào)用的一個函數(shù),這個函數(shù)必須只有一個參數(shù),它會首先接收到結果。callback 不能有耗時操作,因為它會阻塞主線程。
AsyncResult 是獲取結果的對象,其 API 如下
- get([timeout])
- wait([timeout])
- ready()
- successful()
如果設置了 timeout 時間,超時會拋出 multiprocessing.TimeoutError 異常。wait 是等待執(zhí)行完成。 ready 測試是否已經(jīng)完成,successful 是在確定已經(jīng) ready 的情況下,如果執(zhí)行中沒有拋出異常,則成功,如果沒有ready 就調(diào)用該函數(shù),會得到一個 AssertionError 異常。
Pool 管理
這里不再繼續(xù)講 map 的各種變體了,因為從上面的 API 一看便知。
然后我們來看看 Pool 的執(zhí)行流程,有三個階段。第一、一個進程池接收很多任務,然后分開執(zhí)行任務;第二、不再接收任務了;第三、等所有任務完成了,回家,不干了。
這就是上面的方法,close 停止接收新的任務,如果還有任務來,就會拋出異常。 join 是等待所有任務完成。 join 必須要在 close 之后調(diào)用,否則會拋出異常。terminate 非正常終止,內(nèi)存不夠用時,垃圾回收器調(diào)用的就是這個方法。
#python總結
以上是生活随笔為你收集整理的Python 学习笔记 多进程 multiprocessing的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python遍历字典的四种方法对比
- 下一篇: Python 调试工具 PDB(Linu