python使用协程实现udp_python-socket和进程线程协程(代码展示)
#一、進(jìn)程Process#在windows中使用需要注意#在Windows操作系統(tǒng)中由于沒有fork(linux操作系統(tǒng)中創(chuàng)建進(jìn)程的機制),在創(chuàng)建子進(jìn)程的時候會自動 import 啟動它的這個文件,而在 import 的時候又執(zhí)行了整個文件。因此如果將process()直接寫在文件中就會無限遞歸創(chuàng)建子進(jìn)程報錯。#所以必須把創(chuàng)建子進(jìn)程的部分使用if __name__ =='__main__' 判斷保護(hù)起來,import 的時候 ,就不會遞歸運行了。#join:父進(jìn)程等待子進(jìn)程結(jié)束后才繼續(xù)執(zhí)行自己后續(xù)的代碼
importtimeimportrandomfrom multiprocessing importProcessdeffunc(index):
time.sleep(random.randint(1, 3))print('第%s封郵件發(fā)送完畢' %index)if __name__ == '__main__':
p_lst=[]for i in range(10):
p= Process(target=func, args=(i,))
p.start()#先讓所有子進(jìn)程都啟動
p_lst.append(p)for p in p_lst: #再進(jìn)行join阻塞
p.join()print('10封郵件全部發(fā)送完畢')#守護(hù)進(jìn)程#主進(jìn)程創(chuàng)建守護(hù)進(jìn)程#1:守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止#2:守護(hù)進(jìn)程內(nèi)無法再開啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children#注意:進(jìn)程之間是互相獨立的,主進(jìn)程代碼運行結(jié)束,守護(hù)進(jìn)程隨即終止
#1,守護(hù)進(jìn)程會在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
importtimefrom multiprocessing importProcessdeffunc():print('子進(jìn)程 start')
time.sleep(3) #睡3秒的時候主進(jìn)程的代碼已經(jīng)執(zhí)行完畢了,所以子進(jìn)程也會跟著結(jié)束
print('子進(jìn)程end')if __name__ == '__main__':
p= Process(target=func)
p.daemon= True #daemon是Process的屬性
p.start()
time.sleep(2) #睡2秒的時候,執(zhí)行了子進(jìn)程
print('主進(jìn)程')#結(jié)果:#子進(jìn)程 start#主進(jìn)程
#鎖#加鎖降低了程序的效率,讓原來能夠同時執(zhí)行的代碼變成順序執(zhí)行了,異步變同步的過程#保證了數(shù)據(jù)的安全
importtimeimportjsonfrom multiprocessing importProcessfrom multiprocessing import Lock #導(dǎo)入Lock類
defsearch(person):
with open('ticket') as f:
ticketinfo=json.load(f)print('%s查詢余票:' % person, ticketinfo['count'])defget_ticket(person):
with open('ticket') as f:
ticketinfo=json.load(f)
time.sleep(0.2) #模擬讀數(shù)據(jù)的網(wǎng)絡(luò)延遲
if ticketinfo['count'] >0:print('%s買到票了' %person)
ticketinfo['count'] -= 1time.sleep(0.2)
with open('ticket', 'w') as f:
json.dump(ticketinfo, f)else:print('%s沒買到票' %person)defticket(person, lock):
search(person)
lock.acquire()#誰獲得鑰匙 誰才能進(jìn)入
get_ticket(person)
lock.release()#用完了,把鑰匙給下一個人
if __name__ == '__main__':
lock= Lock() #創(chuàng)建一個鎖對象
for i in range(5):
p= Process(target=ticket, args=('person%s' %i, lock))
p.start()#結(jié)果:#person1查詢余票: 3#person3查詢余票: 3#person0查詢余票: 3#person2查詢余票: 3#person4查詢余票: 3#person1買到票了#person3買到票了#person0買到票了#person2沒買到票#person4沒買到票
#1、信號量的實現(xiàn)機制:計數(shù)器 + 鎖實現(xiàn)的#信號量同步基于內(nèi)部計數(shù)器,每調(diào)用一次acquire(),計數(shù)器減1;每調(diào)用一次release(),計數(shù)器加1.當(dāng)計數(shù)器為0時,acquire()#調(diào)用被阻塞。#互斥鎖同時只允許一個線程更改數(shù)據(jù),而信號量Semaphore是同時允許一定數(shù)量的線程更改數(shù)據(jù)(Samphore相當(dāng)于有幾把鑰匙,lock只能有一把鑰匙)
importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importSemaphoredef changba(person, sem): #在唱吧 唱歌
sem.acquire() #第一次可以同時進(jìn)來兩個人
print('%s走進(jìn)唱吧' %person)
time.sleep(random.randint(3, 6)) #每個人唱歌的時間
print('%s走出唱吧' % person) #唱完走人
sem.release() #把鑰匙給下一個人
if __name__ == '__main__':
sem= Semaphore(2) #2把鑰匙
for i in range(5):
p= Process(target=changba, args=('person%s' %i, sem))
p.start()#事件 Event#事件主要提供了三個方法#set、wait、clear。#事件處理的機制:全局定義了一個“Flag”,#如果“Flag”值為False,那么當(dāng)程序執(zhí)行event.wait方法時就會阻塞,#如果“Flag”值為True,那么event.wait方法時便不再阻塞。
#阻塞事件 :wait()方法#wait是否阻塞是看event對象內(nèi)部的Flag
#控制Flag的值:#set() 將Flag的值改成True#clear() 將Flag的值改成False#is_set() 判斷當(dāng)前的Flag的值
#紅綠燈:
importtimeimportrandomfrom multiprocessing importProcessfrom multiprocessing importEventdef traffic_ligth(e): #紅綠燈
print('\033[31m紅燈亮\033[0m') #Flag 默認(rèn)是False
whileTrue:if e.is_set(): #如果是綠燈
time.sleep(2) #2秒后
print('\033[31m紅燈亮\033[0m') #轉(zhuǎn)為紅燈
e.clear() #設(shè)置為False
else: #如果是紅燈
time.sleep(2) #2秒后
print('\033[32m綠燈亮\033[0m') #轉(zhuǎn)為綠燈
e.set() #設(shè)置為True
def car(e, i): #車
if note.is_set():print('car %s在等待' %i)
e.wait()print('car %s 通過了' %i)if __name__ == '__main__':
e=Event()
p= Process(target=traffic_ligth, args=(e,)) #紅綠燈進(jìn)程
p.daemon =True
p.start()
p_lst=[]for i in range(10): #10輛車的進(jìn)程
time.sleep(random.randrange(0, 3, 2))
p= Process(target=car, args=(e, i))
p.start()
p_lst.append(p)for p inp_lst: p.join()#二、進(jìn)程通信#隊列(先進(jìn)先出)#生產(chǎn)者消費者模型
importtimeimportrandomfrom multiprocessing importProcess, Queuedefconsumer(q, name):#消費者
whileTrue:
food= q.get() #在隊列中取值
if food is None: breaktime.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時間
print('%s偷吃了%s,快打死他' %(name, food))defproducter(q, name, food):#生產(chǎn)者
for i in range(10):
time.sleep(random.uniform(0.5, 0.9)) #模擬生產(chǎn)時間
print('%s生產(chǎn)了%s,序號:%s' %(name, food, i))
q.put(food+ str(i)) #把值存入隊列中
if __name__ == '__main__':
q= Queue() #Queue隊列對象
c1 = Process(target=consumer, args=(q, '小明'))
c2= Process(target=consumer, args=(q, '小東'))
c1.start()
c2.start()
p1= Process(target=producter, args=(q, '張三', '面包'))
p2= Process(target=producter, args=(q, '李四', '可樂'))
p1.start()
p2.start()
p1.join()
p2.join()
q.put(None)#有幾個consumer進(jìn)程就需要放幾個None,表示生產(chǎn)完畢(這就有點low了)
q.put(None)#JoinableQueue#JoinableQueue和Queue幾乎一樣,不同的是JoinableQueue隊列允許使用者告訴隊列某個數(shù)據(jù)已經(jīng)處理了。通知進(jìn)程是使用共享的信號和條件變量來實現(xiàn)的。#task_done():使用者使用此方法發(fā)出信號,表示q.get()返回的項目已經(jīng)被處理#join():當(dāng)隊列中有數(shù)據(jù)的時候,使用此方法會進(jìn)入阻塞,直到放入隊列中所有的數(shù)據(jù)都被處理掉(都被task_done)才轉(zhuǎn)換成不阻塞
#解決剛才生產(chǎn)者消費者模型low的問題:
importtimeimportrandomfrom multiprocessing importProcess, JoinableQueuedefconsumer(jq, name):#消費者
whileTrue:
food= jq.get() #在隊列中取值
#if food is None:break
time.sleep(random.uniform(0.3, 1)) #模擬吃消耗的時間
print('%s偷吃了%s,快打死他' %(name, food))
jq.task_done()#向jq.join()發(fā)送一次信號,證明這個數(shù)據(jù)已經(jīng)處理了
defproducter(jq, name, food):#生產(chǎn)者
for i in range(10):
time.sleep(random.uniform(0.5, 0.9)) #模擬生產(chǎn)時間
print('%s生產(chǎn)了%s,序號:%s' %(name, food, i))
jq.put(food+ str(i)) #把值存入隊列中
if __name__ == '__main__':
jq=JoinableQueue()
c1= Process(target=consumer, args=(jq, '小明'))
c2= Process(target=consumer, args=(jq, '小東'))
c1.daemon= True #把消費者設(shè)置為守護(hù)進(jìn)程
c2.daemon =True
c1.start()
c2.start()
p1= Process(target=producter, args=(jq, '張三', '面包'))
p2= Process(target=producter, args=(jq, '李四', '可樂'))
p1.start()
p2.start()
p1.join()
p2.join()
jq.join()#數(shù)據(jù)全部被task_done后才不阻塞
#管道#Pipe([duplex]):在進(jìn)程之間創(chuàng)建一條管道,并返回元組(left,right),其中l(wèi)eft,right表示管道兩端的連接對象,強調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道#duplex:默認(rèn)管道是全雙工的,如果將duplex改成False,left只能用于接收,right只能用于發(fā)送。#主要方法:#right.recv(): 接收left.send()發(fā)送的內(nèi)容。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會拋出EOFError。#letf.send(): 通過連接發(fā)送內(nèi)容。#close(): 關(guān)閉連接。
#pipe的端口管理不會隨著某一個進(jìn)程的關(guān)閉就關(guān)閉#操作系統(tǒng)來管理進(jìn)程對這些端口的使用,不使用的端口應(yīng)該關(guān)閉它#一條管道,兩個進(jìn)程,就有4個端口 每關(guān)閉一個端口計數(shù)-1,直到只剩下一個端口的時候 recv就會報錯#如果不關(guān)閉不使用的端口,在已經(jīng)把數(shù)據(jù)發(fā)送完畢的情況下,那么接收端的recv就會一直掛起,等待接收數(shù)據(jù),這個進(jìn)程就一直不能關(guān)閉#因此不使用的端口就應(yīng)該關(guān)閉它,讓recv拋出異常后對這個進(jìn)程進(jìn)行處理
from multiprocessing importProcess, Pipedefconsumer(left, right):
left.close()#若這里不close,則不會異常EOFError,數(shù)據(jù)接收完畢后,下面的right.recv()就會一直掛起
whileTrue:try:print(right.recv())exceptEOFError:break
if __name__ == '__main__':
left, right=Pipe()
Process(target=consumer, args=(left, right)).start()
right.close()for i in range(10):
left.send('Apple%s' %i)
left.close()#三、進(jìn)程池#同步、apply
importosimporttimefrom multiprocessing importPooldeftest(num):
time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2
if __name__ == '__main__':
p=Pool()for i in range(20):
res= p.apply(test, args=(i,)) #提交任務(wù)的方法 同步提交
print('-->', res) #res就是test的return的值
#異步、apply_async
importtimefrom multiprocessing importPooldeffunc(num):
time.sleep(1)print('做了%s件衣服' %num)if __name__ == '__main__':
p= Pool(4) #進(jìn)程池中創(chuàng)建4個進(jìn)程,不寫的話,默認(rèn)值為你電腦的CUP數(shù)量
for i in range(50):
p.apply_async(func, args=(i,)) #異步提交func到一個子進(jìn)程中執(zhí)行,沒有返回值的情況
p.close() #關(guān)閉進(jìn)程池,用戶不能再向這個池中提交任務(wù)了
p.join() #阻塞,直到進(jìn)程池中所有的任務(wù)都被執(zhí)行完
#注意:#異步提交且沒有返回值接收的情況下必須要用close()和join()#因為如果沒有close()和join(),主進(jìn)程執(zhí)行完畢后會立刻把子進(jìn)程回收了,相當(dāng)于子進(jìn)程還沒來得及開啟#所以要join,讓子進(jìn)程結(jié)束后再結(jié)束父進(jìn)程,但是進(jìn)程池中要使用join就必須先進(jìn)行close
importtimeimportosfrom multiprocessing importPooldeftest(num):
time.sleep(1)print('%s:%s' %(num, os.getpid()))return num * 2
if __name__ == '__main__':
p=Pool()
res_lst=[]for i in range(20):
res= p.apply_async(test, args=(i,)) #提交任務(wù)的方法 異步提交
res_lst.append(res)for res inres_lst:print(res.get())#注意:#異步提交有返回值的情況下,res是一個對象代表的是這個任務(wù)的編號,需要用res.get()方法讓任務(wù)執(zhí)行且把返回值返回給你。#get有阻塞效果,拿到子進(jìn)程的返回值后才不阻塞,所以并不需要再使用close和join。
#map#map接收一個函數(shù)和一個可迭代對象,是異步提交的簡化版本,自帶close和join方法#可迭代對象的每一個值就是函數(shù)接收的實參,可迭代對象的長度就是創(chuàng)建的任務(wù)數(shù)量#map拿到返回值是所有結(jié)果組成的列表
importtimefrom multiprocessing importPooldeffunc(num):print('子進(jìn)程:', num)#time.sleep(1)
returnnumif __name__ == '__main__':
p=Pool()
ret= p.map(func, range(10)) #ret是列表
for i inret:print('返回值:', i)#進(jìn)程池的回調(diào)函數(shù)(同步提交apply沒有回調(diào)函數(shù))
importosfrom multiprocessing importPooldeffunc(i):print('子進(jìn)程:', os.getpid())returnidefcall_back(res):print('回調(diào)函數(shù):', os.getpid())print('res--->', res)if __name__ == '__main__':
p=Pool()print('主進(jìn)程:', os.getpid())
p.apply_async(func, args=(1,), callback=call_back) #callback關(guān)鍵字傳參,參數(shù)是回調(diào)函數(shù)
p.close()
p.join()#結(jié)果:#主進(jìn)程: 4732#子進(jìn)程: 10552#回調(diào)函數(shù): 4732#res---> 1#
#從結(jié)果可以看出:#子進(jìn)程func執(zhí)行完畢之后才去執(zhí)行callback回調(diào)函數(shù)#子進(jìn)程func的返回值會作為回調(diào)函數(shù)的參數(shù)#回調(diào)函數(shù)是在主進(jìn)程中執(zhí)行的
總結(jié)
以上是生活随笔為你收集整理的python使用协程实现udp_python-socket和进程线程协程(代码展示)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RTX5 | 线程管理02 - 创建线程
- 下一篇: 判断form表单里面的元素属性是否有数据