Python多线程原理与实现
Date: 2019-06-04
Author: Sun
Python多線程原理與實戰
目的:
(1)了解python線程執行原理
(2)掌握多線程編程與線程同步
(3)了解線程池的使用
1 線程基本概念
1.1 線程是什么?
線程是指進程內的一個執行單元,也是進程內的可調度實體.
與進程的區別:
(1) 地址空間:進程內的一個執行單元;進程至少有一個線程;它們共享進程的地址空間;而進程有自己獨立的地址空間;
(2) 資源擁有:進程是資源分配和擁有的單位,同一個進程內的線程共享進程的資源
(3) 線程是CPU處理器調度的基本單位,但進程不是.
(4) 二者均可并發執行.
簡而言之,一個程序至少有一個進程,一個進程至少有一個線程.
線程的劃分尺度小于進程,使得多線程程序的并發性高。
另外,進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率。
1.2 線程和進程關系?
? 進程就是一個應用程序在處理機上的一次執行過程,它是一個動態的概念,而線程是進程中的一部分,進程包含多個線程在運行。
? 多線程可以共享全局變量,多進程不能。多線程中,所有子線程的進程號相同;多進程中,不同的子進程進程號不同。
? 進程是具有一定獨立功能的程序關于某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位.
? 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.
? 一個線程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以并發執行.
?
2 Python線程模塊
? python主要是通過thread和threading這兩個模塊來實現多線程支持。python的thread模塊是比較底層的模塊,python的threading模塊是對thread做了一些封裝,可以更加方便的被使用。但是python(cpython)由于GIL的存在無法使用threading充分利用CPU資源,如果想充分發揮多核CPU的計算能力需要使用multiprocessing模塊(Windows下使用會有諸多問題)。
2.1 如何創建線程
? python3.x中已經摒棄了Python2.x中采用函數式thread模塊中的start_new_thread()函數來產生新線程方式。
? python3.x中通過threading模塊創建新的線程有兩種方法:一種是通過threading.Thread(Target=executable Method)-即傳遞給Thread對象一個可執行方法(或對象);第二種是繼承threading.Thread定義子類并重寫run()方法。第二種方法中,唯一必須重寫的方法是run().
(1)通過threading.Thread進行創建多線程
import threading import time def target():print("the current threading %s is runing"%(threading.current_thread().name))time.sleep(1)print("the current threading %s is ended"%(threading.current_thread().name))print("the current threading %s is runing"%(threading.current_thread().name)) ## 屬于線程t的部分 t = threading.Thread(target=target) t.start() ## 屬于線程t的部分 t.join() # join是阻塞當前線程(此處的當前線程時主線程) 主線程直到Thread-1結束之后才結束 print("the current threading %s is ended"%(threading.current_thread().name))(2)通過繼承threading.Thread定義子類創建多線程
? 使用Threading模塊創建線程,直接從threading.Thread繼承,然后重寫__init__方法和run方法:
import threading import timeclass myThread(threading.Thread): # 繼承父類threading.Threaddef __init__(self, threadID, name, counter):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.counter = counterdef run(self): # 把要執行的代碼寫到run函數里面 線程在創建后會直接運行run函數print("Starting " + self.name)print_time(self.name, self.counter, 5)print("Exiting " + self.name)def print_time(threadName, delay, counter):while counter:time.sleep(delay)print("%s process at: %s" % (threadName, time.ctime(time.time())))counter -= 1thread1 = myThread(1, "Thread-1", 1) # 創建新線程 thread2 = myThread(2, "Thread-2", 2)thread1.start() # 開啟線程 thread2.start()thread1.join() # 等待線程結束 thread2.join() print("Exiting Main Thread")通過以上案例可以知道,thread1和thread2執行順序是亂序的。要使之有序,需要進行線程同步
3 線程間同步
? 如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。
? 使用Thread對象的Lock和Rlock可以實現簡單的線程同步,這兩個對象都有acquire方法和release方法,對于那些需要每次只允許一個線程操作的數據,可以將其操作放到acquire和release方法之間。
? 需要注意的是,Python有一個GIL(Global Interpreter Lock)機制,任何線程在運行之前必須獲取這個全局鎖才能執行,每當執行完100條字節碼,全局鎖才會釋放,切換到其他線程執行。
3.1 線程同步問題
多線程實現同步有四種方式:
鎖機制,信號量,條件判斷和同步隊列。
下面我主要關注兩種同步機制:鎖機制和同步隊列。
(1)鎖機制
threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖
import threading import time class myThread(threading.Thread):def __init__(self, threadID, name, counter):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.counter = counterdef run(self):print("Starting " + self.name)# 獲得鎖,成功獲得鎖定后返回True# 可選的timeout參數不填時將一直阻塞直到獲得鎖定# 否則超時后將返回FalsethreadLock.acquire()print_time(self.name, self.counter, 5)# 釋放鎖threadLock.release() def print_time(threadName, delay, counter):while counter:time.sleep(delay)print("%s: %s" % (threadName, time.ctime(time.time())))counter -= 1 threadLock = threading.Lock() threads = [] thread1 = myThread(1, "Thread-1", 1) # 創建新線程 thread2 = myThread(2, "Thread-2", 2) thread1.start() # 開啟新線程 thread2.start() threads.append(thread1) # 添加線程到線程列表 threads.append(thread2) for t in threads: # 等待所有線程完成t.join() print("Exiting Main Thread")?
(2) 線程同步隊列queue
python2.x中提供的Queue, Python3.x中提供的是queue
見import queue.
Python的queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(后入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。
queue模塊中的常用方法:
- queue.qsize() 返回隊列的大小
- queue.empty() 如果隊列為空,返回True,反之False
- queue.full() 如果隊列滿了,返回True,反之False
- queue.full 與 maxsize 大小對應
- queue.get([block[, timeout]])獲取隊列,timeout等待時間
- queue.get_nowait() 相當Queue.get(False)
- queue.put(item) 寫入隊列,timeout等待時間
- queue.put_nowait(item) 相當Queue.put(item, False)
- queue.task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號
- queue.join() 實際上意味著等到隊列為空,再執行別的操作
案例1:
import queue import threading import timeexitFlag = 0class myThread(threading.Thread):def __init__(self, threadID, name, q):threading.Thread.__init__(self)self.threadID = threadIDself.name = nameself.q = qdef run(self):print("Starting " + self.name)process_data(self.name, self.q)print("Exiting " + self.name)def process_data(threadName, q):while not exitFlag:queueLock.acquire()if not workQueue.empty():data = q.get()queueLock.release()print("%s processing %s" % (threadName, data))else:queueLock.release()time.sleep(1)threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = queue.Queue(10) threads = [] threadID = 1# 創建新線程 for tName in threadList:thread = myThread(threadID, tName, workQueue)thread.start()threads.append(thread)threadID += 1# 填充隊列 queueLock.acquire() for word in nameList:workQueue.put(word) queueLock.release()# 等待隊列清空 while not workQueue.empty():pass# 通知線程是時候退出 exitFlag = 1# 等待所有線程完成 for t in threads:t.join() print("Exiting Main Thread")案例2:
import time import threading import queueclass Worker(threading.Thread):def __init__(self, name, queue):threading.Thread.__init__(self)self.queue = queueself.start() #執行run()def run(self):#循環,保證接著跑下一個任務while True:# 隊列為空則退出線程if self.queue.empty():break# 獲取一個隊列數據foo = self.queue.get()# 延時1S模擬你要做的事情time.sleep(1)# 打印print(self.getName() + " process " + str(foo))# 任務完成self.queue.task_done()# 隊列 queue = queue.Queue() # 加入100個任務隊列 for i in range(100):queue.put(i) # 開10個線程 for i in range(10):threadName = 'Thread' + str(i)Worker(threadName, queue) # 所有線程執行完畢后關閉 queue.join()4. 多線程的生產者消費者模式
# -*- coding: utf-8 -*- __author__ = 'sun' __date__ = '2019/6/04 19:40'from queue import Queue import random, threading, time# 生產者類 class Producer(threading.Thread):def __init__(self, name, queue):threading.Thread.__init__(self, name=name)self.data = queuedef run(self):for i in range(5):print("%s is producing %d to the queue!" % (self.getName(), i))self.data.put(i)time.sleep(random.randrange(10) / 5)print("%s finished!" % self.getName())# 消費者類 class Consumer(threading.Thread):def __init__(self, name, queue):threading.Thread.__init__(self, name=name)self.data = queuedef run(self):for i in range(5):val = self.data.get()print("%s is consuming. %d in the queue is consumed!" % (self.getName(), val))time.sleep(random.randrange(10))print("%s finished!" % self.getName())def main():queue = Queue()producer = Producer('Producer', queue)consumer = Consumer('Consumer', queue)producer.start()consumer.start()producer.join()consumer.join()print('All threads finished!')if __name__ == '__main__':main()5 線程池
傳統多線程問題?
? 傳統多線程方案會使用“即時創建, 即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處于不停的創建線程,銷毀線程的狀態。
? 一個線程的運行時間可以分為3部分:線程的啟動時間、線程體的運行時間和線程的銷毀時間。在多線程處理的情景中,如果線程不能被重用,就意味著每次創建都需要經過啟動、銷毀和運行3個過程。這必然會增加系統相應的時間,降低了效率。
有沒有一種高效的解決方案呢? —— 線程池
線程池基本原理:
? 我們把任務放進隊列中去,然后開N個線程,每個線程都去隊列中取一個任務,執行完了之后告訴系統說我執行完了,然后接著去隊列中取下一個任務,直至隊列中所有任務取空,退出線程。
使用線程池:
? 由于線程預先被創建并放入線程池中,同時處理完當前任務之后并不銷毀而是被安排處理下一個任務,因此能夠避免多次創建線程,從而節省線程創建和銷毀的開銷,能帶來更好的性能和系統穩定性。
線程池要設置為多少?
服務器CPU核數有限,能夠同時并發的線程數有限,并不是開得越多越好,以及線程切換是有開銷的,如果線程切換過于頻繁,反而會使性能降低
線程執行過程中,計算時間分為兩部分:
- CPU計算,占用CPU
- 不需要CPU計算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具體操作就是比如
訪問cache、RPC調用下游service、訪問DB,等需要網絡調用的操作
那么如果計算時間占50%, 等待時間50%,那么為了利用率達到最高,可以開2個線程:
假如工作時間是2秒, CPU計算完1秒后,線程等待IO的時候需要1秒,此時CPU空閑了,這時就可以切換到另外一個線程,讓CPU工作1秒后,線程等待IO需要1秒,此時CPU又可以切回去,第一個線程這時剛好完成了1秒的IO等待,可以讓CPU繼續工作,就這樣循環的在兩個線程之前切換操作。
那么如果計算時間占20%, 等待時間80%,那么為了利用率達到最高,可以開5個線程:
可以想象成完成任務需要5秒,CPU占用1秒,等待時間4秒,CPU在線程等待時,可以同時再激活4個線程,這樣就把CPU和IO等待時間,最大化的重疊起來
抽象一下,計算線程數設置的公式就是:
N核服務器,通過執行業務的單線程分析出本地計算時間為x,等待時間為y,則工作線程數(線程池線程數)設置為 N*(x+y)/x,能讓CPU的利用率最大化。
由于有GIL的影響,python只能使用到1個核,所以這里設置N=1
6. python 進行并發編程
? 在Python 2的時代,高性能的網絡編程主要是使用Twisted、Tornado和Gevent這三個庫,但是它們的異步代碼相互之間既不兼容也不能移植。 asyncio是Python 3.4版本引入的標準庫,直接內置了對異步IO的支持。
? asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現了異步IO。
? Python的在3.4中引入了協程的概念,可是這個還是以生成器對象為基礎。
? Python 3.5添加了async和await這兩個關鍵字,分別用來替換asyncio.coroutine和yield from。
? python3.5則確定了協程的語法。下面將簡單介紹asyncio的使用。實現協程的不僅僅是asyncio,tornado和gevent都實現了類似的功能。
(1)協程定義
用asyncio實現Hello world代碼如下:
import asyncio@asyncio.coroutine def hello():print("Hello world!")# 異步調用asyncio.sleep(1):r = yield from asyncio.sleep(1)print("Hello again!") # 獲取EventLoop: loop = asyncio.get_event_loop() # 執行coroutine loop.run_until_complete(hello()) loop.close()? @asyncio.coroutine把一個generator標記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執行。 hello()會首先打印出Hello world!,然后,yield from語法可以讓我們方便地調用另一個generator。由于asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷并執行下一個消息循環。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接著執行下一行語句。
? 把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現并發執行。
我們用Task封裝兩個coroutine試試:
import threading import asyncio@asyncio.coroutine def hello():print('Hello world! (%s)' % threading.currentThread())yield from asyncio.sleep(1)print('Hello again! (%s)' % threading.currentThread())loop = asyncio.get_event_loop() tasks = [hello(), hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()觀察執行過程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>) Hello world! (<_MainThread(MainThread, started 140735195337472)>) (暫停約1秒) Hello again! (<_MainThread(MainThread, started 140735195337472)>) Hello again! (<_MainThread(MainThread, started 140735195337472)>)由打印的當前線程名稱可以看出,兩個coroutine是由同一個線程并發執行的。
如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個線程并發執行。
asyncio案例實戰
我們用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁:
async_wget.py
import asyncio@asyncio.coroutine def wget(host):print('wget %s...' % host)connect = asyncio.open_connection(host, 80)reader, writer = yield from connectheader = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % hostwriter.write(header.encode('utf-8'))yield from writer.drain()while True:line = yield from reader.readline()if line == b'\r\n':breakprint('%s header > %s' % (host, line.decode('utf-8').rstrip()))# Ignore the body, close the socketwriter.close()loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close()結果信息如下:
wget www.sohu.com... wget www.sina.com.cn... wget www.163.com... (等待一段時間) (打印出sohu的header) www.sohu.com header > HTTP/1.1 200 OK www.sohu.com header > Content-Type: text/html ... (打印出sina的header) www.sina.com.cn header > HTTP/1.1 200 OK www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT ... (打印出163的header) www.163.com header > HTTP/1.0 302 Moved Temporarily www.163.com header > Server: Cdn Cache Server V2.0 ...可見3個連接由一個線程通過coroutine并發完成。
小結
asyncio提供了完善的異步IO支持;
異步操作需要在coroutine中通過yield from完成;
多個coroutine可以封裝成一組Task然后并發執行。
轉載于:https://www.cnblogs.com/sunBinary/p/10976929.html
總結
以上是生活随笔為你收集整理的Python多线程原理与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis底层数据结构简述
- 下一篇: 错了