快速了解Python并发编程的工程实现(下)
關于我
編程界的一名小程序猿,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是我們團隊的主要技術棧。 聯系:hylinux1024@gmail.com
0x00 使用進程實現并發
上一篇文章介紹了線程的使用。然而Python中由于Global Interpreter Lock(全局解釋鎖GIL)的存在,每個線程在在執行時需要獲取到這個GIL,在同一時刻中只有一個線程得到解釋鎖的執行,Python中的線程并沒有真正意義上的并發執行,多線程的執行效率也不一定比單線程的效率更高。 如果要充分利用現代多核CPU的并發能力,就要使用multipleprocessing模塊了。
0x01 multipleprocessing
與使用線程的threading模塊類似,multipleprocessing模塊提供許多高級API。最常見的是Pool對象了,使用它的接口能很方便地寫出并發執行的代碼。
from multiprocessing import Pooldef f(x):return x * xif __name__ == '__main__':with Pool(5) as p:# map方法的作用是將f()方法并發地映射到列表中的每個元素print(p.map(f, [1, 2, 3]))# 執行結果 # [1, 4, 9] 復制代碼關于Pool下文中還會提到,這里我們先來看Process。
Process
要創建一個進程可以使用Process類,使用start()方法啟動進程。
from multiprocessing import Process import osdef echo(text):# 父進程IDprint("Process Parent ID : ", os.getppid())# 進程IDprint("Process PID : ", os.getpid())print('echo : ', text)if __name__ == '__main__':p = Process(target=echo, args=('hello process',))p.start()p.join()# 執行結果 # Process Parent ID : 27382 # Process PID : 27383 # echo : hello process 復制代碼進程池
正如開篇提到的multiprocessing模塊提供了Pool類可以很方便地實現一些簡單多進程場景。 它主要有以下接口
- apply(func[, args[, kwds]])
執行func(args,kwds)方法,在方法結束返回前會阻塞。 - apply_async(func[, args[, kwds[, callback[, error_callback]]]])
異步執行func(args,kwds),會立即返回一個result對象,如果指定了callback參數,結果會通過回調方法返回,還可以指定執行出錯的回調方法error_callback() - map(func, iterable[, chunksize])
類似內置函數map(),可以并發執行func,是同步方法 - map_async(func, iterable[, chunksize[, callback[, error_callback]]])
異步版本的map - close()
關閉進程池。當池中的所有工作進程都執行完畢時,進程會退出。 - terminate()
終止進程池 - join()
等待工作進程執行完,必需先調用close()或者terminate()
map_async()和apply_async()執行后會返回一個class multiprocessing.pool.AsyncResult對象,通過它的get()可以獲取到執行結果,ready()可以判斷AsyncResult的結果是否準備好。
進程間數據的傳輸
multiprocessing模塊提供了兩種方式用于進程間的數據共享:隊列(Queue)和管道(Pipe)
Queue是線程安全,也是進程安全的。使用Queue可以實現進程間的數據共享,例如下面的demo中子進程put一個對象,在主進程中就能get到這個對象。 任何可以序列化的對象都可以通過Queue來傳輸。
from multiprocessing import Process, Queuedef f(q):q.put([42, None, 'hello'])if __name__ == '__main__':# 使用Queue進行數據通信q = Queue()p = Process(target=f, args=(q,))p.start()# 主進程取得子進程中的數據print(q.get()) # prints "[42, None, 'hello']"p.join()# 執行結果 # [42, None, 'hello'] 復制代碼Pipe()返回一對通過管道連接的Connection對象。這兩個對象可以理解為管道的兩端,它們通過send()和recv()發送和接收數據。
from multiprocessing import Process, Pipedef write(conn):# 子進程中發送一個對象conn.send([42, None, 'hello'])conn.close()def read(conn):# 在讀的進程中通過recv接收對象data = conn.recv()print(data)if __name__ == '__main__':# Pipe()方法返回一對連接對象w_conn, r_conn = Pipe()wp = Process(target=write, args=(w_conn,))rp = Process(target=read, args=(r_conn,))wp.start()rp.start()# 執行結果 # [42, None, 'hello']復制代碼需要注意的是,兩個進程不能同時對一個連接對象進行send或recv操作。
同步
我們知道線程間的同步是通過鎖機制來實現的,進程也一樣。
from multiprocessing import Process, Lock import timedef print_with_lock(l, i):l.acquire()try:time.sleep(1)print('hello world', i)finally:l.release()def print_without_lock(i):time.sleep(1)print('hello world', i)if __name__ == '__main__':lock = Lock()# 先執行有鎖的for num in range(5):Process(target=print_with_lock, args=(lock, num)).start()# 再執行無鎖的# for num in range(5):# Process(target=print_without_lock, args=(num,)).start()復制代碼有鎖的代碼將每秒依次打印
hello world 0 hello world 1 hello world 2 hello world 3 hello world 4 復制代碼如果執行無鎖的代碼,則在我的電腦上執行結果是這樣的
hello worldhello world 0 1 hello world 2 hello world 3 hello world 4 復制代碼除了Lock,還包括RLock、Condition、Semaphore和Event等進程間的同步原語。其用法也與線程間的同步原語很類似。API使用可以參考文末中引用的文檔鏈接。
在工程中實現進程間的數據共享應當優先使用隊列或管道。
0x02 總結
本文對multiprocessing模塊中常見的API作了簡單的介紹。講述了Process和Pool的常見用法,同時介紹了進程間的數據方式:隊列和管道。最后簡單了解了進程間的同步原語。
通過與上篇的對比學習,本文的內容應該是更加容易掌握的。
0x03 引用
- python-parallel-programmning-cookbook.readthedocs.io
- docs.python.org/3/library/t…
- docs.python.org/3.7/library…
- docs.python.org/3/glossary.…
- docs.python.org/3/library/c…
轉載于:https://juejin.im/post/5cefdc60f265da1bca51c0cf
總結
以上是生活随笔為你收集整理的快速了解Python并发编程的工程实现(下)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: swoole的process模块创建和使
- 下一篇: 【学习笔记】第五章 python3核心技