python结束线程池正在运行的线程_python之线程与线程池
#進程是資源分配的最小單位,線程是CPU調度的最小單位.每一個進程中至少有一個線程。#傳統的不確切使用線程的程序稱為只含有一個線程或單線程程序,而可以使用線程的程序被稱為多線程程序,在程序中使用一個線程的方法#被稱為多線程#線程的模塊:#thread >> 實現線程的低級接口#threading>>> 可以提供高級方法
#同一進程下的各個線程是可以共享該進程的所有的資源的,各個線程之間是可以相互影響的
1.線程創建的兩種方式,與進程創建的兩種方式基本
from threading importThreadfrom multiprocessing importProcessimporttimedeffucn1(n):
time.sleep(1)print('XXXXXXXXXXXXX',n)if __name__ == '__main__':#p = Process(target=fucn1,args=(1,))
t = Thread(target=fucn1,args=(1,))print(t1.isAlive())#返回線程是否活動的
print(t1.getName())#返回線程名
t1.setName()#設置線程名
t.start()#開啟線程的速度非常快
t.join() #等待子線程運行結束之后才進行下面的代碼
print('主線程結束')
方式1
classgg(Thread):def __init__(self,n):
super().__init__()
self.n=ndefrun(self):print('xxx')print(self.n)if __name__ == '__main__':
t1= gg(66)
t1.start()
與進程創建運行相似
方式2
# threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
1.1
from threading importThreadfrom multiprocessing importProcessimporttimedefjisuan():for i in range(100000):
i+= 1
#print(i)
defpro():
p= Process(target=jisuan)
p1= Process(target=jisuan)
p.start()
p1.start()
p1.join()
p.join()defthreading():
t1= Thread(target=jisuan)
t2= Thread(target=jisuan)
t1.start()
t2.start()if __name__ == '__main__':
t1=time.time()
pro()
t2=time.time()
t3=time.time()
threading()
t4=time.time()print(t2-t1)#0.24815773963928223
print(t4-t3)#0.01202702522277832
線程與進程之間的效率對比
1.2
from threading importThreadimportthreadingfrom multiprocessing importProcessimportosdefwork():importtime
time.sleep(3)print(threading.current_thread().getName())if __name__ == '__main__':#在主進程下開啟線程
t=Thread(target=work)
t.start()print(threading.current_thread())#主線程對象
print(threading.current_thread().getName()) #主線程名稱
print(threading.current_thread().ident) #主線程ID
print(threading.get_ident()) #主線程ID
print(threading.enumerate()) #連同主線程在內有兩個運行的線程
print(threading.active_count())print('主線程/主進程')#'''#打印結果:#<_mainthread started>#MainThread#14104#[<_mainthread started>, ]#主線程/主進程#Thread-1#'''
一些不常用的方法
1.3
#一個主線程要等待所有的非守護線程結束才結束#(主線程的代碼執行完之后主線程并沒有結束,而要等待所有的非守護進程執行完并返回結果后才結束)
#主進程默認是在執行完代碼之后,相當于結束了,并不關心所有的子進程的執行結果,只是關心所有的子進程是否結束的的信號,#接收到所有子進程結束的信號之后,主進程(程序)才結束
importtimefrom threading importThreaddeffunc():
time.sleep(3)print('任務1')deffunc1():
time.sleep(2)print('任務2')if __name__ == '__main__':
t1= Thread(target=func)
t2= Thread(target=func1)
t1.daemon=True
t1.start()
t2.start()print('主線程結束')#結果
'''主線程結束
任務2'''
守護進程
1.4
from threading importThreadfrom multiprocessing importProcessimporttime
a= 100
deffucn1():globala
a-= 1 #等同于
temp =a
time.sleep(0.001)#驗證關鍵點
temp = temp -1a=temp
time.sleep(5)if __name__ == '__main__':
gg=[]for i in range(100):
t= Thread(target=fucn1)
t.start()#開啟線程的速度非常快
gg.append(t)print(t.is_alive())
[tt.join()for tt ingg]print(a)print('主線程結束')#線程共享進程的數據,由于數據是共享的也會有數據的不安全的情況(數據混亂),#但是由于線程的創建的速度非常快,如果加上系統的線程不多的話,#效果不明顯#解決共享數據不安全: 加鎖 ,對取值和修改值的的操作開始加鎖(與多進程加鎖一樣)
驗證線程之間的數據是共享的,但是也存在數據的安全的問題
信號量,事件等與進程的操作方法一樣#Semaphore管理一個內置的計數器,#每當調用acquire()時內置計數器-1;#調用release() 時內置計數器+1;#計數器不能小于0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。#
#實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):## 基本代碼如下#def func(sm):#sm.acquire()## 賦值或修改的代碼#sm.release()#if __name__ == '__main__':#sm=Semaphore(5)#t = Thread(target=func)
#事件與進程的一樣,#event.isSet() 查看等待的狀態不一樣#event.wait():如果 event.isSet()==False將阻塞線程;#event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;#event.clear():恢復event的狀態值為False。
與進程功能基本一樣的相關說明
線程隊列
#線程隊列#使用import queue,用法與進程Queue一樣,直接引入,不用通過threading 模塊引入
#1>class queue.Queue(maxsize=0) #先進先出
importqueue#q = queue.Queue(3)#創建一個容量為3的隊列#q.put(1)#q.put(2)#q.put(3)#在隊列塞滿三個元素后,如果繼續塞元素,就會進入一個阻塞的狀態## 但是如果使用q.put_nowait()塞元素的話,到塞滿之后再塞的話,就會直接拋出隊列已滿的異常,## 不會進入阻塞的狀態,與q.get_nowait()相似#print(q.get())#1#print(q.get())#2#print(q.get())#3##按照添加的順序進行輸出#print(q.get())#>>>>>>>>>取到第四個的時候,隊列已經是空的了,如果使用這個的話,就會進入## 阻塞的狀態#print(q.get_nowait())#>>>>>>但是如果取到第四個使用這個的話,不會進入阻塞的狀態,直接##拋出異常#
#
## 2>class queue.LifoQueue(maxsize=0) #先進后出#q = queue.LifoQueue(3)#q.put(1)#q.put(2)#q.put(None)#### # 取值的時候輸出為#print(q.get())#None#print(q.get())#2#print(q.get())#1#
## 3>class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列#q = queue.PriorityQueue(4)## 在設置的時候,元組的方式進行添加 如:(優先級,元素),## 優先級通過使用數字來表示,數字越小優先級越高##如果優先級一樣,就會按照元素的ASCIll順序進行輸出,相同優先級的兩個元素能夠進行比較(同優先級的兩個元素必須是同種類型的)##字典類型的東西不能進行比較#q.put((-10,1))#q.put((-10,3))#q.put((1,20))#q.put((2,'我'))#
##按照優先級進行輸出#print(q.get())#(-10, 1)#print(q.get())#(-10, 3)#print(q.get())#(1, 20)#print(q.get())#(2, '我')#
#
## 這三隊列是安全的,不存在多個線程搶占同一資源或數據的情況
線程三種隊列的使用
線程池
submit的使用
源代碼欣賞importthreadingimportosclassThreadPoolExecutor(_base.Executor):def __init__(self, max_workers=None, thread_name_prefix=''): #初始化方法,設置線程池的最大線程數量
if max_workers is None:#線程池的默認設置
max_workers = (os.cpu_count() or 1) *5默認設置的線程數是CPU核數的5倍if max_workers <=0:raise ValueError("max_workers must be greater than 0")
self._max_workers=max_workers
self._work_queue=queue.Queue()
self._threads=set()
self._shutdown=False
self._shutdown_lock= threading.Lock()#創建線程鎖
self._thread_name_prefix = (thread_name_prefix or("ThreadPoolExecutor-%d" %self._counter()))def submit(self, fn, *args, **kwargs):#創建一個線程,并異步提交任務
with self._shutdown_lock:ifself._shutdown:raise RuntimeError('cannot schedule new futures after shutdown')
f=_base.Future()
w=_WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()returnf
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):#調整線程池的數量
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads=len(self._threads)if num_threads < self._max_workers: #創建線程的過程
thread_name = '%s_%d' % (self._thread_name_prefix orself,
num_threads)
t= threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon=True
t.start()
self._threads.add(t)
_threads_queues[t]=self._work_queuedef shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown=True
self._work_queue.put(None)ifwait:for t inself._threads:
t.join()
class ThreadPoolExecutor的源碼欣賞
1.1 submit的基本使用
#常用基本方法#class ThreadPoolExecutor():#def submit(self, fn, *args, **kwargs):#創建一個線程,并異步提交任務#pass#def shutdown(self,wait=True):#相當于進程池中的p.close() 和p.join()#pass##wait = True ,等待池內所有任務執行完畢回收完資源后才繼續##wait = False,立即返回,并不會等待池內的任務執行完畢## 但不管wait參數為何值,整個程序都會等到所有任務執行完畢## submit和map必須在shutdown之前#通過
#import time#import threading#from concurrent.futures import ThreadPoolExecutor#def func(i):#time.sleep(2)#print('%s打印的:'%(threading.get_ident()),)#return i*i#
#tpool = ThreadPoolExecutor(max_workers= 5)#
#t_lst = []#for i in range(5):#t = tpool.submit(func,i)#異步提交任務,與apply_async 相似,返回的也是一個結果對象#t_lst.append(t)#tpool.shutdown()#for a in t_lst:#print('>>',a.result())#獲取
線程池submit的基本使用
1.2 map的基本使用
1.2.1 源碼欣賞
def map(self, fn, *iterables, timeout=None, chunksize=1):if timeout is notNone:
end_time= timeout +time.time()
fs= [self.submit(fn, *args) for args in zip(*iterables)]def result_iterator():#生成器
try:
fs.reverse()whilefs:#Careful not to keep a reference to the popped future
if timeout isNone:yieldfs.pop().result()else:yield fs.pop().result(end_time -time.time())finally:for future infs:
future.cancel()return result_iterator()
map方法的源碼欣賞
1.2.2 map的基本使用(驗證使用過程)
#簡單使用
from concurrent.futures importThreadPoolExecutorimportthreadingimportos,time,randomdeftask(n):print('%s is running'%(threading.get_ident()))#time.sleep(random.randint(1,2))
time.sleep(10)#測試for循環取值的時候,如果執行的子線程還沒有執行完的時候的情況
return n**2
if __name__ == '__main__':
t_pool= ThreadPoolExecutor(max_workers=3)
s= t_pool.map(task,range(1,5))#map取代for + sumbit
print(s) #.result_iterator at 0x000000C536C2B0F8>
for i ins :print(i)#print([i for i in s])#
print('主程序結束')'''#前面4個瞬間就出來
7252 is running
3084 is running
7824 is running
.result_iterator at 0x000000AF18AAA2B0>
7252 is running #延遲大概10s后后面4個瞬間出來
1
4
9
16#延遲10s后兩個瞬間出來
主程序結束'''
map的簡單使用
1.3 submit回調函數的應用
from concurrent.futures importThreadPoolExecutor,ProcessPoolExecutorfrom multiprocessing importPoolimportrequestsimportjsonimportosdefget_page(url):print(' get %s' %(os.getpid(),url))
respone=requests.get(url)if respone.status_code == 200:return {'url':url,'text':respone.text}defparse_page(res):
res=res.result()print(' parse %s' %(os.getpid(),res['url']))
parse_res='url: size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)if __name__ == '__main__':
urls=['https://www.baidu.com','https://www.python.org','https://www.openstack.org','https://help.github.com/','http://www.sina.com.cn/']#p=Pool(3)
#for url in urls:
#p.apply_async(get_page,args=(url,),callback=pasrse_page)
#p.close()
#p.join()
p=ProcessPoolExecutor(3)for url inurls:
p.submit(get_page,url).add_done_callback(parse_page)#parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果
回調函數的應用
1.4 線程與進程之間的性能測試
#進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢
#現在的計算機基本上都是多核,python對于計算密集型的任務開多線程的效率并不能帶來多大性能上的提升,#甚至不如串行(沒有大量切換),但是,對于IO密集型的任務效率還是有顯著提升的。
from multiprocessing importProcessfrom threading importThreadimportthreadingimportos,timedefwork():
time.sleep(2)print('===>')if __name__ == '__main__':
l=[]print(os.cpu_count()) #本機為4核
start=time.time()for i in range(400):#p=Process(target=work) #耗時12s多,大部分時間耗費在創建進程上
p=Thread(target=work) #耗時2s多
l.append(p)
p.start()for p inl:
p.join()
stop=time.time()print('run time is %s' %(stop-start))## I/O密集型:多線程效率高
from multiprocessing importProcessfrom threading importThreadimportos,timedefwork():
res=0for i in range(100000000):
res*=iif __name__ == '__main__':
l=[]print(os.cpu_count()) #本機為4核
start=time.time()for i in range(4):
p=Process(target=work) #耗時5s多
p=Thread(target=work) #耗時18s多
l.append(p)
p.start()for p inl:
p.join()
stop=time.time()print('run time is %s' %(stop-start))#
## 計算密集型:多進程效率高#多線程用于IO密集型,如socket,爬蟲,web#多進程用于計算密集型,如金融分析
線程與進程之家你的性能測試
1.5 線程的使用補充
#線程提供了一種便利的能夠同時處理多個請求的高效的服務器#多線程服務器基本有著同樣的體系結構, :主線程負責偵聽請求的線程#當它收到一個請求的時候,一個新的工作者線程就會被建立起來,處理該客戶端#的請求,當客戶端斷開連接時候,工作者線程會終止
#線程池被設計成一個線程同時只為一個客戶服務,但是在服務結束之后#線程并不終止,線程池中的線程要么是事先全部建立起來,要么是在需要的時候被建立起來#在客戶端斷開連接的時候,線程并不終止,而是保持著,等待為更多的連接提供服務#
#線程池通常包含:#1.一個主要的偵聽線程來接收和分派客戶端的連接#2.一些工作者線程用來處理客戶端請求#3.一個線程管理系統用來處理那些意外終止的線程#
總結
以上是生活随笔為你收集整理的python结束线程池正在运行的线程_python之线程与线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows ftp服务器_ftp客户
- 下一篇: python扩展库丰富吗_python扩