python基础-第九篇-9.3线程池
生活随笔
收集整理的這篇文章主要介紹了
python基础-第九篇-9.3线程池
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
簡單版
import queue
import threadingclass ThreadPool(object):def __init__(self, max_num=20):self.queue = queue.Queue(max_num)for i in range(max_num):self.queue.put(threading.Thread)def get_thread(self):return self.queue.get()def add_thread(self):self.queue.put(threading.Thread)#實例化線程池對象
pool = ThreadPool(10)
#執行構造方法--實例一個max_num=10的隊列,并往隊列上添加10個線程類def func(arg, p):print(arg)import timetime.sleep(2)#在線程執行完前往隊列里再加上一個線程類p.add_thread()#生成30個任務
for i in range(30):#每生成一個任務就從隊列里取出一個線程類thread = pool.get_thread()#并實例化線程對象來執行任務t = thread(target=func, args=(i, pool))#線程啟動后,執行func函數t.start()
? 我們可以看到,簡單版的線程池就是簡單的,邏輯就是每次執行一個任務就從隊列取一個線程類創建一個線程,所以就有了--多少個任務,多少個線程,那和進程池相比較下,你會從中發現哪些不足呢?
- 第一,首先問你,執行完的線程去哪呢?--被程序回收銷毀了!?? 那么執行完這些任務有必要一對一的創建線程嗎??--如果任務執行的快,一個線程有時候可以做多個任務
- 第二,上面只規定了隊列的長度,并沒有規定線程池里的線程數量??
- 第三,進程池有回調函數這么一說法,上面沒有?
- 第四,進程池里提供了close和terminate方法...
?
絕版
好!我們來看看代碼,是怎樣解決上面不足的:
import queue
import threading
import contextlib
import time#放入隊列里,做為線程停止運行的信號
StopEvent = object()class ThreadPool(object):def __init__(self, max_num, max_task_num = None):#對max_task_num進行判斷if max_task_num:#如果有值傳入,在創建隊列時,就按傳入值限定隊列長度self.q = queue.Queue(max_task_num)else:#否則,就默認為隊列長度為無限長self.q = queue.Queue()#最大線程數self.max_num = max_num#close方法的狀態碼self.cancel = False#terminate方法的狀態碼self.terminal = False#生成線程列表self.generate_list = []#空閑線程列表self.free_list = []def run(self, func, args, callback=None):"""線程池執行一個任務:param func: 任務函數:param args: 任務函數所需參數:param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數):return: 如果線程池已經終止,則返回True否則None"""#如果執行close方法,這里就會執行return--中止函數,等同不再創建線程if self.cancel:return#如果空閑線程列表里為空并且已生成的線程數沒有超過規定的最大線程數if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:#那么就調用generate_thread方法創建線程self.generate_thread()#打包任務w = (func, args, callback,)#把打包的任務放入到隊列里self.q.put(w)def generate_thread(self):#創建了一個線程t = threading.Thread(target=self.call)#啟動線程,執行call方法t.start()def call(self):"""循環去獲取任務函數并執行任務函數"""#獲取當前線程變量current_thread = threading.currentThread#把當前線程添加到生成線程列表里self.generate_list.append(current_thread)#到隊列里去拿任務event = self.q.get()#對取到的任務進行判斷while event != StopEvent:#如果任務不等于停止信號,進入循環#解任務包func, arguments, callback = event#嘗試執行任務包里的函數try:#執行成功,拿到result執行結果result = func(*arguments)#并給執行狀態賦值Truesuccess = Trueexcept Exception as e:#執行失敗,賦值執行狀態Falsesuccess = False#同時賦值執行結果為Noneresult = Noneif callback is not None:#如果callback不是默認None,就嘗試下列操作try:#把執行狀態和執行結果傳給回調函數,執行callback(success, result)except Exception as e:pass#with上下文管理--等同把free_list,current_thread傳給worker_state方法,并執行with self.worker_state(self.free_list, current_thread):#判斷self.terminal的狀態if self.terminal:#如果狀態為True,就把任務變量設置為停止信號,等同于中止當前線程event = StopEventelse:#否則,就去隊列里取任務event = self.q.get()else:#如果是停止信號,就把當前線程從生成線程列表中移除self.generate_list.remove(current_thread)def close(self):"""執行完所有的任務后,所有線程停止"""#當執行close方法時,就把cancel狀態設置為Trueself.cancel = Truefull_size = len(self.generate_list)while full_size:#往隊列里加停止信號,直到生成線程列表長度為0self.q.put(StopEvent)full_size -= 1def terminate(self):"""無論是否還有任務,終止線程"""#當執行terminate方法時,把terminal狀態設置為Trueself.terminal = True#因為突然中止,難免隊列里還有任務,所以先清空一下隊列self.q.empty()while self.generate_list:#往隊列里加停止信號,值到生成線程列表為空時self.q.put(StopEvent)@contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):"""用于記錄線程中正在等待的線程數"""#把執行完任務的當前線程添加到空閑線程列表里state_list.append(worker_thread)try:#暫時跳出yieldfinally:#把當前線程從空閑線程列表里移除state_list.remove(worker_thread)# How to usepool = ThreadPool(5)def callback(status, result):# status, execute action status# result, execute action return valuepassdef action(i):print(i)for i in range(30):ret = pool.run(action, (i,), callback)time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
?
?
?????????? 歡迎大家對我的博客內容提出質疑和提問!謝謝
?
????????????????? 筆者:拍省先生
轉載于:https://www.cnblogs.com/xinsiwei18/p/5709215.html
總結
以上是生活随笔為你收集整理的python基础-第九篇-9.3线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 耳开头的成语有哪些?
- 下一篇: 求一个qq简单网名男生!