并发编程——多线程
本節導讀:
- 什么是線程
- 線程與進程的區別
- 開啟線程的兩種方法
- 多線程與多進程的區別
- thread對象的其他屬性
- 守護線程
- gil全局解釋器鎖
- 死鎖現象與遞歸鎖
- 信號量,event,定時器
- 線程queue
- 進程池與線程池
?
一 什么是線程
線程顧名思義,就是一條流水線工作的過程(流水線的工作需要電源,電源就相當于cpu),而一條流水線必須屬于一個車間,一個車間的工作過程是一個進程,車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一條流水線。
所以,進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。
多線程(即多個控制線程)的概念是,在一個進程中存在多個線程,多個線程共享該進程的地址空間,相當于一個車間內有多條流水線,都共用一個車間的資源。例如,北京地鐵與上海地鐵是不同的進程,而北京地鐵里的13號線是一個線程,北京地鐵所有的線路共享北京地鐵所有的資源,比如所有的乘客可以被所有線路拉。
?
二 線程與進程的區別
- 同一個進程內的多個線程共享該進程內的地址資源
- 創建線程的開銷要遠小于創建進程的開銷(創建一個進程,就是創建一個車間,涉及到申請空間,而且在該空間內建至少一條流水線,但創建線程,就只是在一個車間內造一條流水線,無需申請空間,所以創建開銷小)
三 開啟線程的兩種方法
threading模塊介紹
multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性,因而不再詳細介紹
方式一
from threading import Thread import timedef sayhi(name):time.sleep(2)print('%s say hello' %name)if __name__ == '__main__':t=Thread(target=sayhi,args=('egon',))t.start()print('主線程') 實例Thread對象方式二
from threading import Thread import timeclass Sayhi(Thread):def __init__(self,name):super().__init__()self.name=namedef run(self):time.sleep(2)print('%s say hello' % self.name)if __name__ == '__main__':t = Sayhi('egon')t.start()print('主線程') 繼承Thread類?
四 多線程與多進程的區別
開啟速度
在主進程下開啟線程,幾乎是t.start ()的同時就將線程開啟了,說明開銷極小
在主進程下開子進程,p.start ()將開啟進程的信號發給操作系統后,操作系統要申請內存空間,讓好拷貝父進程地址空間到子進程,開銷遠大于線程
pid
在主進程下開啟多個線程,每個線程都跟主進程的pid一樣
開多個進程,每個進程都有不同的pid
數據共享
進程之間地址空間是隔離的
同一進程內開啟的多個線程是共享該進程地址空間的
?
五 thread對象的其他屬性
Thread實例對象的方法# isAlive(): 返回線程是否活動的。# getName(): 返回線程名。# setName(): 設置線程名。 threading模塊提供的一些方法:# threading.currentThread(): 返回當前的線程變量。# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。?
六 守護線程
無論是進程還是線程,都遵循:守護xxx會等待主xxx運行完畢后被銷毀
需要強調的是:運行完畢并非終止運行
對主進程來說,運行完畢指的是主進程代碼運行完畢,
對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢,
主進程在其代碼結束后就已經算運行完畢了(守護進程在此時就被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵尸進程),才會結束,
主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就被回收)。因為主線程的結束意味著進程的結束,進程整體的資源都將被回收,而進程必須保證非守護線程都運行完畢后才能結束。
?
七 gil全局解釋器鎖
首先需要明確的一點是GIL并不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行代碼。>有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL并不是Python的特性,Python完全可以不依賴于GIL
八 死鎖現象與遞歸鎖
死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處于死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖
?
from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock()class MyThread(Thread):def run(self):self.func1()self.func2()def func1(self):mutexA.acquire()print('\033[41m%s 拿到A鎖\033[0m' %self.name)mutexB.acquire()print('\033[42m%s 拿到B鎖\033[0m' %self.name)mutexB.release()mutexA.release()def func2(self):mutexB.acquire()print('\033[43m%s 拿到B鎖\033[0m' %self.name)time.sleep(2)mutexA.acquire()print('\033[44m%s 拿到A鎖\033[0m' %self.name)mutexA.release()mutexB.release()if __name__ == '__main__':for i in range(10):t=MyThread()t.start() 死鎖遞歸鎖
?
遞歸鎖,死鎖的解決方案,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
?
這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次
from threading import Thread,RLock import timemutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止class MyThread(Thread):def run(self):self.func1()self.func2()def func1(self):mutexA.acquire()print('\033[41m%s 拿到A鎖\033[0m' %self.name)mutexB.acquire()print('\033[42m%s 拿到B鎖\033[0m' %self.name)mutexB.release()mutexA.release()def func2(self):mutexB.acquire()print('\033[43m%s 拿到B鎖\033[0m' %self.name)time.sleep(2)mutexA.acquire()print('\033[44m%s 拿到A鎖\033[0m' %self.name)mutexA.release()mutexB.release()if __name__ == '__main__':for i in range(10):t=MyThread()t.start() 遞歸鎖?
?
九 信號量,event,定時器
?
信號量
信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那么信號量就相當于一群路人爭搶公共廁所,公共廁所有多個坑位,這意味著同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是信號量的大小
?
from threading import Thread,Semaphore import threading import timedef func():sm.acquire()print('%s get sm' %threading.current_thread().getName())time.sleep(3)sm.release()if __name__ == '__main__':sm=Semaphore(5)for i in range(23):t=Thread(target=func)t.start()#Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;計數器不能小于0;當計數器為0時acquire()將阻塞線程直到其他線程調用release()。 View Code?
event
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行
from threading import Eventevent.isSet():返回event的狀態值;event.wait():如果 event.isSet()==False將阻塞線程;event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;event.clear():恢復event的狀態值為False。例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作
?
from threading import Thread,Event import threading import time,random def conn_mysql():count=1while not event.is_set():if count > 3:raise TimeoutError('鏈接超時')print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))event.wait(0.5)count+=1print('<%s>鏈接成功' %threading.current_thread().getName())def check_mysql():print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())time.sleep(random.randint(2,4))event.set() if __name__ == '__main__':event=Event()conn1=Thread(target=conn_mysql)conn2=Thread(target=conn_mysql)check=Thread(target=check_mysql)conn1.start()conn2.start()check.start()定時器
定時器,指定n秒后執行某操作
?
from threading import Timerdef hello():print("hello, world")t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed十 線程queue
有三種不同的用法
class queue.Queue(maxsize=0) #隊列:先進先出
import queueq=queue.Queue() q.put('first') q.put('second') q.put('third')print(q.get()) print(q.get()) print(q.get())''' 結果(先進先出): first second third '''class queue.LifoQueue(maxsize=0) #堆棧:last in fisrt out
import queueq=queue.LifoQueue() q.put('first') q.put('second') q.put('third')print(q.get()) print(q.get()) print(q.get())''' 結果(后進先出): third second first '''class queue.PriorityQueue(maxsize=0) #優先級隊列:存儲數據時可設置優先級的隊列
?
import queueq=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c'))print(q.get()) print(q.get()) print(q.get())''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''十一 進程池與線程池
在剛開始學多進程或多線程時,我們迫不及待地基于多進程或多線程實現并發的套接字通信,然而這種實現方式的致命缺陷是:服務的開啟的進程數或線程數都會隨著并發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至于不堪重負而癱瘓,于是我們必須對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質還是基于多進程,只不過是對開啟進程的數目加上了限制
介紹
官網:https://docs.python.org/dev/library/concurrent.futures.htmlconcurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class.基本方法
1、submit(fn, *args, **kwargs) 異步提交任務2、map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操作3、shutdown(wait=True) 相當于進程池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源后才繼續 wait=False,立即返回,并不會等待池內的任務執行完畢 但不管wait參數為何值,整個程序都會等到所有任務執行完畢 submit和map必須在shutdown之前4、result(timeout=None) 取得結果5、add_done_callback(fn) 回調函數進程池
用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport os,time,random def task(n):print('%s is runing' %os.getpid())time.sleep(random.randint(1,3))return n**2if __name__ == '__main__':executor=ProcessPoolExecutor(max_workers=3)futures=[]for i in range(11):future=executor.submit(task,i)futures.append(future)executor.shutdown(True)print('+++>')for future in futures:print(future.result()) 進程池線程池
用法
把ProcessPoolExecutor換成ThreadPoolExecutor,其余用法全部相同map方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport os,time,random def task(n):print('%s is runing' %os.getpid())time.sleep(random.randint(1,3))return n**2if __name__ == '__main__':executor=ThreadPoolExecutor(max_workers=3)# for i in range(11):# future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit map回調函數
可以為進程池或線程池內的每個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢后自動觸發,并接收任務的返回值當作參數,該函數稱為回調函數
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import osdef get_page(url):print('<進程%s> get %s' %(os.getpid(),url))respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}def parse_page(res):res=res.result()print('<進程%s> parse %s' %(os.getpid(),res['url']))parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))with open('db.txt','a') as f:f.write(parse_res)if __name__ == '__main__':urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']p=ProcessPoolExecutor(3)for url in urls:p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果 回調函數?
?
轉載于:https://www.cnblogs.com/leiyiming/p/9367200.html
總結
- 上一篇: linux配置网关
- 下一篇: NLTK与NLP原理及基础