python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))...
需要注意一下
不能無限的開進程,不能無限的開線程
最常用的就是開進程池,開線程池。其中回調函數非常重要
回調函數其實可以作為一種編程思想,誰好了誰就去掉
只要你用并發,就會有鎖的問題,但是你不能一直去自己加鎖吧
那么我們就用QUEUE,這樣還解決了自動加鎖的問題
由Queue延伸出的一個點也非常重要的概念。以后寫程序也會用到
這個思想。就是生產者與消費者問題
一、Python標準模塊--concurrent.futures(并發未來)
concurent.future模塊需要了解的
1.concurent.future模塊是用來創建并行的任務,提供了更高級別的接口,
為了異步執行調用
2.concurent.future這個模塊用起來非常方便,它的接口也封裝的非常簡單
3.concurent.future模塊既可以實現進程池,也可以實現線程池
4.模塊導入進程池和線程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
還可以導入一個Executor,但是你別這樣導,這個類是一個抽象類
抽象類的目的是規范他的子類必須有某種方法(并且抽象類的方法必須實現),但是抽象類不能被實例化
5.
p = ProcessPoolExecutor(max_works)對于進程池如果不寫max_works:默認的是cpu的數目,默認是4個
p = ThreadPoolExecutor(max_works)對于線程池如果不寫max_works:默認的是cpu的數目*5
6.如果是進程池,得到的結果如果是一個對象。我們得用一個.get()方法得到結果
但是現在用了concurent.future模塊,我們可以用obj.result方法
p.submit(task,i) #相當于apply_async異步方法
p.shutdown() #默認有個參數wite=True (相當于close和join)
那么什么是線程池呢?我們來了解一下
二、線程池
進程池:就是在一個進程內控制一定個數的線程
基于concurent.future模塊的進程池和線程池 (他們的同步執行和異步執行是一樣的)
1 #1.同步執行--------------
2 from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutor3 importos,time,random4 deftask(n):5 print('[%s] is running'%os.getpid())6 time.sleep(random.randint(1,3)) #I/O密集型的,,一般用線程,用了進程耗時長
7 return n**2
8 if __name__ == '__main__':9 start =time.time()10 p =ProcessPoolExecutor()11 for i in range(10): #現在是開了10個任務, 那么如果是上百個任務呢,就不能無線的開進程,那么就得考慮控制
12 #線程數了,那么就得考慮到池了
13 obj = p.submit(task,i).result() #相當于apply同步方法
14 p.shutdown() #相當于close和join方法
15 print('='*30)16 print(time.time() - start) #17.36499309539795
17
18
19 #2.異步執行-----------
20 #from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
21 #import os,time,random
22 #def task(n):
23 #print('[%s] is running'%os.getpid())
24 #time.sleep(random.randint(1,3)) #I/O密集型的,,一般用線程,用了進程耗時長
25 #return n**2
26 #if __name__ == '__main__':
27 #start = time.time()
28 #p = ProcessPoolExecutor()
29 #l = []
30 #for i in range(10): #現在是開了10個任務, 那么如果是上百個任務呢,就不能無線的開進程,那么就得考慮控制
31 ## 線程數了,那么就得考慮到池了
32 #obj = p.submit(task,i) #相當于apply_async()異步方法
33 #l.append(obj)
34 #p.shutdown() #相當于close和join方法
35 #print('='*30)
36 #print([obj.result() for obj in l])
37 #print(time.time() - start) #5.362306594848633
基于concurrent.futures模塊的進程池
1 from concurrent.futures importProcessPoolExecutor,ThreadPoolExecutor2 from threading importcurrentThread3 importos,time,random4 deftask(n):5 print('%s:%s is running'%(currentThread().getName(),os.getpid())) #看到的pid都是一樣的,因為線程是共享了一個進程
6 time.sleep(random.randint(1,3)) #I/O密集型的,,一般用線程,用了進程耗時長
7 return n**2
8 if __name__ == '__main__':9 start =time.time()10 p = ThreadPoolExecutor() #線程池 #如果不給定值,默認cup*5
11 l =[]12 for i in range(10): #10個任務 # 線程池效率高了
13 obj = p.submit(task,i) #相當于apply_async異步方法
14l.append(obj)15 p.shutdown() #默認有個參數wite=True (相當于close和join)
16 print('='*30)17 print([obj.result() for obj inl])18 print(time.time() - start) #3.001171827316284
基于concurrent.futures模塊的線程池
應用線程池(下載網頁并解析)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests
import time,os
def get_page(url):
print(' is getting [%s]'%(os.getpid(),url))
response = requests.get(url)
if response.status_code==200: #200代表狀態:下載成功了
return {'url':url,'text':response.text}
def parse_page(res):
res = res.result()
print(' is getting [%s]'%(os.getpid(),res['url']))
with open('db.txt','a') as f:
parse_res = 'url:%s size:%s\n'%(res['url'],len(res['text']))
f.write(parse_res)
if __name__ == '__main__':
# p = ThreadPoolExecutor()
p = ProcessPoolExecutor()
l = [
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
]
for url in l:
res = p.submit(get_page,url).add_done_callback(parse_page) #這里的回調函數拿到的是一個對象。得
# 先把返回的res得到一個結果。即在前面加上一個res.result() #誰好了誰去掉回調函數
# 回調函數也是一種編程思想。不僅開線程池用,開線程池也用
p.shutdown() #相當于進程池里的close和join
print('主',os.getpid())
map函數的應用
# map函數舉例
obj= map(lambda x:x**2 ,range(10))
print(list(obj))
#運行結果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
可以和上面的開進程池/線程池的對比著看,就能發現map函數的強大了
map函數的應用
三、協程介紹
協程:單線程下實現并發(提高效率)
說到協成,我們先說一下協程聯想到的知識點
yield復習
1 3.yield功能2(可以吧函數暫停住,保存原來的狀態)--------------
2 deff1():3 print('first')4 yield 1
5 print('second')6 yield 2
7 print('third')8 yield 3
9 #print(f1()) #加了yield返回的是一個生成器
10 g =f1()11 print(next(g)) #當遇見了yield的時候就返回一個值,而且保存原來的狀態
12 print(next(g)) #當遇見了yield的時候就返回一個值
13 print(next(g)) #當遇見了yield的時候就返回一個值
yield功能示例1
1 #3.yield表達式(對于表達式的yield)--------------------
2 importtime3 defwrapper(func):4 def inner(*args,**kwargs):5 ret =func(*args,**kwargs)6next(ret)7 returnret8 returninner9@wrapper10 defconsumer():11 whileTrue:12 x= yield
13 print(x)14
15 defproducter(target):16 '''生產者造值'''
17 #next(g) #相當于g.send(None)
18 for i in range(10):19 time.sleep(0.5)20 target.send(i)#要用send就得用兩個yield
21 producter(consumer())
yield功能示例2
引子
本節主題是實現單線程下的并發,即只在一個主線程,并且很明顯的是,可利用的cpu只有一個情況下實現并發,
為此我們需要先回顧下并發的本質:切換+保存狀態
cpu正在運行一個任務,會在兩種情況下切走去執行其他的任務(切換由操作系統強制控制),
一種情況是該任務發生了阻塞,另外一種情況是該任務計算的時間過長
其中第二種情況并不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來大家都被執行的效果,如果多個程序都是純計算任務,這種切換反而會降低效率。為此我們可以基于yield來驗證。yield本身就是一種在單線程下可以保存任務運行狀態的方法,我們來簡單復習一下:
單純的切反而會影響效率
1 #串行執行
2 importtime3 defconsumer(res):4 '''任務1:接收數據,處理數據'''
5 pass
6
7 defproducer():8 '''任務2:生產數據'''
9 res=[]10 for i in range(10000000):11res.append(i)12 returnres13
14 start=time.time()15 #串行執行
16 res=producer()17consumer(res)18 stop=time.time()19 print(stop-start) #1.5536692142486572
串行執行
1 importtime2 defwrapper(func):3 def inner(*args,**kwargs):4 ret =func(*args,**kwargs)5next(ret)6 returnret7 returninner8@wrapper9 defconsumer():10 whileTrue:11 x= yield
12 print(x)13
14 defproducter(target):15 '''生產者造值'''
16 #next(g) #相當于g.send(None)
17 for i in range(10):18 time.sleep(0.5)19 target.send(i)#要用send就得用兩個yield
20 producter(consumer())
基于yield并發執行
對于單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下多個任務能遇到io就切換,這樣就保證了該線程能夠最大限度地處于就緒態,即隨時都可以被cpu執行的狀態,相當于我們在用戶程序級別將自己的io操作最大限度地隱藏起來,對于操作系統來說:這哥們(該線程)好像是一直處于計算過程的,io比較少。
協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。
因此我們需要找尋一種可以同時滿足以下條件的解決方案:
1. 可以控制多個任務之間的切換,切換之前將任務的狀態保存下來(重新運行時,可以基于暫停的位置繼續)
2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
四、Greenlet
Greenlet模塊和yield沒有什么區別,就只是單純的切,跟效率無關。
只不過比yield更好一點,切的時候方便一點。但是仍然沒有解決效率
Greenlet可以讓你在多個任務之間來回的切
舉例:
1 from greenlet importgreenlet2 importtime3 defeat(name):4 print('%s eat 1' %name)5 time.sleep(10) #當遇到IO的時候它也沒有切,這就得用gevent了
6 g2.switch('egon')7 print('%s eat 2' %name)8g2.switch()9 defplay(name):10 print('%s play 1' %name)11g1.switch()12 print('%s play 2' %name)13
14 g1=greenlet(eat)15 g2=greenlet(play)16
17 g1.switch('egon')#可以在第一次switch時傳入參數,以后都不需要
greenlet
所以上面的方法都不可行,那么這就用到了Gevert ,也就是協程。就解決了單線程實現并發的問題,還提升了效率
五、Gevent介紹
Gevent 是一個第三方庫,可以輕松通過gevent實現并發同步或異步編程,在gevent中用到的主要模式是Greenlet,
它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的
g2=gevent.spawn(func2)
g1.join() #等待g1結束
g2.join() #等待g2結束
#或者上述兩步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
舉例;
1 from gevent importmonkey;monkey.patch_all()2 importgevent3 importtime4 defeat(name):5 print('%s eat 1' %name)6 time.sleep(2) #我們用等待的時間模擬IO阻塞
7 '''在gevent模塊里面要用gevent.sleep(2)表示等待的時間
8 然而我們經常用time.sleep()用習慣了,那么有些人就想著
9 可以用time.sleep(),那么也不是不可以。要想用,就得在
10 最上面導入from gevent import monkey;monkey.patch_all()這句話
11 如果不導入直接用time.sleep(),就實現不了單線程并發的效果了
12'''
13 #gevent.sleep(2)
14 print('%s eat 2' %name)15 return 'eat'
16 defplay(name):17 print('%s play 1' %name)18 time.sleep(3)19 #gevent.sleep(3)
20 print('%s play 2' %name)21 return 'paly' #當有返回值的時候,gevent模塊也提供了返回結果的操作
22
23 start =time.time()24 g1 = gevent.spawn(eat,'egon') #執行任務
25 g2 = gevent.spawn(play,'egon') #g1和g2的參數可以不一樣
26 #g1.join() #等待g1
27 #g2.join() #等待g2
28 #上面等待的兩句也可以這樣寫
29gevent.joinall([g1,g2])30 print('主',time.time()-start) #3.001171588897705
31
32 print(g1.value)33 print(g2.value)
gevent的一些方法(重要)
需要說明的是:
gevent.sleep(2)模擬的是gevent可以識別的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了
from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前
或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭
六、Gevent之同步于異步
1 from gevent importspawn,joinall,monkey;monkey.patch_all()2
3 importtime4 deftask(pid):5 """6 Some non-deterministic task
7"""
8 time.sleep(0.5)9 print('Task %s done' %pid)10
11
12 defsynchronous():13 for i in range(10):14task(i)15
16 defasynchronous():17 g_l=[spawn(task,i) for i in range(10)]18joinall(g_l)19
20 if __name__ == '__main__':21 print('Synchronous:')22synchronous()23
24 print('Asynchronous:')25asynchronous()26 #上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,后者阻塞當前流程,并執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。
View Code
七、Gevent之應用舉例一
1 from gevent import monkey;monkey.patch_all() #打補丁
2 importgevent3 importrequests4 importtime5 defget_page(url):6 print('get :%s'%url)7 response =requests.get(url)8 if response.status_code==200: #下載成功的狀態
9 print('%d bytes received from:%s'%(len(response.text),url))10 start=time.time()11gevent.joinall([12 gevent.spawn(get_page,'http://www.baidu.com'),13 gevent.spawn(get_page, 'https://www.yahoo.com/'),14 gevent.spawn(get_page, 'https://github.com/'),15])16 stop =time.time()17 print('run time is %s' %(stop-start))
協程應用爬蟲
from gevent importjoinall,spawn,monkey;monkey.patch_all()importrequestsfrom threading importcurrent_threaddefparse_page(res):print('%s PARSE %s' %(current_thread().getName(),len(res)))def get_page(url,callback=parse_page):print('%s GET %s' %(current_thread().getName(),url))
response=requests.get(url)if response.status_code == 200:
callback(response.text)if __name__ == '__main__':
urls=['https://www.baidu.com','https://www.taobao.com','https://www.openstack.org',
]
tasks=[]for url inurls:
tasks.append(spawn(get_page,url))
joinall(tasks)
協程應用爬蟲(回調函數)
八、Gevent之應用舉例二
也可以利用協程實現并發
1 #!usr/bin/env python
2 #-*- coding:utf-8 -*-
3 from gevent importmonkey;monkey.patch_all()4 importgevent5 from socket import *
6 print('start running...')7 deftalk(conn,addr):8 whileTrue:9 data = conn.recv(1024)10 print('%s:%s %s'%(addr[0],addr[1],data))11conn.send(data.upper())12conn.close()13 defserver(ip,duankou):14 server =socket(AF_INET, SOCK_STREAM)15 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)16server.bind((ip,duankou))17 server.listen(5)18 whileTrue:19 conn,addr = server.accept() #等待鏈接
20 gevent.spawn(talk,conn,addr) #異步執行 (p =Process(target=talk,args=(coon,addr))
21 #p.start())相當于開進程里的這兩句
22server.close()23 if __name__ == '__main__':24 server('127.0.0.1',8081)
服務端利用協程
1 #!usr/bin/env python
2 #-*- coding:utf-8 -*-
3 from multiprocessing importProcess4 from gevent importmonkey;monkey.patch_all()5 from socket import *
6 defclient(ip,duankou):7 client =socket(AF_INET, SOCK_STREAM)8client.connect((ip,duankou))9 whileTrue:10 client.send('hello'.encode('utf-8'))11 data = client.recv(1024)12 print(data.decode('utf-8'))13 if __name__ == '__main__':14 for i in range(100):15 p = Process(target=client,args=(('127.0.0.1',8081)))16 p.start()
客戶端開了100個進程
總結
以上是生活随笔為你收集整理的python线程池模块_python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何看当前windows是utf8还是g
- 下一篇: vivox27升级鸿蒙,vivo x27