线程队列 线程池 协程
1 . 線程隊(duì)列
from multiprocessing Queue , JoinableQueue? #進(jìn)程IPC隊(duì)列
from queue import Queue? #線程隊(duì)列? 先進(jìn)先出
from queue import LifoQueue? #后進(jìn)先出的
方法都是一樣的 : put , get , put_nowait , get_nowait , full , empty , qsize
隊(duì)列 Queue : 先進(jìn)先出 , 自帶鎖 , 數(shù)據(jù)安全
棧 LifoQueue : 后進(jìn)先出 , 自帶鎖 , 數(shù)據(jù)安全
q = LifoQueue(5)
q.put(3)
q.put_nowait(4)
print(q.get()) #誰(shuí)最后進(jìn)的,就會(huì)先取誰(shuí)
q.get_nowait()
print(q.full())
pirnt(q.empty())
print(q.qsize())
from queue import PriorityQueue? #優(yōu)先級(jí)隊(duì)列
q = PriorityQueue()
q.put((10,"aaa"))
q.put((4,"bbb"))
print(q.get())
?
線程池
Threading 沒(méi)有線程池的
Multiprocessing Pool
concurrent.futures? 幫助管理線程池和進(jìn)程池
import time
from threading import currentThread,get_ident
from concurrent.futures import ThreadPoolExecutor? #幫助啟動(dòng)線程池
from concurrent.futures import ProcessPoolExecutor? #幫助啟動(dòng)進(jìn)程池
def func(i):
time.sleep(1)
print("in%s%s"%(i,currentThread()))
return i**2
def back(fn):
print(fn.result(),currentThread())
#map啟動(dòng)多線程任務(wù)
t = ThreadPoolExecutor(5)
t.map(func,range(20))?? ==? ? ?for i in range(20):
? ? t.submit(func,i)
#submit異步提交任務(wù)
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(func,i)
t.shutdown()
print("main:",currentThread()) #起多少個(gè)線程池 , 5*CPU的個(gè)數(shù)
#獲取任務(wù)結(jié)果
t = ThreadPoolExecutor(5)
li = []
for i in range(20):
ret = t.submit(func,i)
li.append(ret)
t.shutdown()
for i in li:
print(i.result())
print("main:",currentThread())
#回調(diào)函數(shù)
t = ThreadPoolExecutor(5)
for i in range(20):
t.submit(func,i).add_done_callback(back)
#回調(diào)函數(shù)(進(jìn)程版)
import os,time
from concurrent.futures import ProcessPoolExecutor
def func(i):
print("in%s%s"%(i,os.getpid()))
return i**2
def back(fn):
print(fn.result(),os.getpid())
if __name__ == "__main__":
p = ProcessPoolExecutor(5)
for i in range(20):
p.submit(func,i).add_done_callback(back)
print("main:",os.getpid())
multiprocessing模塊自帶進(jìn)程池的
threading模塊是沒(méi)有線程池的
concurrent.futures 進(jìn)程池和線程池 : 高度封裝 , 進(jìn)程池/線程池的統(tǒng)一的使用方式
創(chuàng)建線程池/進(jìn)程池 ProcessPoolExecutor? ThreadPoolExecutor
ret .result()獲取結(jié)果,如果想實(shí)現(xiàn)異步效果,應(yīng)該使用列表
shutdown? == close + join 同步控制
add_done_callback 回調(diào)函數(shù),在回調(diào)函數(shù)內(nèi)接收的參數(shù)是一個(gè)對(duì)象,需要通過(guò)result來(lái)獲取返回值. 進(jìn)程池的回調(diào)函數(shù)仍然在主進(jìn)程中執(zhí)行,但是線程池的回調(diào)函數(shù)是在線程中執(zhí)行.
進(jìn)程 : 資源分配的最小單位? ,? 班級(jí)
線程 : CPU調(diào)度最小單位 , 人
Cpython線程不能利用多核的 ,多線程無(wú)法利用多核 ,一個(gè)線程能同時(shí)執(zhí)行多個(gè)任務(wù).
協(xié)程 : 能在一條線程的基礎(chǔ)上 ,再過(guò)個(gè)任務(wù)之間互相切換 .節(jié)省了線程開(kāi)啟的消耗.?
協(xié)程是從python代碼的級(jí)別調(diào)度的 , 正常的線程是CPU調(diào)度的最小單位 ; 協(xié)程的調(diào)度并不是由操作系統(tǒng)來(lái)完成的.
之前學(xué)過(guò)的協(xié)程在兩個(gè)任務(wù)之間相互切換的是生成器函數(shù):yield
def func():
print(1)
x = yield "aaa"
print(x)
yield? "bbb"
g? = func()
print(next(g))
print(g.send("***"))
在多個(gè)函數(shù)之間互相切換的功能? ---? 協(xié)程
def consumer():
while True:
x = yield
print(x)
def producer():
g = consumer()
next(g)? ? #預(yù)激
for i in range(20):
g.send(i)
producer()
yield只有程序之間的切換,沒(méi)有重利用任何IO操作的時(shí)間
模塊的安裝 :
pip3 install 要安裝的模塊的名字
?
使用協(xié)程減少IO操作帶來(lái)的時(shí)間消耗
from gevent import monkey ; monkey.patch_all()
import gevent,time
def eat():
print("吃")
time.sleep(2)
print("吃完了")
def play():
print("玩")
time.sleep(1)
print("玩完了")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
沒(méi)有執(zhí)行 , 為什么沒(méi)執(zhí)行???是需要開(kāi)啟么?
沒(méi)有開(kāi)啟但是切換了
gevent幫我們做了切換,做切換是有條件的,遇到IO才切換
gevent不認(rèn)識(shí)除了gevent這個(gè)模塊內(nèi)以外的IO操作
使用join可以一直阻塞直到協(xié)程任務(wù)完成
幫助gevent來(lái)認(rèn)識(shí)其他模塊中的阻塞
from gevent import monkey;monkey.patch_all() 寫(xiě)在其他模塊導(dǎo)入之前.
使用協(xié)程來(lái)實(shí)現(xiàn)TCP協(xié)議 :
服務(wù)器 :
from gevent import monkey;monkey.patch_all()
import gevent,socket
def func(conn):
while 1:
conn.send(b"hello")
print(conn.recv(1024))
sk = socket.socket()
sk.bind(("127.0.0.1",9090))
sk.listen()
while 1:
conn,addr = sk.accept()
gevent.spawn(func,conn)
客戶端 :
import socket
from threading import Thread
def client():
sk = socket.socket()
sk.connect(("127.0.0.1",9090))
while 1:
print(sk.recv(1024))
sk.send(b"bye")
for i in range(20):
Thread(target=client).start()
?
4c 并發(fā)50000? qps
5個(gè)進(jìn)程
20個(gè)線程
500個(gè)協(xié)程
協(xié)程能夠在單核的情況極大地提高CPU的利用率
協(xié)程不存在數(shù)據(jù)不安全 , 也不存在線程切換/創(chuàng)造的時(shí)間開(kāi)銷 ; 切換時(shí)用戶級(jí)別的,程序不會(huì)因?yàn)閰f(xié)程中某一個(gè)任務(wù)進(jìn)入阻塞狀態(tài)而使整條線程阻塞
線程的切換 :
時(shí)間片到了 降低了CPU的效率
IO會(huì)切? 提高了CPU的效率
?
轉(zhuǎn)載于:https://www.cnblogs.com/fengkun125/p/9403360.html
總結(jié)
以上是生活随笔為你收集整理的线程队列 线程池 协程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: FileChannel与ByteBuff
- 下一篇: Python3 列表的基本操作