Python|队列Queue
一 前言
本文算是一次隊列的學(xué)習(xí)筆記,Queue 模塊實現(xiàn)了三種類型的隊列,它們的區(qū)別僅僅是隊列中元素被取回的順序。在 FIFO 隊列中,先添加的任務(wù)先取回。在 LIFO 隊列中,最近被添加的元素先取回(操作類似一個堆棧)。優(yōu)先級隊列中,元素將保持排序( 使用 heapq 模塊 ) 并且最小值的條目第一個返回。
值得注意的是 Python 2.X 版本中調(diào)用隊列需要引用 import Queue 而在Python 3.X版本中則需要 import queue
二 隊列特性
2.1 Queue的常用函數(shù)
Queue常用的方法:
qsize() 獲取隊列的元素個數(shù)。 put(item [,block[, timeout]]): 往queue中放一個item get(item [,block[, timeout]]): 從queue中取出一個item,并在隊列中刪除的這個item需要特別說明的是:
如果 block 為 True , timeout 為 None(也是默認(rèn)的選項),那么get()/put()可能會阻塞,直到隊列中出現(xiàn)可用的數(shù)據(jù)/位置。如果 timeout 是正整數(shù),那么函數(shù)會阻塞直到超時N秒,然后拋出一個異常。
如果 block 為 False ,如果隊列無數(shù)據(jù),調(diào)用get()或者有無空余位置時調(diào)用put(),就立即拋出異常(timeout 將會被忽略)。
task_done(): 表示前面排隊的任務(wù)已經(jīng)被完成。被隊列的消費者線程使用。每個 get() 被用于獲取一個任務(wù), 后續(xù)調(diào)用 task_done() 告訴隊列,該任務(wù)的處理已經(jīng)完成。 join(): 隊列中所有的元素都被接收和處理完畢之前程序一直阻塞。在應(yīng)用程序中,如果主程序調(diào)用了join()則當(dāng)前程序發(fā)生阻塞,當(dāng)隊列中所有的元素都被處理后,將解除阻塞(意味著每個put()進隊列的條目的 task_done() 都被收到)。如果task_done()被調(diào)用的次數(shù)多于放入隊列中的項目數(shù)量,將引發(fā) ValueError 異常 。
我們通過程序向隊列添加元素的時候,未完成任務(wù)的計數(shù)就會增加。每當(dāng)消費者線程調(diào)用 task_done() 時表示這個元素已經(jīng)被回收,涉及到該元素的業(yè)務(wù)邏輯已經(jīng)完成,未完成計數(shù)就會減少。當(dāng)未完成計數(shù)降到零的時候,程序便會解除join()阻塞。
2.2 實踐
我們用一個比較經(jīng)典的案例 生產(chǎn)者和消費者模型,生產(chǎn)者生產(chǎn)饅頭放到隊列,消費者去隊列里面獲取饅頭。
# encoding: utf-8 """ author: yangyi@youzan.com time: 2019/8/14 11:20 PM func: """from multiprocessing import Process, JoinableQueue, Lock import time import randomthread_lock = Lock()def lock_print(msg):with thread_lock:print (msg)def consumer(q):while True:res = q.get(block=True, timeout=3) # 如果為空 則等待3秒超時則報錯退出print('消費者拿到了 %s' % res)q.task_done()def producer(q):for item in range(4):time.sleep(random.randrange(1, 2))q.put('饅頭{0}'.format(item))print('生產(chǎn)者做好了 %s' %'饅頭{0}'.format(item))q.join()lock_print("生產(chǎn)結(jié)束")if __name__ == '__main__':print('主進程開始')q = JoinableQueue()pd = Process(target=producer, args=(q,))cp = Process(target=consumer, args=(q,))cp.daemon = True ## pd.start()cp.start()pd.join()print('主進程結(jié)束')說明
這里生產(chǎn)者生產(chǎn)饅頭并將饅頭通過put()放到全局的隊列中,消費者從使用get()隊列中獲取饅頭然后調(diào)用 task_done() 通知隊列中的饅頭已經(jīng)被消費者獲取。
設(shè)置 cp.daemon = True 表示消費者進程會隨主進程一起結(jié)束而結(jié)束。還有一種寫法是
if __name__ == '__main__':print('主進程開始')q = JoinableQueue()pd = Process(target=producer, args=(q,))cp = Process(target=consumer, args=(q,))pd.start()cp.start()pd.join()cp.join() print('主進程結(jié)束')cp.join() 會讓消費者進程一直等待生產(chǎn)者往隊列放數(shù)據(jù)直到設(shè)置的超時時間。具體的邏輯需要結(jié)合自己程序的實際需求來定,是需要一直等待生產(chǎn)者生產(chǎn)數(shù)據(jù)還是隨著主進程結(jié)束而結(jié)束。
三 總結(jié)
本文結(jié)合前面文章中介紹的多進程中的 守護進程和 join()方法,學(xué)習(xí)如何使用隊列中的兩個函數(shù) task_done和 join。其實還有其他比較多的函數(shù)用法,需要深入的學(xué)習(xí)探索,感興趣的朋友可以動手實踐一下。
推薦閱讀
https://docs.python.org/zh-cn/3/library/queue.html https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/12_Thread_communication_using_a_queue.html-The End-
本公眾號長期關(guān)注于數(shù)據(jù)庫技術(shù)以及性能優(yōu)化,故障案例分析,數(shù)據(jù)庫運維技術(shù)知識分享,個人成長和自我管理等主題,歡迎掃碼關(guān)注。
轉(zhuǎn)載于:https://www.cnblogs.com/yangyi402/p/11413674.html
總結(jié)
以上是生活随笔為你收集整理的Python|队列Queue的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: webpack与vue环境搭建(转载)
- 下一篇: Redis|Sentinel 高可用架构