Python基础教程:线程操作(oncurrent模块)详解
進程是cpu資源分配的最小單元,一個進程中可以有多個線程。
線程是cpu計算的最小單元。
對于Python來說他的進程和線程和其他語言有差異,是有GIL鎖。
GIL鎖
GIL鎖保證一個進程中同一時刻只有一個線程被cpu調(diào)度。
GIL鎖,全局解釋器鎖。用于限制一個進程中同一時刻只有一個線程被cpu調(diào)度。
擴展:默認GIL鎖在執(zhí)行100個cpu指令(過期時間)。
查看GIL切換的指令個數(shù)
import sys v1 = sys。getcheckinterval() print(v1)一、通過threading.Thread類創(chuàng)建線程
1、 創(chuàng)建線程的方式:直接使用Thread
from threading import Thread import time def sayhi(name):time.sleep(2)print('%s say hello' %name)if __name__ == '__main__':t=Thread(target=sayhi,args=('nick',))t.start()print('主線程')2、 創(chuàng)建線程的方式:繼承Thread
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' from threading import Thread import timeclass Sayhi(Thread):def __init__(self,name):super().__init__()self.name=namedef run(self):time.sleep(2)print('%s say hello' % self.name)if __name__ == '__main__':t = Sayhi('nick')t.start()print('主線程')二、多線程與多進程
1、 pid的比較
from threading import Thread from multiprocessing import Process import osdef work():print('hello',os.getpid())if __name__ == '__main__':# part1:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣t1=Thread(target=work)t2=Thread(target=work)t1.start()t2.start()print('主線程/主進程pid',os.getpid())# part2:開多個進程,每個進程都有不同的pidp1=Process(target=work)p2=Process(target=work)p1.start()p2.start()print('主線程/主進程pid',os.getpid())2、 開啟效率的較量
from threading import Thread from multiprocessing import Process import osdef work():print('hello')if __name__ == '__main__':# 在主進程下開啟線程t=thread(target=work)t.start()print('主線程/主進程')'''打印結(jié)果:hello主線程/主進程'''# 在主進程下開啟子進程t=Process(target=work)t.start()print('主線程/主進程')'''打印結(jié)果:主線程/主進程hello'''3、 內(nèi)存數(shù)據(jù)的共享問題
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' from threading import Thread from multiprocessing import Process import os def work():global nn=0if __name__ == '__main__':# n=100# p=Process(target=work)# p.start()# p.join()# print('主',n) # 毫無疑問子進程p已經(jīng)將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進程的n仍然為100n=1t=Thread(target=work)t.start()t.join()print('主',n) # 查看結(jié)果為0,因為同一進程內(nèi)的線程之間共享進程內(nèi)的數(shù)據(jù)三、Thread類的其他方法
Thread實例對象的方法:
- isAlive():返回線程是否活動的。
- getName():返回線程名。
- setName():設置線程名。
threading模塊提供的一些方法:
- threading.currentThread():返回當前的線程變量。
- threading.enumerate():返回一個包含正在運行的線程的list。正在運行指線程啟動后、結(jié)束前,不包括啟動前和終止后的線程。
- threading.activeCount():返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
1、 代碼示例
from threading import Thread import threading from multiprocessing import Process import osdef work():import timetime.sleep(3)print(threading.current_thread().getName())if __name__ == '__main__':# 在主進程下開啟線程t=Thread(target=work)t.start()print(threading.current_thread().getName())print(threading.current_thread())# 主線程print(threading.enumerate())# 連同主線程在內(nèi)有兩個運行的線程print(threading.active_count())print('主線程/主進程')'''打印結(jié)果:MainThread<_MainThread(MainThread, started 140735268892672)>[<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]主線程/主進程Thread-1'''2、 join方法
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' from threading import Thread import time def sayhi(name):time.sleep(2)print('%s say hello' %name)if __name__ == '__main__':t=Thread(target=sayhi,args=('nick',))t.start()t.join()print('主線程')print(t.is_alive())'''nick say hello主線程False'''四、多線程實現(xiàn)socket
import multiprocessing import threadingimport socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5)def action(conn):while True:data=conn.recv(1024)print(data)conn.send(data.upper())if __name__ == '__main__':while True:conn,addr=s.accept()p=threading.Thread(target=action,args=(conn,))p.start()五、守護線程
無論是進程還是線程,都遵循:守護xx會等待主xx運行完畢后被銷毀。需要強調(diào)的是:運行完畢并非終止運行。
- 對主進程來說,運行完畢指的是主進程代碼運行完畢
- 對主線程來說,運行完畢指的是主線程所在的進程內(nèi)所有非守護線程統(tǒng)統(tǒng)運行完畢,主線程才算運行完畢
1、 詳細解釋
-
主進程在其代碼結(jié)束后就已經(jīng)算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產(chǎn)生僵尸進程),才會結(jié)束。
-
主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結(jié)束意味著進程的結(jié)束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結(jié)束。
2、守護線程例
from threading import Thread import timedef foo():print(123)time.sleep(10)print("end123")def bar():print(456)time.sleep(10)print("end456")t1 = Thread(target=foo) t2 = Thread(target=bar)t1.daemon= True #必須在t.start()之前設置# t1.setDaemon(True)t1.start() t2.start() print("main-------")print(t1.is_alive())# 123 # 456 # main------- # end456六、同步鎖
1、 多個線程搶占資源的情況
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' from threading import Thread import os,time def work():global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__':n=100l=[]for i in range(100):p=Thread(target=work)l.append(p)p.start()for p in l:p.join()print(n) #結(jié)果可能為992、同步鎖的引用
對公共數(shù)據(jù)的操作
import threadingR=threading.Lock() R.acquire() ''' 對公共數(shù)據(jù)的操作 ''' R.release()3、實例
不加鎖:并發(fā)執(zhí)行,速度快,數(shù)據(jù)不安全
from threading import current_thread,Thread,Lock import os,time def task():global nprint('%s is running' %current_thread().getName())temp=ntime.sleep(0.5)n=temp-1if __name__ == '__main__':n=100lock=Lock()threads=[]start_time=time.time()for i in range(100):t=Thread(target=task)threads.append(t)t.start()for t in threads:t.join()stop_time=time.time()print('主:%s n:%s' %(stop_time-start_time,n))''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''加鎖:未加鎖部分并發(fā)執(zhí)行,加鎖部分串行執(zhí)行,速度慢,數(shù)據(jù)安全
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' from threading import current_thread,Thread,Lock import os,timedef task():#未加鎖的代碼并發(fā)運行time.sleep(3)print('%s start to run' %current_thread().getName())global n#加鎖的代碼串行運行lock.acquire()temp=ntime.sleep(0.5)n=temp-1lock.release()if __name__ == '__main__':n=100lock=Lock()threads=[]start_time=time.time()for i in range(100):t=Thread(target=task)threads.append(t)t.start()for t in threads:t.join()stop_time=time.time()print('主:%s n:%s' %(stop_time-start_time,n))''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''七、死鎖與遞歸鎖
所謂死鎖:是指兩個或兩個以上的進程或線程在執(zhí)行過程中,因爭奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用,它們都將無法推進下去。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖
1、 死鎖
from threading import Lock as Lock import timemutexA=Lock() mutexA.acquire() mutexA.acquire()print(123)mutexA.release() mutexA.release()解決方法:遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
2、 遞歸鎖(可重入鎖)RLock
這個RLock內(nèi)部維護著一個Lock和一個counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發(fā)生死鎖。
from threading import RLock as Lock import timemutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()3、典型問題:科學家吃面
遞歸鎖解決死鎖問題
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' import time from threading import Thread,RLockfork_lock = noodle_lock = RLock()def eat1(name):noodle_lock.acquire()print('%s 搶到了面條'%name)fork_lock.acquire()print('%s 搶到了叉子'%name)print('%s 吃面'%name)fork_lock.release()noodle_lock.release()def eat2(name):fork_lock.acquire()print('%s 搶到了叉子' % name)time.sleep(1)noodle_lock.acquire()print('%s 搶到了面條' % name)print('%s 吃面' % name)noodle_lock.release()fork_lock.release()for name in ['哪吒','nick','tank']:t1 = Thread(target=eat1,args=(name,))t2 = Thread(target=eat2,args=(name,))t1.start()t2.start()八、線程隊列
queue隊列:使用import queue,用法與進程Queue一樣
當必須在多個線程之間安全地交換信息時,隊列在線程編程中特別有用。
1、先進先出:Queue
通過雙向列表實現(xiàn)的
class queue.Queue(maxsize=0)import queueq=queue.Queue() q.put('first') q.put('second') q.put('third')print(q.get()) print(q.get()) print(q.get()) ''' 結(jié)果(先進先出): first second third '''2、后進先出:LifoQueue
通過堆實現(xiàn)
class queue.LifoQueue(maxsize=0)
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' import queueq=queue.LifoQueue() q.put('first') q.put('second') q.put('third')print(q.get()) print(q.get()) print(q.get()) ''' 結(jié)果(后進先出): third second first '''3、存儲數(shù)據(jù)時可設置優(yōu)先級的隊列:PriorityQueue
PriorityQueue類和LifoQueue類繼承Queue類然后重寫了_init、_qsize、_put、_get這四個類的私有方法.
通過list來實現(xiàn)的。
class queue.PriorityQueue(maxsize=0)優(yōu)先隊列的構(gòu)造函數(shù)。maxsize是一個整數(shù),它設置可以放置在隊列中的項數(shù)的上限。一旦達到此大小,插入將阻塞,直到隊列項被使用。如果maxsize小于或等于0,則隊列大小為無窮大。
import queueq=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優(yōu)先級(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c'))print(q.get()) print(q.get()) print(q.get()) ''' 結(jié)果(數(shù)字越小優(yōu)先級越高,優(yōu)先級高的優(yōu)先出隊): (10, 'b') (20, 'a') (30, 'c') '''更多方法說明
- __init__(self, maxsize=0):初始化隊列長度,maxsize為0的時候長度為無限
- empty(self) :返回隊列是否為空
- full(self) :返回隊列是否為滿
- qsize(self) :返回隊列的大小(并不可靠)
- get(self, block=True, timeout=None) :從隊頭獲取并刪除元素,block為true:timeout為None時候,阻塞當前線程直到隊列中有可用元素;timeout為非負時候,等了timeout的時間還沒有可用元素時候拋出一個Empty異常;block為false:timeout為None時候,隊列為空則拋出Empty異常;timeout為非負時候,等待timeout時候后沒有可用元素則拋出Empty異常。
- get_nowait(self) :#返回self.get(block=False)
- put(self, item, block=True, timeout=None): 在隊尾插入一個元素,block為true:timeout為None時候,阻塞當前線程直到隊列中有可用位置;timeout為非負時候,等了timeout時間還沒有可用位置時候拋出一個Full異常;block為false:timeout為None時候,隊列沒有位置則拋出Full異常;timeout為非負時候,等待timeout時候后還是沒有可用位置則拋出Full異常。
- put_nowait(self, item) :返回 self.put(item, block=False)
- join(self) :阻塞當前線程直到隊列的任務全部完成了
- task_done(self) :通知隊列任務的完成情況,當完成時候喚醒被join阻塞的線程
九、Python標準模塊——concurrent.futures
1、介紹
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口:
- ThreadPoolExecutor:線程池,提供異步調(diào)用
- ProcessPoolExecutor:進程池,提供異步調(diào)用
兩者都實現(xiàn)了由抽象Executor類定義的相同接口。
ThreadPoolExecutor(線程池)與ProcessPoolExecutor(進程池)都是concurrent.futures模塊下的,主線程(或進程)中可以獲取某一個線程(進程)執(zhí)行的狀態(tài)或者某一個任務執(zhí)行的狀態(tài)及返回值。
通過submit返回的是一個future對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態(tài)。
比較:
- 線程不是越多越好,會涉及cpu上下文的切換(會把上一次的記錄保存)。
- 進程比線程消耗資源,進程相當于一個工廠,工廠里有很多人,里面的人共同享受著福利資源,,一個進程里默認只有一個主線程,比如:開啟程序是進程,里面執(zhí)行的是線程,線程只是一個進程創(chuàng)建多個人同時去工作。
- 線程里有GIL全局解鎖器:不允許cpu調(diào)度
- 計算密度型適用于多進程
- 線程:線程是計算機中工作的最小單元
- 進程:默認有主線程 (幫工作)可以多線程共存
- 協(xié)程:一個線程,一個進程做多個任務,使用進程中一個線程去做多個任務,微線程
- GIL全局解釋器鎖:保證同一時刻只有一個線程被cpu調(diào)度
2、基本方法
- submit(fn, *args, **kwargs):異步提交任務
- map(func, *iterables, timeout=None, chunksize=1):取代for循環(huán)submit的操作
- shutdown(wait=True):相當于進程池的pool.close()+pool.join()操作
- wait=True,等待池內(nèi)所有任務執(zhí)行完畢回收完資源后才繼續(xù) ,
- wait=False,立即返回,并不會等待池內(nèi)的任務執(zhí)行完畢 ,
- 但不管wait參數(shù)為何值,整個程序都會等到所有任務執(zhí)行完畢 ,submit和map必須在shutdown之前。
- result(timeout=None):取得結(jié)果
- add_done_callback(fn):回調(diào)函數(shù)
- done():判斷某一個線程是否完成
- cancle():取消某個任務
3、ProcessPoolExecutor、ThreadPoolExecutor線程池
ThreadPoolExecutor構(gòu)造實例的時候,傳入max_workers參數(shù)來設置線程中最多能同時運行的線程數(shù)目 。
使用submit函數(shù)來提交線程需要執(zhí)行任務(函數(shù)名和參數(shù))到線程池中,并返回該任務的句柄(類似于文件、畫圖),注意submit()不是阻塞的,而是立即返回。
通過submit函數(shù)返回的任務句柄,能夠使用done()方法判斷該任務是否結(jié)束。
使用result()方法可以獲取任務的返回值,查看內(nèi)部代碼,發(fā)現(xiàn)這個方法是阻塞的。
對于頻繁的cpu操作,由于GIL鎖的原因,多個線程只能用一個cpu,這時多進程的執(zhí)行效率要比多線程高。
''' 學習中遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:531509025 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport os,time,random def task(n):print('%s is runing' %os.getpid())time.sleep(random.randint(1,3))return n**2if __name__ == '__main__':executor=ProcessPoolExecutor(max_workers=3)futures=[]for i in range(11):future=executor.submit(task,i)futures.append(future)executor.shutdown(True)print('+++>')for future in futures:print( future.result())4、過wait()判斷線程執(zhí)行的狀態(tài):
wait方法可以讓主線程阻塞,直到滿足設定的要求。
wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3個參數(shù),
- s表示執(zhí)行的task序列;
- timeout表示等待的最長時間,超過這個時間即使線程未執(zhí)行完成也將返回;
- return_when表示wait返回結(jié)果的條件,默認為ALL_COMPLETED全部執(zhí)行完成再返回
4、map的用法
map(fn, *iterables, timeout=None),第一個參數(shù)fn是線程執(zhí)行的函數(shù);第二個參數(shù)接受一個可迭代對象;第三個參數(shù)timeout跟wait()的timeout一樣,但由于map是返回線程執(zhí)行的結(jié)果,如果timeout小于線程執(zhí)行時間會拋異常TimeoutError。
map的返回是有序的,它會根據(jù)第二個參數(shù)的順序返回執(zhí)行的結(jié)果:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport os,time,random def task(n):print('%s is runing' %os.getpid())time.sleep(random.randint(1,3))return n**2if __name__ == '__main__':executor=ThreadPoolExecutor(max_workers=3)# for i in range(11):# future=executor.submit(task,i)executor.map(task,range(1,12))#map取代了for+submit5、s_completed返回線程執(zhí)行結(jié)果
上面雖然提供了判斷任務是否結(jié)束的方法,但是不能在主線程中一直判斷,有時候我們是得知某個任務結(jié)束了,就去獲取結(jié)果,而不是一直判斷每個任務有沒有結(jié)束。這是就可以使用as_completed方法一次取出所有任務的結(jié)果。
import time from collections import OrderedDict from concurrent.futures import (ThreadPoolExecutor, as_completed )def get_thread_time(times):time.sleep(times)return timesstart = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]] task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4])) task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"]))for result in as_completed(task_list):task_name = task_map.get(result)print("{}:{}".format(task_name,task_to_time.get(task_name)))# task3: 1 # task1: 2 # task2: 3 # task4: 4task1、task2、task3、task4的等待時間分別為2s、3s、1s、4s,通過as_completed返回執(zhí)行完的線程結(jié)果,as_completed(fs, timeout=None)接受2個參數(shù),第一個是執(zhí)行的線程列表,第二個參數(shù)timeout與map的timeout一樣,當timeout小于線程執(zhí)行時間會拋異常TimeoutError。
通過執(zhí)行結(jié)果可以看出,as_completed返回的順序是線程執(zhí)行結(jié)束的順序,最先執(zhí)行結(jié)束的線程最早返回。
6、回調(diào)函數(shù)
Future對象也可以像協(xié)程一樣,當它設置完成結(jié)果時,就可以立即進行回調(diào)別的函數(shù)。add_done_callback(fn),則表示 Futures 完成后,會調(diào)?fn函數(shù)。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def parse_page(res):res=res.result()print('<進程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']# p=Pool(3)# for url in urls:# p.apply_async(get_page,args=(url,),callback=pasrse_page)# p.close()# p.join()p=ProcessPoolExecutor(3)for url in urls:p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結(jié)果Python基礎太難學?小編創(chuàng)建了一個Python學習交流QQ群:531509025,群里有大量基礎入門知識,都是從零開始的,群里還有專業(yè)的人給你解答問題。尋找有志同道合的小伙伴,互幫互助,群里還有不錯PDF電子書
總結(jié)
以上是生活随笔為你收集整理的Python基础教程:线程操作(oncurrent模块)详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python基础教程:return函数的
- 下一篇: python求两个列表的并集.交集.差集