python_day10_并发编程
10.1 操作系統
? ?分時多道技術
10.2 進程、線程
10.3?GIL: 全局解釋鎖
10.4?鎖
????????同步鎖
????????死鎖 遞歸鎖
10.5?同步 與 異步
????????同步事件、信號量
????????隊列
10.6?生產者-消費者模型
10.7?多進程模塊
10.8?進程間通信
? ? ? ?進程隊列Queue
? ? ? ?管道
? ? ? ?manager
? ? ? ?數據同步
? ? ? ?進程池
10.1?操作系統
操作系統的作用:
? ? 1:隱藏丑陋復雜的硬件接口,提供良好的抽象接口
? ? 2:管理、調度進程,并且將多個進程對硬件的競爭變得有序
多道技術:
? ? 1.產生背景:針對單核,實現并發
? ? ps:
? ? 現在的主機一般是多核,那么每個核都會利用多道技術
? ? 有4個cpu,運行于cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,會被調度到4個
? ? cpu中的任意一個,具體由操作系統調度算法決定。
? ??
? ? 2.空間上的復用:如內存中同時有多道程序
? ? 3.時間上的復用:復用一個cpu的時間片
? ? ? ?強調:遇到io切,占用cpu時間過長也切,核心在于切之前將進程的狀態保存下來,這樣
? ? ? ? ? ? 才能保證下次切換回來時,能基于上次切走的位置繼續運行
多道程序設計:
在不同用戶遇到IO進行切換操作,當用戶A IO阻塞時,切換到B用戶,當B執行完再處理A
SPOOLING: 外部設備聯機操作
分時操作系統: 多個聯機操作 + 多道程序設計? ? 類似原先網吧的分屏技術
10.2? 進程、線程?
進程
? ? 本質上就是一段程序運行過程(一段抽象概念) 就是一個過程
定義: 進程就是一個程序在數據集上的一次動態執行過程
進程一般是由程序、數據集、進程控制塊三部分組成
數據集: 程序運行過程中所需的一切數據資源
進程控制塊: 目的是用于切換,狀態保存,恢復操作
進程切換: IO切換,時間輪詢切換
進程:最小的資源單元
線程:
特點:共享整個進程的內存,數據集
單個的線程不可能脫離進程而存在
一個線程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以并發執行
?線程:最小的執行單元,微進程 (運行實例)
一個進程至少得有一個線程
程序運行在內存中,CPU在取內存中的數據拿出并執行,
1.什么是進程?進程和程序之間有什么區別?
?進程:一個程序的執行實例稱為進程;
?每個進程都提供執行程序所需的資源。
?進程有一個虛擬地址空間、可執行代碼、對系統對象的開放句柄、一個安全上下文、一個惟一的進程標識符、環境變量、一個優先級類、最小和最大工作集大小,以及至少一個執行線程;
? ? 每個進程都由一個線程啟動,這個線程通常被稱為主線程,但是可以從它的任何線程中創建額外的線程;
? ? 程序并不能單獨運行,只有將程序裝載到內存中,系統為它分配資源才能運行,而這種執行的程序就稱之為進程;
? ? 程序和進程的區別就在于:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬于動態概念。
? ? 在多道編程中,我們允許多個程序同時加載到內存中,在操作系統的調度下,可以實現并發地執行,大大提高了CPU的利用率
2.什么是線程?
? ? 進程的缺點有:
? ? ? ? 進程只能在一個時間干一件事,如果想同時干兩件事或多件事,進程就無能為力了。
? ? ? ? 進程在執行的過程中如果阻塞,例如等待輸入,整個進程就會掛起,即使進程中有些工作不依賴于輸入的數據,也將無法執行。
? ? 線程是操作系統能夠進行運算調度的最小單位。
? ? 它被包含在進程之中,是進程中的實際運作單位。
? ? 一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發多個線程,每條線程并行執行不同的任務
? ? 線程是一個執行上下文,它是一個CPU用來執行指令流的所有信息。
3.進程和線程之間的關系?
? ? 線程共享創建它的進程的地址空間;進程有自己的地址空間。(內存地址)
? ? 線程可以直接訪問其進程的數據段;進程有自己的父進程數據段的副本。
? ? 線程可以直接與進程的其他線程通信;進程必須使用進程間通信來與兄弟進程通信。
? ? 新線程很容易創建;新進程需要復制父進程。
? ? 線程可以對同一進程的線程進行相當大的控制;進程只能對子進程執行控制。
? ? 對主線程的更改(取消、優先級更改等)可能會影響流程的其他線程的行為;對父進程的更改不會影響子進程。
? ??
程序計數器: 真正保存的就是一個內存地址, 上下文切換的時候在取出上一次的運行狀態,并進程恢復執行操作
############# 進程運行,生成子線程,主子線程開始順序運行,遇至 IO阻塞開始執行下一條。
import?threading import?timedef?Tv():print(time.ctime())print('look?Tv')time.sleep(2)print(time.ctime())#?生成一個對象 T=threading.Thread(target=Tv) #?跟socket一樣調用threading父類的start方法執行 T.start() print('start?')#?結果Thu?Jan?18?15:00:50?2018look?TvstartThu?Jan?18?15:00:52?2018############? ?線程之間可以并發執行? 順序執行 ,執行完主線程然后再執行子線程,直到程序退出
import?threading import?timedef?Tv():print('tv?show?start?time?',time.ctime())print('look?Tv')time.sleep(2)print('tv?show?end?time?',time.ctime())def?Eat():print('eat?start?time?',time.ctime())print('eat?')time.sleep(2)print('eat?end?time?',time.ctime())#?生成一個對象 T=threading.Thread(target=Tv) #?跟socket一樣調用threading父類的start方法執行 T.start() T2=threading.Thread(target=Eat) T2.start()print('start?')##?結果 tv?show?start?time??Thu?Jan?18?15:06:00?2018 look?Tv eat?start?time??Thu?Jan?18?15:06:00?2018 eat? start? tv?show?end?time??Thu?Jan?18?15:06:02?2018 eat?end?time??Thu?Jan?18?15:06:02?2018########? 當程序中有join時,會先執行子線程的程序,最后再去執行主線程的代碼,并發運行
import?threading import?timedef?Tv():print('tv?show?start?time?',time.ctime())print('look?Tv')time.sleep(2)print('tv?show?end?time?',time.ctime())def?Eat():print('eat?start?time?',time.ctime())print('eat?')time.sleep(2)print('eat?end?time?',time.ctime())#?生成一個對象 T=threading.Thread(target=Tv)T2=threading.Thread(target=Eat)T.start() T2.start()T.join() T2.join() print('start?')##?結果 tv?show?start?time??Thu?Jan?18?15:14:15?2018 look?Tv eat?start?time??Thu?Jan?18?15:14:15?2018 eat? tv?show?end?time??Thu?Jan?18?15:14:17?2018 eat?end?time??Thu?Jan?18?15:14:17?2018 start######## 如果join在start前,那么就會先執行子線程中的程序 然后再執行下一段子線程,就無法實現并發的效果了
setDaemon(True) 守護線程?
子線程是否要守護主線程, 當主線程退出之后, 子線程也會直接跟著退出?
其它方法:
Thread實例對象的方法
? # isAlive(): 返回線程是否活動的。
? # getName(): 返回線程名。
? # setName(): 設置線程名。
# 繼承 threading.Thread 需要重寫一下run方法
class?Mythread(threading.Thread):def?__init__(self,num):#?繼承父類的?init方法threading.Thread.__init__(self)self.num?=?num#?重寫run方法?跟sockserver的header方法一樣def?run(self):print('測試一下:?%s'?%self.num)time.sleep(2)if?__name__?==?'__main__':t1=Mythread(11111)t2=Mythread(22222)t1.start() #?使用start方法,類繼承方法必須要有run方法t2.start()print(t1.getName())?????#?Thread-1t1.setName('test_Thread')print(t1.getName())?????#?test_Threadprint('主線程跑起來')#?結果測試一下:?11111測試一下:?22222主線程跑起來10.3. python GIL全局解釋器鎖
? ? 無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行
任務分為兩部分:IO密集型:計算密集型
sleep等同于IO操作
對于IO密集型的任務:? ? python的多線程的是有意義的
? ? 可以采用多進程+協程(也解決IO密集型問題)
對于計算密集型的任務:? python的多線程就不推薦,
10.4? 鎖
同步鎖:?
lock=threading.Lock()??創建一個對象 lock.acquire() 鎖住,只能有一個線程被執行 代碼塊 lock.release() 執行完釋放#######?例一 import?threading import?time num=100 def?Nums():global?numlock.acquire()temp=numtemp-=1#?當睡眠時間為1時,結果一定是99?同時執行100個線程,每次取出的結果都為99time.sleep(0.01)num=templock.release()print('k')s=[] #?增加同步鎖功能就跟異步類似,但它也能同時并發操作,比如在加鎖前與加鎖后進行打印。 lock=threading.Lock()???????#?同步鎖if?__name__?==?'__main__':for?i?in?range(100):i=threading.Thread(target=Nums)i.start()???#每個對象都需要start一次s.append(i)?#附加for?x?in?s:x.join()????#?最后一次的joinprint(num)??????#?主線程死鎖 遞歸鎖:
線程間共享多個資源,兩個線程分別占有一部分資源,并且同時等待對方釋放,就會造成死鎖,
#####?死鎖?例一 import?threading import?time class?Mythread(threading.Thread):def?Ltest(self):A.acquire()print(self.name,'Ltest1_A',time.ctime())time.sleep(2)print(self.name,'Ltest1_B',time.ctime())B.acquire()print(self.name,'Ltest1_A',time.ctime())time.sleep(2)print(self.name,'Ltest1_B',time.ctime())B.release()A.release()def?Ltest2(self):B.acquire()print(self.name,?'Ltest1_A',?time.ctime())time.sleep(2)print(self.name,?'Ltest1_B',?time.ctime())A.acquire()print(self.name,?'Ltest2_A',?time.ctime())time.sleep(2)print(self.name,?'Ltest2_B',?time.ctime())A.release()B.release()def?run(self):self.Ltest()self.Ltest2()li=[] A?=?threading.Lock() B?=?threading.Lock()if?__name__?==?'__main__':for?i?in?range(2):f=Mythread()f.start()li.append(f)#?執行最后一個數值for?x?in?li:x.join()#?Ltest第一次執行完所有的鎖并釋放,再執行Ltest2,B鎖開始運行,然后run方法調用Ltest方法(再次執行A鎖,A鎖運行完執行B鎖的),?死鎖就出來了,因為Ltest2的B鎖還沒有釋放##?結果Thread-1?L1_test1_A?Fri?Jan?19?17:28:29?2018Thread-1?L1_test1_B?Fri?Jan?19?17:28:31?2018Thread-1?L2_test1_A?Fri?Jan?19?17:28:31?2018Thread-1?L2_test1_B?Fri?Jan?19?17:28:33?2018Thread-1?L3_test2_A?Fri?Jan?19?17:28:33?2018Thread-2?L1_test1_A?Fri?Jan?19?17:28:33?2018Thread-1?L3_test2_B?Fri?Jan?19?17:28:35?2018Thread-2?L1_test1_B?Fri?Jan?19?17:28:35?2018## 死鎖解決辦法? 遞歸鎖
rlock 自身維護著一個計數器,每次加一個鎖conut +1? 執行完減一?
import?threading import?time class?Mythread(threading.Thread):def?Ltest(self):R.acquire()print(self.name,'L1_test1_A',time.ctime())time.sleep(2)print(self.name,'L1_test1_B',time.ctime())R.acquire()print(self.name,'L2_test1_A',time.ctime())time.sleep(2)print(self.name,'L2_test1_B',time.ctime())R.release()R.release()def?Ltest2(self):R.acquire()print(self.name,?'L3_test2_A',?time.ctime())time.sleep(2)print(self.name,?'L3_test2_B',?time.ctime())R.acquire()print(self.name,?'L4_test2_A',?time.ctime())time.sleep(2)print(self.name,?'L4_test2_B',?time.ctime())R.release()R.release()def?run(self):self.Ltest()self.Ltest2()li=[] #?A?=?threading.Lock() #?B?=?threading.Lock()R=threading.RLock()?????#?遞歸鎖if?__name__?==?'__main__':for?i?in?range(2):f=Mythread()f.start()li.append(f)#?執行最后一個數值for?x?in?li:x.join()#?結果太長了,資源都是搶占方式運行,所以顯示出來的信息是不按順序打印的10.5?同步與異步
同步 與 異步
同步: 當進程執行到IO操作(等待外部數據)的時候, 如socket recv與send一發一收時,等。同步
異步不等,一直等到數據接收成功,再回來處理
信號量和同步對象? ?# 需了解的東西
同步事件
讓兩個線程之間處于一個同步狀態
event.wait() #?等待?需要設置一個flag位,False就表示阻塞 event.set() #?如果設置這個了?就表示為True event.clear() #?將set設置為false######?例一 import?threading import?timeclass?Boss(threading.Thread):def?run(self):print('Boss:?項目急,大家在加油一下')print(events.isSet())#?創建事件對象,設置flag為Trueevents.set()time.sleep(3)print(events.isSet())print('Boss:?下班,大家辛苦啦')events.set() class?Worker(threading.Thread):def?run(self):#?默認是faluse,等待其它事件為true時執行events.wait()#?線程并發操作,沉睡時間取決于上一個線程的最大時間print('Worker:?回去又得好慢了,加油干')time.sleep(1)events.clear()events.wait()print('Worker:?下班咯')Li_dist=[] if?__name__?==?'__main__':events=threading.Event()????#?創建一個事件對象for?L?in?range(5):Li_dist.append(Worker())Li_dist.append(Boss())for?x?in?Li_dist:x.start()for?J?in?Li_dist:???????#?最后一個線程joinJ.join()print('結束循環')#?結果Boss:?項目急,大家在加油一下FalseWorker:?回去又得好慢了,加油干*5Boss:?下班,大家辛苦啦Worker:?下班咯*5結束循環信號量:
允許有多少線程同時運行
####?例一 import?threading from?time?import?sleep class?Mythread(threading.Thread):def?run(self):#?增加一把鎖,同時3次sem.acquire()print(self.name)sleep(1)#?釋放鎖sem.release()Li=[] #?定義允許同時能有多少個線程能同時運行,類似電梯,一次只能多重 sem?=?threading.Semaphore(3)if?__name__?==?'__main__':for?i?in?range(20):Li.append(Mythread())for?L?in?Li:L.start()#?結果 #?每次打印三個隊列------生產者消費者模型
# 隊列是用來解決多線程安全的,有多線程的出現才有隊列
? 隊列優先級
1、先進先出
2、先進后出
3、優先級
1、先進先出###?例一import?queue????#?線程隊列#?首先先創建一個對象?比如列表Lis=[] q=queue.Queue()q.put(123) q.put('asf') q.put({'name':'xiong'})while?1:data=q.get()print(data)print('-----')#?結果 123 ----- asf ----- {'name':?'xiong'} ----- #?需要注意的是這里并沒有打印完,它一直在這里阻塞,同步狀態,一直等待下一個值的接收#?例二:q=queue.Queue(3)#設置最大put值,當定義的put值,超過了定義的值的時候,會直接阻塞#?例三:q.put('adsf',False)??#?自已定義一個阻塞,?默認是True?raise?Fullqueue.Full#?當定義的值超過例二定義的范圍時,會報full錯誤2、先進后出 q=queue.LifoQueue(3) #?好比類似一個細長的水壺,往里放小石子,最前的就是最后倒出來的.3、優先級 import?queue q=queue.PriorityQueue() q.put([3,123])???????????????#定義優先級,值越小優先級越高 q.put([5,'asf']) q.put([4,{'name':'xiong'}]) while?1:data=q.get()print(data)print('-----')[3,?123]-----[4,?{'name':?'xiong'}]-----[5,?'asf']----- #?最后這里打印的也是會一直阻塞,等待下一個值的輸入#?其它方法 import?queue q=queue.PriorityQueue() q.put([3,123]) q.put([5,'asf']) q.put([4,{'name':'xiong'}]) print('qsize:?',q.qsize()) print('是否full:',q.full()) print('是否為空:',q.empty()) while?1:data=q.get()print(data)print('-----')#?結果 qsize:??3???????#?這里不是對象定義的值,而是根據put做出的判斷,有多少個put就有多少個值 是否full:?False?#?沒滿 是否為空:?False??#?不是為空,?可以用于其它程序的判斷,比如我只讓你輸入二個,你輸入三個就報錯 [3,?123] ----- [4,?{'name':?'xiong'}] ----- [5,?'asf'] -----q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大于從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止10.6?生產者-消費者模型
###?例一 import?queue import?time import?threading #?生成一個線程對象 q=queue.Queue()def?Producer(name):count=0while?count?<?10:time.sleep(2)q.put(count)#?q.task_done()???????#?發送包子已經制作好print('Producer?%s?makeing?%s?baizi'%(name,count))q.join()print('realy?make?baizi.....')count+=1def?Consumer(name):count=0while?count?<?10:time.sleep(3)???????#?等待#?if?not?q.empty(): #?判斷如果是空?那么就應該顯示沒有包子#?????print('wait........')#?q.join()????????????#?等待生產者發送包子作好的消息#?消費者開始獲取包子data=q.get()#?消費發送信息到隊列中說包子已經開吃了,?生產者又繼續開始制作下一個包子q.task_done() ??print('eat.....')print('Consumer?%s?eat?%s?baizi'%(name,data))#?else:#?????print('not?baizi')count+=1P1=threading.Thread(target=Producer,args=('A')) C1=threading.Thread(target=Consumer,args=('B')) C2=threading.Thread(target=Consumer,args=('C')) C3=threading.Thread(target=Consumer,args=('D')) P1.start() C1.start() C3.start() C2.start()10.7?多進程模塊
多進程模塊? multiprocessing
開啟10進程以內可接受,原因python因為GIL一次只能運行一個線程,嚴重導致了多核CPU的使用
# 例一
# 導入多進程模塊
import?multiprocessing import?os def?Mympro(name):print('name?%s'%name)print('master?ppid:?',os.getppid())print('slave?pid:?',os.getpid())if?__name__?==?'__main__':p=multiprocessing.Process(target=Mympro,args=('xiong',)) #?嚴重注意的是:?args傳遞參數需要有一個逗號?,??不然會報格式錯誤p.start()p.join()#?結果 name?xiong master?ppid:??7648 #?父進程id slave?pid:??7656 #?子進程id### 例二 繼承
from?multiprocessing?import?Process import?os class?Mympro(Process):def?__init__(self,name):super(Mympro,self).__init__()#?Process().__init__(self)self.name=namedef?run(self):print('name:?',self.name)print('master?ppid:?',os.getppid())print('slave?pid:?',os.getpid())if?__name__?==?'__main__':M=Mympro('xiong')M.start()M.join()#?結果 name:??xiong master?ppid:??3320 slave?pid:??7516# 進程
實例方法:
run(): start()調用run方法
terminate(): 不管任務是否完成,立即停止工作過程
屬性
daemon: 跟線程的setDaemon一樣
進程間通信:
進程互相通信,共享通信 ,隊列與管道只能實現數據交互,沒有實現數據共享,manager可以用于數據之間共享
1、進程隊列Queue
2、管道
3、manager
4、數據同步
5、進程池
2、管道
from?multiprocessing?import?Process,Pipe #?駝峰的還是比較漂亮的def?MultiP(conn):conn.send([123,{'12':'33'}])data=conn.recv()print(data)print('son?id:?',id(conn))conn.close()if?__name__?==?'__main__':#?這個Pipe類似于socket的conn一樣,前面是描述信息,后面是地址#?這里創建的是兩個對象,主一個,子一個用于進程間傳遞信息main_conduit,?son_conduit?=?Pipe()M=Process(target=MultiP,args=(son_conduit,))M.start()print(main_conduit.recv())main_conduit.send('11111') #?可以是字符串,列表,字典等M.join()print('main?id:?',?id(son_conduit))#?結果 [123,?{'12':?'33'}] 11111 son?id:??47170728 main?id:??437191443、manager
from?multiprocessing?import?Process,Managerdef?MultiP(Li,Di,i):Di['2']='2'Di[i]='1'Li.append(i)lis=[] if?__name__?==?'__main__': #?????#?manager=Manager()with?Manager()?as?manager:Li=manager.list(range(5))Di=manager.dict()for?i?in?range(5):#?必須先將定義的manager的字典傳遞到進進程才能操作,類似淺拷貝M=Process(target=MultiP,args=(Li,Di,i))M.start()lis.append(M)for?x?in?lis:#?進程隊列需要先運行子進程然后再運行父進程,否則會報錯x.join()print(Di)print(Li)#?結果{'2':?'2',?0:?'1',?1:?'1',?2:?'1',?4:?'1',?3:?'1'}[0,?1,?2,?3,?4,?0,?1,?2,?4,?3]4、數據同步
當加了一把同步鎖之后, 不管是進程還是線程 都是串行
from?multiprocessing?import?Process,Lock from?time?import?sleepdef?MultiP(lock,nums):lock.acquire()sleep(1)print('main?id:?%s'?%nums)lock.release()if?__name__?==?'__main__':lock=Lock()#?啟動10個進程li=[]for?i?in?range(10):M=Process(target=MultiP,args=(lock,i))M.start()li.append(M)for?L?in?li:L.join()#?結果:??數據同步是同步方式,這是為了保證數據安全,類似tcp操作5、進程池
apply? ? 同步
apply_async? ?異步
# 回調函數: 就是某個動作或者函數執行成功后,再去執行的函數
好處: 主進程直接調用回調函數,節省內存資源,如日志?
回調函數接收的值來自于 子進程的 return值
from?multiprocessing?import?Process,Pool from?time?import?sleep import?os def?Foo(nums):sleep(1)print('son?process:',os.getpid())print(nums)return?'Foo:?%s'%nums#?這里的args接受的值來自于Foo函數的return值 def?Pro(args):print(args)if?__name__?==?'__main__':#?定義最大進程池,同時能打開多少個進程,默認是機器的CPU核數pool=Pool(5)for?i?in?range(20):#?異步進程池,callback回調函數,這里回調函數是由主線程運行的pool.apply_async(func=Foo,args=(i,),callback=Pro)#?這里的格式是固定的?close?,?joinpool.close()pool.join()print('end')#?打印結果:??每次五個?首先先打印子進程內容,然后再打印回調函數,依次執行完進程,最后打印end6、協程? ? 協作式, 非搶占式
用戶態: 通過用戶自己進行操作的模式
協程主要解決的也是IO操作的時候
協程:本質上也是一個線程
協程的優勢:?
1、沒有切換的消耗
2、沒有鎖的概念 (本身就是一個線程,遇到IO就切換)
缺點: 不能使用多核,(單線程)? 但可以采用多進程+協程, 一個很好的解決并發的方案
python自帶的yield ####?yield基礎用法?例一:def?Yi():print('ok')while?True:x=yieldprint(x)y=Yi()next(y) y.send(1) #?發送一個值到yield?x接受并打印,沒有while就會直接打印出?StopIteration####?例二 def?Yi():print('ok')while?True:x=yieldprint('resv?from?Y2?value:?%s'%x)def?Y2():next(y)n=0while?n<=4:print('relay?send?Yi?value?%s'%(n))y.send(n)n+=2if?__name__?==?'__main__':y=Yi()p=Y2()#?結果ok #?使用next打印生成器中的值relay?send?Yi?value?0 #?send發送值到Y1,yield接收并賦值給x,然后打印resv?from?Y2?value:?0 #?使用的是同一個線程,所以沒有切換的損耗relay?send?Yi?value?2resv?from?Y2?value:?2relay?send?Yi?value?4resv?from?Y2?value:?4greenlet
switch()? 啟動 切換
缺點: 每一個需求都需要手動switch切換
py3.6沒有這個模塊了
gevent? ?重要
轉載于:https://blog.51cto.com/xiong51/2062515
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的python_day10_并发编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Nginx 实现AJAX跨域请求
- 下一篇: margin-塌陷