基础10 多进程、协程(multiprocessing、greenlet、gevent、gevent.monkey、select、selector)...
1.多進(jìn)程實(shí)現(xiàn)方式(類似于多線程)
1 import multiprocessing 2 import time,threading 3 4 def thread_run():#定義一個(gè)線程函數(shù) 5 print("我是子線程%s" %threading.get_ident()) ?#threading.get_ident()函數(shù)獲取當(dāng)前線程的id 6 def run(name):#定義一個(gè)進(jìn)程函數(shù) 7 time.sleep(1) 8 print("hello,我是進(jìn)程%s" %name) 9 t = threading.Thread(target=thread_run(),)#在進(jìn)程下運(yùn)行子線程 10 t.start() 11 if __name__ == '__main__': ?#注意:啟多進(jìn)程必須要加這個(gè)判斷,否則會(huì)有意想不到的錯(cuò)誤 12 for i in range(3): 13 p = multiprocessing.Process(target=run,args=('bob %s' %i,)) 14 p.start() 15 p.join() 16 17 輸出: 18 hello,我是進(jìn)程bob 0 19 我是子線程28176 20 hello,我是進(jìn)程bob 1 21 我是子線程27016 22 hello,我是進(jìn)程bob 2 23 我是子線程275162.進(jìn)程都是由父進(jìn)程創(chuàng)建和啟動(dòng)的
1 from multiprocessing import Process 2 import os 3 4 5 def info(title): 6 print(title) 7 print('module name:', __name__) 8 print('當(dāng)前進(jìn)程父進(jìn)程id:', os.getppid()) 9 print('當(dāng)前進(jìn)程 id:', os.getpid()) 10 print("\n\n") 11 12 13 def f(name): 14 info('\033[31;1mcalled from child process function f\033[0m') 15 print('hello', name) 16 17 if __name__ == '__main__': 18 info('\033[32;1mmain process line\033[0m') 19 p = Process(target=f, args=('bob',)) # 20 p.start() 21 22 輸出: 23 main process line 24 module name: __main__ 25 當(dāng)前進(jìn)程父進(jìn)程id: 12468 #這是pycharm的進(jìn)程號(hào) 26 當(dāng)前進(jìn)程 id: 25692 #這是第一個(gè)info函數(shù)的進(jìn)程號(hào) 27 28 29 30 called from child process function f 31 module name: __mp_main__ 32 當(dāng)前進(jìn)程父進(jìn)程id: 25692 #這是第一個(gè)info函數(shù)的進(jìn)程號(hào) 33 當(dāng)前進(jìn)程 id: 27252 #這是f函數(shù)內(nèi)調(diào)用的info函數(shù)的進(jìn)程號(hào) 34 35 36 37 hello bob3.進(jìn)程間是不共享內(nèi)存的(不同于線程),即進(jìn)程之間是不能直接交換信息的
可以用以下方式實(shí)現(xiàn)進(jìn)程間通信,用Queue隊(duì)列(線程中用queue)
4.實(shí)現(xiàn)不同進(jìn)程間通信的另外一種方法
?(1)管道(Pipe()),類似于socket
1 from multiprocessing import Process, Pipe 2 def f(conn): 3 conn.send([42, 'hello'])#類似于socket,這里子進(jìn)程發(fā)了兩次,下邊父進(jìn)程就需要收兩次 4 conn.send([32, 'yyyy']) 5 print(conn.recv()) #子進(jìn)程也可以接收父進(jìn)程發(fā)過來的信息 6 conn.close() 7 8 if __name__ == '__main__': 9 parent_conn, child_conn = Pipe() 10 p = Process(target=f, args=(child_conn,)) 11 p.start() 12 print(parent_conn.recv())#收兩次 13 print(parent_conn.recv()) 14 parent_conn.send('我是父進(jìn)程A') #父進(jìn)程可以給子進(jìn)程發(fā)信息 15 p.join() 16 17 輸出: 18 [42, 'hello'] 19 [32, 'yyyy'] 20 我是父進(jìn)程A5.實(shí)現(xiàn)進(jìn)程間內(nèi)存的共享(所謂共享即所有進(jìn)程操作同一份列表、字典等等,其實(shí)也不真的是同一份數(shù)據(jù),只不過是程序內(nèi)部通過復(fù)制原始列表、字典給進(jìn)程,進(jìn)程運(yùn)行以后修改列表、字典,最后合為一個(gè)列表、字典,讓人們以為是所有進(jìn)程共享了相同的內(nèi)存)
manager模塊可實(shí)現(xiàn)這一功能
6.進(jìn)程鎖,防止在屏幕上或日志里打印亂了
??如這種亂:hello worhello world 8
7.很重要? 進(jìn)程池????防止進(jìn)程啟動(dòng)太多,消耗太多資源,一般是幾個(gè)cpu就啟幾個(gè)進(jìn)程
?
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def foo(i): 6 time.sleep(1) 7 print("正在執(zhí)行子進(jìn)程", os.getpid()) 8 return os.getpid() #返回的是子進(jìn)程的id號(hào) 9 10 def bar(arg): #定義回調(diào)函數(shù),參數(shù)arg是foo函數(shù)的返回值 11 print("子進(jìn)程%s執(zhí)行完畢,主進(jìn)程%s調(diào)用的我"% (arg, os.getpid())) 12 13 if __name__ == '__main__': 14 pool = Pool(5) #允許進(jìn)程池同時(shí)放入5個(gè)進(jìn)程 15 print("主進(jìn)程", os.getpid()) 16 for i in range(10): 17 pool.apply_async(func=foo, args=(i,), callback=bar)#異步,異步最好,callback后跟回調(diào)函數(shù),回調(diào)函數(shù)是由主進(jìn)程執(zhí)行的 18 #回調(diào)函數(shù)的參數(shù)不用寫,默認(rèn)就是前邊的func 19 # pool.apply(func=foo, args=(i,)) #同步,是串行的,看不出是5個(gè)5個(gè)的執(zhí)行 20 print("主進(jìn)程:end") 21 pool.close() #注意,這里要先close,再join,否則會(huì)有錯(cuò)誤 22 pool.join()輸出:
1 主進(jìn)程 57784 2 主進(jìn)程:end 3 正在執(zhí)行子進(jìn)程 58060 4 子進(jìn)程58060執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 5 正在執(zhí)行子進(jìn)程 55428 6 子進(jìn)程55428執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 7 正在執(zhí)行子進(jìn)程 58356 8 子進(jìn)程58356執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 9 正在執(zhí)行子進(jìn)程 56716 10 子進(jìn)程56716執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 11 正在執(zhí)行子進(jìn)程 57380 12 子進(jìn)程57380執(zhí)行完畢,主進(jìn)程57784調(diào)用的我13 正在執(zhí)行子進(jìn)程 58060 14 子進(jìn)程58060執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 15 正在執(zhí)行子進(jìn)程 55428 16 子進(jìn)程55428執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 17 正在執(zhí)行子進(jìn)程 58356 18 子進(jìn)程58356執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 19 正在執(zhí)行子進(jìn)程 56716 20 子進(jìn)程56716執(zhí)行完畢,主進(jìn)程57784調(diào)用的我 21 正在執(zhí)行子進(jìn)程 57380 22 子進(jìn)程57380執(zhí)行完畢,主進(jìn)程57784調(diào)用的我
?8.協(xié)程
又稱微線程,是一種用戶態(tài)的輕量級(jí)線程,協(xié)程擁有自己的寄存器和棧,協(xié)程調(diào)度切換時(shí),將寄存器上下文和棧保存在其他地方,當(dāng)切回來的時(shí)候,恢復(fù)先前保存的寄存器上下文和棧,因此,每一次協(xié)程恢復(fù)時(shí),
都能在它離開中斷的地方繼續(xù)。
協(xié)程的好處:
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷
- 方便切換控制流,簡(jiǎn)化編程模型
- 高并發(fā)+高擴(kuò)展性+低成本:一個(gè)CPU支持上萬的協(xié)程都不是問題。所以很適合用于高并發(fā)處理。
缺點(diǎn):
- 無法利用多核資源:協(xié)程的本質(zhì)是個(gè)單線程,它不能同時(shí)將 單個(gè)CPU 的多個(gè)核用上,協(xié)程需要和進(jìn)程配合才能運(yùn)行在多CPU上.當(dāng)然我們?nèi)粘K帉懙慕^大部分應(yīng)用都沒有這個(gè)必要,除非是cpu密集型應(yīng)用。
- 進(jìn)行阻塞(Blocking)操作(如IO時(shí))會(huì)阻塞掉整個(gè)程序
用greenlet模塊實(shí)現(xiàn)協(xié)程(手動(dòng)):
1 from greenlet import greenlet 2 def test1(): 3 print("test1") #4.打印 4 gr2.switch() #5.切換到test2,test2開始執(zhí)行 5 print("test1...") #8.接上次離開的地方,開始執(zhí)行,打印 6 gr2.switch() #9.切換到test2,test2開始執(zhí)行 7 8 def test2(): 9 print("test2") #6.打印 10 gr1.switch() #7.切換到tets1,test1開始執(zhí)行 11 print("test2...") #10.接上次離開的地方,開始執(zhí)行,打印,程序結(jié)束 12 13 gr1 = greenlet(test1) #1.啟動(dòng)一個(gè)協(xié)程 14 gr2 = greenlet(test2) #2.啟動(dòng)一個(gè)協(xié)程 15 gr1.switch() #3.切換到test1,test1開始執(zhí)行 16 17 輸出: 18 test1 19 test2 20 test1... 21 test2...重要:用gevent模塊實(shí)現(xiàn)協(xié)程(自動(dòng)切換)。遇到IO(程序睡覺或卡主)便自動(dòng)切換到下一個(gè)協(xié)程
1 import gevent 2 3 4 def t1(): 5 print("t1 運(yùn)行") #4.t1開始運(yùn)行,打印 6 gevent.sleep(2) #5.切換到下一個(gè)協(xié)程,按生成的順序,切換到t2 7 #10.程序運(yùn)行到這里,只花費(fèi)了不到0.1秒鐘,這時(shí)t1還在睡覺,又觸發(fā)了切換操作,切換到t2 8 #13.程序運(yùn)行到這里,只花費(fèi)了不到0.1秒鐘,這時(shí)t1還在睡覺,又觸發(fā)了切換操作,切換到t2 9 print("再次回到t1") #16.接上次離開的地方,執(zhí)行打印操作 10 11 12 def t2(): 13 print("t2 運(yùn)行") #6.t2開始運(yùn)行,打印 14 gevent.sleep(1) #7.切換到下一個(gè)協(xié)程,按生成的順序,切換到t3 15 #11.程序運(yùn)行到這里,只花費(fèi)了不到0.1秒鐘,這時(shí)t2還在睡覺,又觸發(fā)了切換操作,切換到t3 16 #14.程序運(yùn)行到這里,只花費(fèi)了不到0.1秒鐘,這時(shí)t2還在睡覺,又觸發(fā)了切換操作,但是t3運(yùn)行 17 # 完畢,無法切換到t3,只能切換到t1,t1還在睡覺,又切換到t2,互相切換過了1秒后,t2睡覺 18 # 完畢,這時(shí)切換到t2 19 print("再次回到t2") #15.接上次離開的地方,執(zhí)行打印操作,切換到t1 20 21 22 def t3(): 23 print("t3運(yùn)行") #8.t3開始運(yùn)行,打印 24 gevent.sleep(0) #9.雖然睡覺0秒,但是會(huì)觸發(fā)切換操作,切換到下一個(gè)協(xié)程,按生成的順序,切換到t1 25 print("再次回到t3") #12.由于t3沒有睡覺,執(zhí)行打印操作。t3運(yùn)行完畢,自動(dòng)切換到t1 26 27 gevent.joinall([ 28 gevent.spawn(t1), #1.生成協(xié)程1 29 gevent.spawn(t2), #2.生成協(xié)程2 30 gevent.spawn(t3), #3.生成協(xié)程3 31 ]) 32 33 輸出:t1 運(yùn)行
t2 運(yùn)行
t3運(yùn)行
再次回到t3
再次回到t2
再次回到t1
?9.爬蟲,簡(jiǎn)單下載網(wǎng)頁,使用gevent和gevent.monkey模塊(需python3.5以上,gevent在linux完美支持,在Windows可能會(huì)有問題)
1 from urllib import request 2 import gevent,time 3 from gevent import monkey 4 monkey.patch_all() #把當(dāng)前程序的所有的io操作給gevent單獨(dú)的做上標(biāo)記,使gevent能辨識(shí)出io操作,從而達(dá)到協(xié)程遇到io就切換的效果,不加monkey是不能識(shí)別出io的 5 6 def f(url): 7 print('GET: %s' % url) 8 resp = request.urlopen(url) 9 data = resp.read() 10 print('%d bytes received from %s.' % (len(data), url)) 11 12 urls = ['https://www.python.org/', 13 'https://www.yahoo.com/', 14 'https://github.com/' ] 15 time_start = time.time() 16 for url in urls: 17 f(url) 18 print("同步cost",time.time() - time_start) 19 async_time_start = time.time() 20 gevent.joinall([ 21 gevent.spawn(f, 'https://www.python.org/'), 22 gevent.spawn(f, 'https://www.yahoo.com/'), 23 gevent.spawn(f, 'https://github.com/'), 24 ]) 25 print("異步cost",time.time() - async_time_start)?輸出:
1 GET: https://www.python.org/ 2 47403 bytes received from https://www.python.org/. 3 GET: https://www.yahoo.com/ 4 460629 bytes received from https://www.yahoo.com/. 5 GET: https://github.com/ 6 25734 bytes received from https://github.com/. 7 同步cost 5.380241632461548 8 GET: https://www.python.org/ 9 GET: https://www.yahoo.com/ 10 GET: https://github.com/ 11 47403 bytes received from https://www.python.org/. 12 430893 bytes received from https://www.yahoo.com/. 13 25734 bytes received from https://github.com/. 14 異步cost 1.2949371337890625?10.基于協(xié)程的異步多并發(fā)
socket?server
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #通過協(xié)程實(shí)現(xiàn)并發(fā) 5 import sys 6 import socket 7 import time 8 import gevent 9 10 from gevent import socket,monkey 11 monkey.patch_all() 12 13 14 def server(port): 15 s = socket.socket() 16 s.bind(('0.0.0.0',port)) 17 s.listen(500)#最大連接數(shù)為500 18 while True: 19 cli, addr = s.accept() 20 gevent.spawn(handle_request, cli)# 創(chuàng)建協(xié)程,這里做了一個(gè)操作,將cli作為參數(shù)傳給handle_request, handle_request(cli) 21 22 23 def handle_request(conn): 24 try: 25 while True: 26 data = conn.recv(1024)#接收client的請(qǐng)求 27 28 with open("bbb.txt",'ab') as f:#接收文件內(nèi)容,并寫入文件 29 f.write(data) 30 print("recv:", data) 31 # post = bytes('get your request: ',encoding='utf-8') 32 # post = post + data #post為要向client發(fā)送的數(shù)據(jù),必須為bytes類型 33 conn.send(data) #向client發(fā)送數(shù)據(jù) 34 if not data: 35 conn.shutdown(socket.SHUT_WR) 36 37 except Exception as ex: 38 print(ex) 39 40 finally: 41 conn.close() 42 43 if __name__ == "__main__": 44 server(9000)client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發(fā)、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發(fā)送的內(nèi)容,必須是bytes類型 13 s.sendall(msg) #發(fā)送數(shù)據(jù) 14 # filename = input("filename:>>") #注釋的內(nèi)容是可以發(fā)送文件的 15 # with open(filename,'rb') as f:#發(fā)送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數(shù)據(jù) 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()11.io多路復(fù)用,不是異步???基于select
server
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發(fā)、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發(fā)送的內(nèi)容,必須是bytes類型 13 s.sendall(msg) #發(fā)送數(shù)據(jù) 14 # filename = input("filename:>>") #注釋的內(nèi)容是可以發(fā)送文件的 15 # with open(filename,'rb') as f:#發(fā)送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數(shù)據(jù) 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()12.??重要要會(huì):IO多路復(fù)用之? selector
server
?#將會(huì)調(diào)用accept函數(shù)去新建一個(gè)連接 31 32 while True: 33 events = sel.select() #默認(rèn)阻塞,有活動(dòng)連接就返回活動(dòng)的連接列表,events是一個(gè)列表,里邊存放所有的連接 34 for key, mask in events: #mask是連接的順序數(shù) 35 callback = key.data #調(diào)用accept函數(shù) 36 callback(key.fileobj, mask) #key.fileobj= 文件句柄
client
1 #!usr/bin/env/python 2 # -*- coding:utf-8 -*- 3 # from wangteng 4 #本客戶端可以跟異步socket_server多并發(fā)、select socket server一起用 5 import socket 6 7 host = 'localhost' 8 port = 9000 9 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 s.connect((host, port)) 11 while True: 12 msg = bytes(input(">>:"), encoding="utf-8") #輸入要發(fā)送的內(nèi)容,必須是bytes類型 13 s.sendall(msg) #發(fā)送數(shù)據(jù) 14 # filename = input("filename:>>") #注釋的內(nèi)容是可以發(fā)送文件的 15 # with open(filename,'rb') as f:#發(fā)送文件 16 # for line in f: 17 # s.sendall(line) 18 # data = s.recv(1024) 19 # print('recv:',data) 20 data = s.recv(1024)#接收數(shù)據(jù) 21 print('recv:', data.decode())#解碼可以顯示中文 22 s.close()?
轉(zhuǎn)載于:https://www.cnblogs.com/wt11/p/5909366.html
總結(jié)
以上是生活随笔為你收集整理的基础10 多进程、协程(multiprocessing、greenlet、gevent、gevent.monkey、select、selector)...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如果是繁體,Zzk搜不搜的到呢?
- 下一篇: XSS攻击原理、示例和防范措施 --