Python线程指南
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html
3. threading
threading基于Java的線程模型設(shè)計。鎖(Lock)和條件變量(Condition)在Java中是對象的基本行為(每一個對象都自帶了鎖和條件變量),而在Python中則是獨立的對象。Python Thread提供了Java Thread的行為的子集;沒有優(yōu)先級、線程組,線程也不能被停止、暫停、恢復(fù)、中斷。Java Thread中的部分被Python實現(xiàn)了的靜態(tài)方法在threading中以模塊方法的形式提供。
threading 模塊提供的常用方法:?
threading.currentThread(): 返回當(dāng)前的線程變量。?
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結(jié)束前,不包括啟動前和終止后的線程。?
threading.activeCount(): 返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
threading模塊提供的類:??
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.
3.1. Thread
Thread是線程類,與Java類似,有兩種使用方法,直接傳入要運行的方法或從Thread繼承并覆蓋run():
# encoding: UTF-8 import threading# 方法1:將要執(zhí)行的方法作為參數(shù)傳給Thread的構(gòu)造方法 def func():print 'func() passed to Thread't = threading.Thread(target=func) t.start()# 方法2:從Thread繼承,并重寫run() class MyThread(threading.Thread):def run(self):print 'MyThread extended from Thread't = MyThread() t.start()
構(gòu)造方法:?
Thread(group=None, target=None, name=None, args=(), kwargs={})?
group: 線程組,目前還沒有實現(xiàn),庫引用中提示必須是None;?
target: 要執(zhí)行的方法;?
name: 線程名;?
args/kwargs: 要傳入方法的參數(shù)。
實例方法:?
isAlive(): 返回線程是否在運行。正在運行指啟動后、終止前。?
get/setName(name): 獲取/設(shè)置線程名。?
is/setDaemon(bool): 獲取/設(shè)置是否守護(hù)線程。初始值從創(chuàng)建該線程的線程繼承。當(dāng)沒有非守護(hù)線程仍在運行時,程序?qū)⒔K止。?
start(): 啟動線程。?
join([timeout]): 阻塞當(dāng)前上下文環(huán)境的線程,直到調(diào)用此方法的線程終止或到達(dá)指定的timeout(可選參數(shù))。
一個使用join()的例子:
# encoding: UTF-8 import threading import timedef context(tJoin):print 'in threadContext.'tJoin.start()# 將阻塞tContext直到threadJoin終止。tJoin.join()# tJoin終止后繼續(xù)執(zhí)行。print 'out threadContext.'def join():print 'in threadJoin.'time.sleep(1)print 'out threadJoin.'tJoin = threading.Thread(target=join) tContext = threading.Thread(target=context, args=(tJoin,))tContext.start()
in threadContext.?
in threadJoin.?
out threadJoin.?
out threadContext.
3.2. Lock
Lock(指令鎖)是可用的最低級的同步指令。Lock處于鎖定狀態(tài)時,不被特定的線程擁有。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個基本的方法。
可以認(rèn)為Lock有一個鎖定池,當(dāng)線程請求鎖定時,將線程至于池中,直到獲得鎖定后出池。池中的線程處于狀態(tài)圖中的同步阻塞狀態(tài)。
構(gòu)造方法:?
Lock()
實例方法:?
acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。?
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
# encoding: UTF-8 import threading import timedata = 0 lock = threading.Lock()def func():global dataprint '%s acquire lock...' % threading.currentThread().getName()# 調(diào)用acquire([timeout])時,線程將一直阻塞,# 直到獲得鎖定或者直到timeout秒后(timeout參數(shù)可選)。# 返回是否獲得鎖。if lock.acquire():print '%s get the lock.' % threading.currentThread().getName()data += 1time.sleep(2)print '%s release lock...' % threading.currentThread().getName()# 調(diào)用release()將釋放鎖。lock.release()t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t1.start() t2.start() t3.start()
3.3. RLock
RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,處于鎖定狀態(tài)時,RLock被某個線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時需要調(diào)用release()相同次數(shù)。
可以認(rèn)為RLock包含一個鎖定池和一個初始值為0的計數(shù)器,每次成功調(diào)用 acquire()/release(),計數(shù)器將+1/-1,為0時鎖處于未鎖定狀態(tài)。
構(gòu)造方法:?
RLock()
實例方法:?
acquire([timeout])/release(): 跟Lock差不多。
# encoding: UTF-8 import threading import timerlock = threading.RLock()def func():# 第一次請求鎖定print '%s acquire lock...' % threading.currentThread().getName()if rlock.acquire():print '%s get the lock.' % threading.currentThread().getName()time.sleep(2)# 第二次請求鎖定print '%s acquire lock again...' % threading.currentThread().getName()if rlock.acquire():print '%s get the lock.' % threading.currentThread().getName()time.sleep(2)# 第一次釋放鎖print '%s release lock...' % threading.currentThread().getName()rlock.release()time.sleep(2)# 第二次釋放鎖print '%s release lock...' % threading.currentThread().getName()rlock.release()t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t1.start() t2.start() t3.start()
3.4. Condition
Condition(條件變量)通常與一個鎖關(guān)聯(lián)。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實例給構(gòu)造方法,否則它將自己生成一個RLock實例。
可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。
構(gòu)造方法:?
Condition([lock/rlock])
實例方法:?
acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。?
wait([timeout]): 調(diào)用這個方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。?
notify(): 調(diào)用這個方法將從等待池挑選一個線程并通知,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。?
notifyAll(): 調(diào)用這個方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
例子是很常見的生產(chǎn)者/消費者模式:
# encoding: UTF-8 import threading import time# 商品 product = None # 條件變量 con = threading.Condition()# 生產(chǎn)者方法 def produce():global productif con.acquire():while True:if product is None:print 'produce...'product = 'anything'# 通知消費者,商品已經(jīng)生產(chǎn)con.notify()# 等待通知con.wait()time.sleep(2)# 消費者方法 def consume():global productif con.acquire():while True:if product is not None:print 'consume...'product = None# 通知生產(chǎn)者,商品已經(jīng)沒了con.notify()# 等待通知con.wait()time.sleep(2)t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
上面的例子對condition的解釋說明有不詳細(xì)的地方,以下是我的理解
1. 條件變量通常是和互斥量配合使用,threading.Condition([lock]),可以在構(gòu)造條件變量的時候不指定互斥量,會自動創(chuàng)建一個互斥鎖;當(dāng)幾個條件變量共享一個互斥鎖時,就要傳遞這個互斥鎖
2. 看一個官網(wǎng)給出的例子
# Consume one item cv.acquire() while not an_item_is_available():cv.wait() get_an_available_item() cv.release()# Produce one item cv.acquire() make_an_item_available() cv.notify() cv.release()
在實際編寫程序時,往往在這兩段代碼前包上while True
這段代碼問題在于,生產(chǎn)者會不停的生產(chǎn),有寫產(chǎn)品,消費者來不及消費,就被新產(chǎn)品覆蓋
wait的內(nèi)部實現(xiàn)是:
?1. 釋放鎖
?2. 堵塞線程,直到收到notify或者notifiall的信號
?3. 加鎖
所以,在wait之前此線程一定要得到鎖了,否則會有異常
3. notify或者notifyall在調(diào)用之前也一定要加鎖,否則有異常
notify會從所有等待線程中選擇一個喚醒
notifyall會喚醒所有等待線程,但是這些線程要競爭得到一個鎖,因為wait的第3步是一個加鎖的步驟
上面的代碼比較trick的應(yīng)用了wait這一個特性,
生產(chǎn)者在生成一個產(chǎn)品后,wait,就釋放了鎖,消費者得到鎖之后,解除wait, 消費一個產(chǎn)品,再次進(jìn)入wait, 釋放鎖,此時生產(chǎn)者也解除了wait
我的代碼如下:
count = 0def producer():global product, countwhile True:con.acquire()while product is not None:con.wait()count += 1product = countprint 'producing %d' %productcon.notify()time.sleep(2)con.release()def consumer():global product, countwhile True:con.acquire()while product is None:con.wait()print 'consuming %d' %counttime.sleep(2)product = Nonecon.notify()con.release()
用條件變量實現(xiàn)生產(chǎn)者/消費者在邏輯上不夠簡潔,用semaphore比較清楚,請見
http://blog.csdn.net/sunmenggmail/article/details/7679152
3.5. Semaphore/BoundedSemaphore
Semaphore(信號量)是計算機(jī)科學(xué)史上最古老的同步指令之一。Semaphore管理一個內(nèi)置的計數(shù)器,每當(dāng)調(diào)用acquire()時-1,調(diào)用release() 時+1。計數(shù)器不能小于0;當(dāng)計數(shù)器為0時,acquire()將阻塞線程至同步鎖定狀態(tài),直到其他線程調(diào)用release()。
基于這個特點,Semaphore經(jīng)常用來同步一些有“訪客上限”的對象,比如連接池。
BoundedSemaphore 與Semaphore的唯一區(qū)別在于前者將在調(diào)用release()時檢查計數(shù)器的值是否超過了計數(shù)器的初始值,如果超過了將拋出一個異常。
構(gòu)造方法:?
Semaphore(value=1): value是計數(shù)器的初始值。
實例方法:?
acquire([timeout]): 請求Semaphore。如果計數(shù)器為0,將阻塞線程至同步阻塞狀態(tài);否則將計數(shù)器-1并立即返回。?
release(): 釋放Semaphore,將計數(shù)器+1,如果使用BoundedSemaphore,還將進(jìn)行釋放次數(shù)檢查。release()方法不檢查線程是否已獲得 Semaphore。
# encoding: UTF-8 import threading import time# 計數(shù)器初值為2 semaphore = threading.Semaphore(2)def func():# 請求Semaphore,成功后計數(shù)器-1;計數(shù)器為0時阻塞print '%s acquire semaphore...' % threading.currentThread().getName()if semaphore.acquire():print '%s get semaphore' % threading.currentThread().getName()time.sleep(4)# 釋放Semaphore,計數(shù)器+1print '%s release semaphore' % threading.currentThread().getName()semaphore.release()t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t3 = threading.Thread(target=func) t4 = threading.Thread(target=func) t1.start() t2.start() t3.start() t4.start()time.sleep(2)# 沒有獲得semaphore的主線程也可以調(diào)用release # 若使用BoundedSemaphore,t4釋放semaphore時將拋出異常 print 'MainThread release semaphore without acquire' semaphore.release()
3.6. Event
Event(事件)是最簡單的線程通信機(jī)制之一:一個線程通知事件,其他線程等待事件。Event內(nèi)置了一個初始為False的標(biāo)志,當(dāng)調(diào)用set()時設(shè)為True,調(diào)用clear()時重置為 False。wait()將阻塞線程至等待阻塞狀態(tài)。
Event其實就是一個簡化版的 Condition。Event沒有鎖,無法使線程進(jìn)入同步阻塞狀態(tài)。
構(gòu)造方法:?
Event()
實例方法:?
isSet(): 當(dāng)內(nèi)置標(biāo)志為True時返回True。?
set(): 將標(biāo)志設(shè)為True,并通知所有處于等待阻塞狀態(tài)的線程恢復(fù)運行狀態(tài)。?
clear(): 將標(biāo)志設(shè)為False。?
wait([timeout]): 如果標(biāo)志為True將立即返回,否則阻塞線程至等待阻塞狀態(tài),等待其他線程調(diào)用set()。
注意set,會將event設(shè)為True,然后所有wait的線程都會被喚醒
上面的代碼輸出是:
Thread-2 waiting for event...
Thread-1 waiting for event...
mainthread set event
Thread-1 recv event
Thread-2 recv event
而不是只有一個線程被喚醒
3.7. Timer
Timer(定時器)是Thread的派生類,用于在指定時間后調(diào)用一個方法。
構(gòu)造方法:?
Timer(interval, function, args=[], kwargs={})?
interval: 指定的時間?
function: 要執(zhí)行的方法?
args/kwargs: 方法的參數(shù)
實例方法:?
Timer從Thread派生,沒有增加實例方法。
3.8. local
local是一個小寫字母開頭的類,用于管理 thread-local(線程局部的)數(shù)據(jù)。對于同一個local,線程無法訪問其他線程設(shè)置的屬性;線程設(shè)置的屬性不會被其他線程設(shè)置的同名屬性替換。
可以把local看成是一個“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對應(yīng)的屬性字典、再使用屬性名作為key檢索屬性值的細(xì)節(jié)。
local = threading.local()local.ttname = 'main' def func(): local.ttname = '%s ' %threading.currentThread().getName() print local.ttname t1 = threading.Thread(target = func) t2 = threading.Thread(target = func)t1.start() t2.start() print local.ttname
Thread-1?
Thread-2?
?main
注意local的特殊性,對于同一個local,線程改變的屬性只是自己線程內(nèi)local的屬性,而其他線程不收到這個線程的影響
如果local只是一個普通類的一個實例,那么所有線程共享local的屬性
總結(jié)
以上是生活随笔為你收集整理的Python线程指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python的global语句
- 下一篇: python 内建比较函数详解