多线程与线程锁
Thread基本方法
#! /usr/bin/env python3 # encoding: utf-8""" @Author: zengchunyun @Date: 2017/7/12 """import threading import sys import timedef read(event):print('get a task ...')print('read is_set', event.is_set()) # 獲取event狀態(tài),如果event狀態(tài)為False,調(diào)用event.wait()就會(huì)進(jìn)入阻塞狀態(tài),# 直到event狀態(tài)為True,才會(huì)繼續(xù)執(zhí)行event.wait()后面的代碼event.wait() # 此處會(huì)進(jìn)入等待狀態(tài),不會(huì)繼續(xù)執(zhí)行下面的代碼,直到event對(duì)象被設(shè)置為True,則會(huì)通知此wait進(jìn)入非阻塞狀態(tài),即,會(huì)繼續(xù)執(zhí)行下面的代碼print('read is_set', event.is_set()) # 獲取event狀態(tài),如果在wait后,通常該狀態(tài)為True,如果wait后,又立即對(duì)event調(diào)用clear()方法,則該狀態(tài)又為Falseevent.clear() # 將event狀態(tài)設(shè)置為Falseprint('read is_set', event.is_set()) # 獲取event狀態(tài),如果在wait后,通常該狀態(tài)為True,如果wait后,又立即對(duì)event調(diào)用clear()方法,則該狀態(tài)又為Falsecurrent_thread = threading.current_thread() # 獲取當(dāng)前線程對(duì)象print('current_thread', current_thread)# current_thread._tstate_lock=None# current_thread._stop()print('is_alive', current_thread.is_alive()) # 通過獲取到當(dāng)前線程對(duì)象后,可以對(duì)當(dāng)前線程做一些操作,比如判斷當(dāng)前線程狀態(tài),這里對(duì)狀態(tài)肯定是為True,因?yàn)槿绻麨镕alse,那就不可能執(zhí)行到這里了.也可以通過一些特殊方式,在此處將該結(jié)果得到的值為False,不過這種方式對(duì)程序而言就沒有太大意義了。current_thread.name = 'read_thread' # 可以對(duì)當(dāng)前線程修改名字print(current_thread.name)print('get_ident', threading.get_ident()) # 返回當(dāng)前的線程唯一標(biāo)識(shí)符,為非0的整數(shù)print('main_thread', threading.main_thread()) # 獲取主線程對(duì)象,通常這個(gè)主線程是有python解釋器啟動(dòng)的print("doing now")time.sleep(50)print('doing done.')def write(frame, event, arg):print('write', frame, event, arg, frame.f_lineno)def worker():# threading.settrace(write) # 通過調(diào)用sys.settrace方法去獲取執(zhí)行過程的棧信息,通過這些棧信息可以用于記錄日志分析,也能方便理解程序的運(yùn)行過程,該函數(shù)在run()之前調(diào)用,它有call,line,return,exception,c_call,c_return,c_exception這些事件類型# sys.settrace(write)# threading.setprofile(write) # 通過調(diào)用sys.setprofile()獲取棧信息,它不是每次都會(huì)調(diào)用。僅僅是在call一個(gè)方法時(shí)和方法return時(shí)調(diào)用。所以它的事件信息只有call,c_call,c_return, return,當(dāng)一個(gè)異常已經(jīng)發(fā)生,這個(gè)return事件也會(huì)返回print('stack_size', threading.stack_size(36864)) # 設(shè)置棧大小,最少為32KiB,即32768,每次增加的大小必須是4KiB,即4096大小,print('get_ident', threading.get_ident()) # 返回當(dāng)前的線程唯一標(biāo)識(shí)符print('active_count', threading.active_count()) # 當(dāng)前有多少個(gè)正在運(yùn)行的線程,等同于len(threading.enumerate())print('TIMEOUT_MAX', threading.TIMEOUT_MAX) # 最大超時(shí)參數(shù),用于Lock.acquire(),Rlock.acquire(),Condition.wait(),等使用超時(shí)參數(shù)等方法,如果設(shè)置一個(gè)比這個(gè)值還大等值,會(huì)出現(xiàn)OverflowError異常print('enumerate', threading.enumerate())event = threading.Event()t1 = threading.Thread(target=read, args=(event,))t1.daemon = False # 設(shè)置子線程是否為守護(hù)線程,當(dāng)為守護(hù)線程時(shí),該線程將在后臺(tái)運(yùn)行,主線程執(zhí)行完成就不會(huì)等待子線程執(zhí)行是否完成,而是直接退出,如果不是守護(hù)線程,主線程執(zhí)行完成,如果子線程沒有完成,會(huì)繼續(xù)等待# 創(chuàng)建一個(gè)線程對(duì)象有兩種方式,一種是調(diào)用線程的構(gòu)造方法,或者子類繼承Thread,重寫run方法,要啟動(dòng)這個(gè)線程對(duì)象,還需要調(diào)用線程的start()方法,然后會(huì)在獨(dú)立的線程里管理調(diào)用run里面的代碼,當(dāng)線程start().# 我們?yōu)檎J(rèn)為這個(gè)線程就是活動(dòng)的狀態(tài),直到這個(gè)線程的run方法執(zhí)行完成,或者run方法內(nèi)部出現(xiàn)異常,則這個(gè)線程就是不活躍狀態(tài),可以通過is_alive()去獲取線程狀態(tài)t1.start()print('stack_size', threading.stack_size())print('worker is_set', event.is_set())event.clear()print('worker is_set', event.is_set())timer = threading.Timer(interval=5, function=event.set)timer.start()print('active_count', threading.active_count())print('enumerate', threading.enumerate())# 調(diào)用線程的join方法會(huì)阻塞其它線程的調(diào)用,直到調(diào)用join的那個(gè)線程方法調(diào)用終止,即如果有多個(gè)線程,都調(diào)用里join,那么會(huì)依次按誰先調(diào)用join,誰就先執(zhí)行,直到調(diào)用join的那個(gè)線程執(zhí)行完成,才會(huì)繼續(xù)調(diào)用下一個(gè)線程,這樣就等于把線程變成串行方式執(zhí)行了,而不是并行# join實(shí)際上就是使用了線程鎖,讓同一時(shí)刻只能有一個(gè)線程在運(yùn)行。# 如果線程不是daemon線程,分兩張情況,# 一:timeout為None,主線程就會(huì)等待子線程執(zhí)行完畢,才繼續(xù)執(zhí)行主線程后面的代碼,直到主線程執(zhí)行完,程序才退出# 二:timeout設(shè)置了大于0的浮點(diǎn)值,就會(huì)在該超時(shí)時(shí)間內(nèi)等待子線程的返回,如果這個(gè)時(shí)間內(nèi),子線程沒有執(zhí)行完成,那么主線程不會(huì)繼續(xù)等子線程,而是繼續(xù)執(zhí)行主線程后面的代碼,最后如果主線程代碼執(zhí)行完了,如果子線程還沒有執(zhí)行完,會(huì)繼續(xù)等待子線程,直到子線程完全返回,主線程才退出# 如果子線程是daemon線程,也分兩種情況# 一: 如果timeout為None, 主線程會(huì)等待子線程執(zhí)行完畢,才會(huì)繼續(xù)執(zhí)行主線程后面的代碼,直到主線程執(zhí)行完,程序退出# 二:如果timeout設(shè)置為大于0的浮點(diǎn)值,就會(huì)在該超時(shí)時(shí)間內(nèi)等待子線程的返回,如果這個(gè)時(shí)間內(nèi),子線程沒有執(zhí)行完成,那么主線程不會(huì)繼續(xù)等待子線程,而是繼續(xù)執(zhí)行主線程后面的代碼,最后如果主線程代碼執(zhí)行完了,如果子線程還沒有執(zhí)行完,不會(huì)繼續(xù)等待子線程,而是直接退出程序# 總結(jié),在沒有timeout時(shí),不管是不是daemon線程,主線程都會(huì)等待子線程執(zhí)行完成后才會(huì)繼續(xù)執(zhí)行主線程后面的代碼,直到主線程代碼執(zhí)行完成,程序退出# 設(shè)置了timeout時(shí),子線程超時(shí)后,主線程不會(huì)繼續(xù)等待子線程返回結(jié)果,而是繼續(xù)執(zhí)行主線程代碼,最終,主線程執(zhí)行完成后,在非daemon狀態(tài)時(shí),如果子線程還沒執(zhí)行完成,會(huì)繼續(xù)等待,如果是daemon狀態(tài),那么這時(shí)主線程是不會(huì)等待子線程完成,而是直接退出程序# t1.join(timeout=10.1)print('begin ...')print('all task done.')if __name__ == '__main__':worker()線程鎖Lock基本概念
#! /usr/bin/env python3 # encoding: utf-8""" @Author: zengchunyun @Date: 2017/7/12 """ import threading import time""" lock有兩種狀態(tài),一種是locked,一種是unlocked 創(chuàng)建lock時(shí)。狀態(tài)為unlocked,它有兩個(gè)基本的方法,acquire()和release(),當(dāng)狀態(tài)為unlocked,acquire()改變這個(gè)狀態(tài)為locked, 并立即返回,當(dāng)狀態(tài)是locked時(shí),調(diào)用acquire()方法時(shí)會(huì)阻塞當(dāng)前線程,直到另一個(gè)線程調(diào)用它的release()方法,將狀態(tài)改為unlocked。 然后這個(gè)acquire()調(diào)用重置lock為locked狀態(tài),并立即返回,這個(gè)release()方法應(yīng)該在lock狀態(tài)為locked時(shí)調(diào)用,它會(huì)改變lock狀態(tài)為unlocked,并立即返回, 如果對(duì)一個(gè)已經(jīng)是unlocked狀態(tài)對(duì)lock調(diào)用release()時(shí),會(huì)拋出RuntimeError錯(cuò)誤當(dāng)多個(gè)線程調(diào)用acquire()時(shí)進(jìn)入阻塞狀態(tài),等待這個(gè)lock狀態(tài)變?yōu)閡nlocked,只有一個(gè)線程會(huì)在release()被調(diào)用后,lock狀態(tài)會(huì)變?yōu)閡nlocked后執(zhí)行, 具體是哪些線程會(huì)執(zhí)行,并沒有一個(gè)明確規(guī)定條件,且在代碼實(shí)現(xiàn)上差異也很大"""""" lock支持使用上下文管理 例如: with lock:# do something...它等效于下面這種寫法 lock.acquire() try:# do something... finally:lock.release() """lock = threading.Lock()def read(lock_obj):print('entry read function...')lock_status = lock_obj.acquire() # acquire接受兩個(gè)參數(shù),blocking=True,timeout=-1,兩個(gè)默認(rèn)參數(shù)值# 當(dāng)lock_status狀態(tài)為True時(shí),說明獲取到鎖了,當(dāng)為False說明沒有獲得鎖print(lock_status)time.sleep(5)print('read do something...')lock_obj.release()print('read done.')def write(lock_obj):print('entry write function...')# 當(dāng)blocking設(shè)置為True,默認(rèn)為True,線程會(huì)阻塞,直到lock狀態(tài)變?yōu)閡nlocked# 當(dāng)blocking設(shè)置為False,線程不會(huì)阻塞,如果一個(gè)調(diào)用使用blocking為True會(huì)被阻塞,并立即返回False,其它情況設(shè)置這個(gè)lock為locked,并返回True# 如果設(shè)置timeout值時(shí),blocking值必須是True,默認(rèn)值為True,所以可以不用指定blocking,也就是說,通常這兩個(gè)參數(shù)最好不要同時(shí)存在# 當(dāng)timeout為非-1時(shí)。線程會(huì)阻塞這個(gè)timeout值當(dāng)秒數(shù),如果超時(shí)了,依然沒有獲得鎖,則不繼續(xù)阻塞,但是返回值為False,也就是說lock狀態(tài)為unlocked,所以當(dāng)返回狀態(tài)為False時(shí),是不能調(diào)用lock的release()方法,否則拋異常# 當(dāng)timeout為-1時(shí),線程會(huì)進(jìn)入無限當(dāng)?shù)牡却隣顟B(tài),不允許在blocking為False時(shí)為timeout指定值lock_status = lock_obj.acquire(blocking=True, timeout=2) # acquire接受兩個(gè)參數(shù),blocking=True,timeout=-1,兩個(gè)默認(rèn)參數(shù)值,當(dāng)把blocking設(shè)置為False時(shí),即不會(huì)阻塞該線程,會(huì)繼續(xù)執(zhí)行后面的代碼,print(lock_status)print('write do something...')if lock_status: # 需要判斷是否獲得鎖,如果獲得,狀態(tài)為True,則需要釋放鎖lock_obj.release()print('write done.')if __name__ == '__main__':t1 = threading.Thread(target=read, args=(lock,))t1.start()t2 = threading.Thread(target=write, args=(lock,))t2.start() # 首先要明白,獲取鎖時(shí),默認(rèn)是阻塞狀態(tài),直到獲取到鎖,可以給阻塞狀態(tài)獲取鎖時(shí)設(shè)置等待超時(shí)時(shí)間,超過時(shí)間不管有沒有獲取到都會(huì)繼續(xù)執(zhí)行后面的代碼 # 對(duì)于一個(gè)新的鎖對(duì)象,也就是剛創(chuàng)建的鎖,第一次調(diào)用acquire()方法去獲取時(shí),不管是不是阻塞,都能獲取到鎖,也就是這個(gè)方法會(huì)返回True,但是 # 當(dāng)你再次使用這個(gè)鎖對(duì)象acquire(False)不阻塞方式去獲取鎖,其結(jié)果如果還是True,那其結(jié)果只有一種,那就是之前肯定釋放了一次鎖,否則,對(duì)于 # 已經(jīng)拿到了鎖,再次調(diào)用acquire(False)其結(jié)果將為False# 第一種情況 import threading lock = threading.Lock() print(lock.acquire(True)) # 返回True print(lock.acquire(False)) # 返回Falselock2 = threading.Lock() print(lock2.acquire(False)) # 返回True print(lock2.acquire(False)) # 返回False# 第二種情況 lock3 = threading.Lock() print(lock3.acquire(True)) # 返回True print(lock3.release()) print(lock3.acquire(False)) # 返回Truelock4 = threading.Lock() print(lock4.acquire(False)) # 返回True print(lock4.release()) print(lock4.acquire(False)) # 返回True多把鎖正確使用方式
import threading import time import random""" 2把鎖對(duì)兩個(gè)不同資源進(jìn)行多線程操作 """read_lock = threading.Lock() write_lock = threading.Lock()num = 0 num2 = 0def read(r_lock, w_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時(shí)刻只能有一個(gè)線程操作共享數(shù)據(jù)global num # 準(zhǔn)備對(duì)共享數(shù)據(jù)進(jìn)行操作time.sleep(random.randint(0, 3)) # 修改該資源比較費(fèi)時(shí),需要大約0-3秒num += 1 # 對(duì)共享數(shù)據(jù)加1# 這時(shí)還想對(duì)一個(gè)比較耗費(fèi)時(shí)間對(duì)資源進(jìn)行操作,所以又開啟一個(gè)新線程,該資源也是共享資源,所以也需要鎖定資源new_task = threading.Thread(target=write, args=(w_lock,))new_task.start()# 然后需要釋放共享資源r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當(dāng)前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準(zhǔn)備對(duì)共享資源進(jìn)行操作time.sleep(random.randint(0, 2)) # 修改該資源比較費(fèi)時(shí),需要大約0-5秒num2 += 1 # 花費(fèi)數(shù)秒秒才把資源修改完成w_lock.release() # 資源修改完了if __name__ == '__main__':for task in range(10): # 開啟10個(gè)任務(wù)t1 = threading.Thread(target=read, args=(read_lock, write_lock)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當(dāng)前剩下1個(gè)活動(dòng)的線程,說明其它子線程任務(wù)都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break錯(cuò)誤使用鎖方式
""" 錯(cuò)誤的加鎖方式 以下代碼加鎖方式不可取,一個(gè)線程內(nèi),應(yīng)該加鎖和解鎖是成對(duì)的,且加完鎖后,在一個(gè)線程內(nèi),不該再去加鎖,必須先解鎖后,再加鎖,否則非常容易造成死鎖 """read_lock = threading.Lock() write_lock = threading.Lock()num = 0 num2 = 0def read(r_lock, w_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時(shí)刻只能有一個(gè)線程操作共享數(shù)據(jù)global num # 準(zhǔn)備對(duì)共享數(shù)據(jù)進(jìn)行操作time.sleep(random.randint(0, 3)) # 修改該資源比較費(fèi)時(shí),需要大約0-3秒num += 1 # 對(duì)共享數(shù)據(jù)加1# 這時(shí)還想對(duì)一個(gè)比較耗費(fèi)時(shí)間對(duì)資源進(jìn)行操作,所以又開啟一個(gè)新線程,該資源也是共享資源,所以也需要鎖定資源new_task = threading.Thread(target=write, args=(w_lock,)) # 注意,子線程如果使用也需要鎖時(shí),解鎖操作必須在子線程內(nèi)把鎖釋放完成new_task.start()# 然后需要釋放共享資源w_lock.release() # 不能在此處釋放鎖,這種代碼設(shè)計(jì)很容易造成死鎖現(xiàn)象,r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當(dāng)前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準(zhǔn)備對(duì)共享資源進(jìn)行操作time.sleep(random.randint(0, 2)) # 修改該資源比較費(fèi)時(shí),需要大約0-5秒num2 += 1 # 花費(fèi)數(shù)秒秒才把資源修改完成w_lock.release() # 此處注釋后,很可能會(huì)有鎖沒有得到釋放,這種寫法容易造成死鎖,就是遞歸鎖也不能這樣寫,if __name__ == '__main__':for task in range(10): # 開啟10個(gè)任務(wù)t1 = threading.Thread(target=read, args=(read_lock, write_lock)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當(dāng)前剩下1個(gè)活動(dòng)的線程,說明其它子線程任務(wù)都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break常見的死鎖
""" 使用同一把鎖對(duì)不同資源加鎖,沒有及時(shí)釋放,造成死鎖 """read_lock = threading.Lock()num = 0 num2 = 0def read(r_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時(shí)刻只能有一個(gè)線程操作共享數(shù)據(jù)global num # 準(zhǔn)備對(duì)共享數(shù)據(jù)進(jìn)行操作time.sleep(random.randint(0, 1)) # 修改該資源比較費(fèi)時(shí),需要大約0-3秒num += 1 # 對(duì)共享數(shù)據(jù)加1write(r_lock) # 使用了同一把鎖,且未先釋放鎖,會(huì)造成死鎖# 然后需要釋放共享資源r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當(dāng)前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準(zhǔn)備對(duì)共享資源進(jìn)行操作time.sleep(random.randint(0, 1)) # 修改該資源比較費(fèi)時(shí),需要大約0-5秒num2 += 1 # 花費(fèi)數(shù)秒秒才把資源修改完成w_lock.release() # 資源修改完了if __name__ == '__main__':for task in range(10): # 開啟10個(gè)任務(wù)t1 = threading.Thread(target=read, args=(read_lock,)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當(dāng)前剩下1個(gè)活動(dòng)的線程,說明其它子線程任務(wù)都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break遞歸鎖
上面的這個(gè)死鎖解決方式
""" 使用1把遞歸鎖可以解決上面對(duì)問題,但是上面對(duì)問題是可以避免的,只需先釋放鎖,在調(diào)用write(r_lock)方法,即可 """read_lock = threading.RLock()num = 0 num2 = 0def read(r_lock):r_lock.acquire() # 先獲取一把鎖,保證同一時(shí)刻只能有一個(gè)線程操作共享數(shù)據(jù)global num # 準(zhǔn)備對(duì)共享數(shù)據(jù)進(jìn)行操作time.sleep(random.randint(0, 3)) # 修改該資源比較費(fèi)時(shí),需要大約0-3秒num += 1 # 對(duì)共享數(shù)據(jù)加1# 這時(shí)還想對(duì)一個(gè)比較耗費(fèi)時(shí)間對(duì)資源進(jìn)行操作,所以又開啟一個(gè)新線程,該資源也是共享資源,所以也需要鎖定資源# new_task = threading.Thread(target=write, args=(w_lock,))# new_task.start()write(r_lock)# 然后需要釋放共享資源r_lock.release()def write(w_lock):current_thread = threading.current_thread() # 獲取當(dāng)前線程print("write %s do something..." % current_thread.name)w_lock.acquire() # 先鎖定資源global num2 # 準(zhǔn)備對(duì)共享資源進(jìn)行操作time.sleep(random.randint(0, 2)) # 修改該資源比較費(fèi)時(shí),需要大約0-5秒num2 += 1 # 花費(fèi)數(shù)秒秒才把資源修改完成w_lock.release() # 資源修改完了if __name__ == '__main__':for task in range(10): # 開啟10個(gè)任務(wù)t1 = threading.Thread(target=read, args=(read_lock,)) # 傳入2把鎖,1把用于鎖子線程鎖定共享資源,1把用于給子線程開啟的子線程鎖定共享資源t1.start()print('task done...')print(num)print(num2)while True:# 如果當(dāng)前剩下1個(gè)活動(dòng)的線程,說明其它子線程任務(wù)都完成了,只剩下主線程了if threading.active_count() == 1:print(num)print(num2)break轉(zhuǎn)載于:https://www.cnblogs.com/zengchunyun/p/7155819.html
總結(jié)
- 上一篇: Spark学习笔记(8)---Spark
- 下一篇: 织梦采集文章