Python笔记二之多线程
本文首發于公眾號:Hunter后端
原文鏈接:Python筆記二之多線程
這一篇筆記介紹一下在 Python 中使用多線程。
注意:以下的操作都是在 Python 3.8 版本中試驗,不同版本可能有不同之處,需要注意。
本篇筆記目錄如下:
- 概念
- 多線程的使用示例
daemon
run() - 線程對象的屬性和設置
- 線程模塊相關函數
- threading.active_count()
- threading.current_thread()
- threading.enumerate()
- 線程的異常和函數結果獲取
- 鎖
- 線程池
- result()
- done()
- exception()
- cancel()
- running()
- 如何探索出最佳的線程池線程數量
1、概念
關于進程與線程的概念,這里簡單介紹下。
一個進程是一個獨立的執行環境,包括代碼、數據和系統資源等,每個進程都有自己的內存空間、文件描述符、環境變量等。
而線程存在于進程中,共享進程內的內存和資源。
至于多進程與多線程,多進程可以充分利用計算機的多核 CPU,適用于 CPU 密集型的任務,,比如進行大量計算操作
而多線程則適用于涉及到大量的 IO 操作的任務,比如網絡請求,文件讀寫等,在 Python 中有一個 GIL 的概念,它的全稱是 Global Interpreter Lock,為全局解釋器鎖。
GIL 的存在是為了使同一時刻只有一個線程在運行 Python 代碼,保護解釋器的內部數據避免收到并發訪問的影響。
所以 Python 中的多線程操作實際上是在多個線程中進行切換,以此來實現想要的并發效果。
2、多線程的使用示例
前面介紹了 Python 中多線程的操作適用于 IO 密集型的任務,所以這里以訪問某個接口為例介紹一下多線程的使用。
那個接口我們這里用 Flask 創建一個服務器,其內容如下:
# app/__init__.py
from flask import Flask
import time
def create_app():
app = Flask(__name__)
@app.route("/test/<int:delay>")
def test(delay):
time.sleep(delay)
return str(time.time())
return app
這個接口通過 delay 參數可以指定接口的休眠時間返回,比如 /test/4,那么接口響應時間大約會是 4 秒。
在 Python 中,用到多線程的模塊是 threading 模塊,以下是一個使用示例:
import threading
import time
import requests
def get_response(url):
response = requests.get(url)
print(response.content)
def test_multi_threading():
url = "http://192.168.1.6:5000/test/2"
threads = []
for i in range(20):
threads.append(threading.Thread(target=get_response, args=(url,)))
for t in threads:
t.start()
for t in threads:
t.join()
def test_single():
url = "http://192.168.1.6:5000/test/2"
for i in range(5):
get_response(url)
if __name__ == "__main__":
start_time = time.time()
test_multi_threading()
print("運行耗時:", time.time() - start_time)
start_time = time.time()
test_single()
print("運行耗時:", time.time() - start_time)
在這里我們可以比對單個線程執行五次,需要的時間大約是 10 秒,而使用多線程的方式雖然調用了 20 次接口,但是耗時大約只有 2 秒,這就是多線程在 IO 密集型的情況下的好處。
接下來具體介紹下多線程的使用方法:
def test_multi_threading():
url = "http://192.168.1.6:5000/test/2"
threads = []
for i in range(20):
threads.append(threading.Thread(target=get_response, args=(url,)))
for t in threads:
t.start()
for t in threads:
t.join()
在這里,我們通過 threading.Thread() 的方式創建一個線程,然后通過 .start() 方法開始線程活動。
接著通過 join() 方法阻塞調用這個方法的線程,在這里也就是主線程,等待 t 線程完成后再執行主線程后面的操作。
如果我們嘗試注釋掉 t.join() 這兩行,那么主線程就會不等待 t 線程直接往后面執行,造成我們后面在主函數里計算的時間不準確。
daemon
可以根據這個參數設置線程是否為守護線程,所有線程創建的時候默認都不是守護線程,如果需要設置線程為守護線程,需要額外做設置。
守護線程是一種特殊類型的線程,生命周期受到主線程的影響,也就是說當主線程結束時,守護線程會被強制終止,它不會阻止主線程的正常執行,主線程也不會像其他線程調用了 join() 一樣被阻塞。
守護線程通常用于執行一些輔助性任務,比如日志記錄、定時任務等,示例如下,我們開啟了一個守護線程用于定時 print() 某些信息:
def print_info():
while True:
print("daemon threading, curr_time:", time.time())
time.sleep(1)
def test_daemon_threading():
base_url = "http://192.168.1.6:5000/test/"
t1 = threading.Thread(target=get_response, args=(base_url + str(6),))
t2 = threading.Thread(target=get_response, args=(base_url + str(2),))
daemon_t = threading.Thread(target=print_info, args=(), daemon=True)
t1.start()
t2.start()
daemon_t.start()
t1.join()
t2.join()
這樣,守護線程 daemon_t 就會在后臺一直循環打印信息,直到主線程結束,守護線程也會被強制終止。
run()
run() 和 start() 方法都和線程的執行有關。
start() 用于啟動線程,線程變量調用 start() 后,比如前面的 t.start(),會立即開始執行線程,且線程的執行與主線程并行進行。
而 run() 定義的是線程內的執行邏輯,是線程的入口點,表示的是線程活動的方法,線程開啟后就會調用 run() 方法,執行線程的任務。
在執行 start() 方法后,線程會自動調用 run() 方法,以此來執行線程內需要調用的函數,我們可以通過重寫 run() 方法來實現我們想要的定制化功能,比如在后面我們就是通過重寫 run() 方法來實現線程的異常信息以及函數的結果返回的,
3、線程對象的屬性和設置
線程本身有一些屬性可以用于設置和獲取,我們先創建一條線程:
t1 = threading.Thread(target=get_response, args=(base_url + str(6),))
查看線程名稱
線程名稱只是用于標記線程的,并無實際意義,根據用戶設置而定,比如前面創建了線程,默認名為 Thread-1,我們可以通過下面的兩個操作獲取,兩個操作是等效的:
t1.name
t1.getName()
設置線程名稱
設置線程名稱的方法如下:
t1.setName("test_thread")
判斷線程是否存活
在未進行 start() 操作前,不是存活狀態:
t1.is_alive()
# False
判斷線程是否是守護線程
t1.daemon
t1.isDaemon()
# False
設置線程為守護線程
將線程設置為守護線程:
t1.setDaemon(True)
True 為是,False 為否
4、線程模塊相關函數
對于 threading 模塊,有一些函數可以用于進行相關操作,比如當前存活的線程對象,異常處理等。
接下來先介紹這些函數及其功能,之后會用一個示例應用上這些函數
1. threading.active_count()
返回當前存活的 Thread 對象的數量
2. threading.current_thread()
返回當前對應調用者的線程
3. threading.enumerate()
列表形式返回當前所有存活的 Thread 對象
接下來我們修改 print_info() 函數,運用我們剛剛介紹的這幾種函數:
def print_info():
while True:
active_count = threading.active_count()
print("當前存活的線程數量為:", active_count)
for thread in threading.enumerate():
print("存活的線程分別是:", thread.getName())
print("當前所處的的線程名稱為:", threading.current_thread().getName())
print("\n")
time.sleep(1)
還是執行 test_daemon_threading() 就可以看到對應的輸出信息。
5、線程的異常和函數結果獲取
Python 中使用 threading 模塊創建的線程中的默認異常以及函數執行結果是不會被主線程捕獲的,因為線程是獨立運行的,我們可以通過定義全局的變量,比如 dict 或者隊列來獲取對應的信息。
這里介紹一下通過改寫 run() 方法來實現我們的功能。
import threading
import traceback
import time
import request
def get_response(url):
response = requests.get(url)
if url.endswith("2"):
1/0
return time.time()
def print_info():
while True:
active_count = threading.active_count()
print("當前存活的線程數量為:", active_count)
for thread in threading.enumerate():
print("存活的線程分別是:", thread.getName())
print("當前所處的的線程名稱為:", threading.current_thread().getName())
print("\n")
time.sleep(1)
class MyThread(threading.Thread):
def __init__(self, func, *args, **kwargs):
super(MyThread, self).__init__()
self.func = func
self.args = args
self.kwargs = kwargs
self.result = None
self.is_error = None
self.trace_info = None
def run(self):
try:
self.result = self.func(*self.args, **self.kwargs)
except Exception as e:
self.is_error = True
self.trace_info = traceback.format_exc()
def get_result(self):
return self.result if self.is_error is not True else None
def test_get_exception_and_result():
base_url = "http://192.168.1.6:5000/test/"
t1 = MyThread(get_response, base_url + str(3))
t2 = MyThread(get_response, base_url + str(2))
daemon_t = MyThread(print_info)
daemon_t.setDaemon(True)
t1.start()
t2.start()
daemon_t.start()
t1.join()
t2.join()
print(t1.get_result())
print(t2.is_error)
print(t2.trace_info)
if __name__ == "__main__":
test_get_exception_and_result()
在這里,我們調用 get_response 函數時,通過判斷 delay 的值,手動觸發了報錯,以及添加了一個 return 返回值,且通過 MyThread 這個重寫的 threading.Thread 來進行操作,獲取到線程執行是否有異常,以及異常信息,以及函數返回的結果。
6、鎖
如果有時候多個線程需要訪問同一個全局變量,可能會導致數據不一致的問題,我們使用線程里的鎖來控制對相關資源的訪問,以此來確保線程安全,下面是一個示例:
import threading
counter = 0
lock_counter = 0
lock = threading.Lock()
def test_no_lock():
global counter
for i in range(1000000):
counter += 1
counter -= 1
def run_no_lock_thread():
t1 = threading.Thread(target=test_no_lock)
t2 = threading.Thread(target=test_no_lock)
t1.start()
t2.start()
t1.join()
t2.join()
def test_lock():
global lock_counter
for i in range(1000000):
lock.acquire()
lock_counter += 1
lock_counter -= 1
lock.release()
def run_lock_thread():
t1 = threading.Thread(target=test_lock)
t2 = threading.Thread(target=test_lock)
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == "__main__":
print("before: ", counter)
run_no_lock_thread()
print("after: ", counter)
print("before: ", lock_counter)
run_lock_thread()
print("after: ", lock_counter)
在上面的示例中,通過比對兩個加鎖和不加鎖的情況下全局變量的值,可以發現,多執行幾次的話,可以看法 counter 的值并不總是為 0 的,而 lock_counter 的值的結果一直是 0。
我們通過這種加鎖的方式來保證 lock_counter 的值是安全的。
鎖的引入我們使用的是:
lock = threading.Lock()
獲取以及釋放的方法是:
lock.acquire()
lock.release()
在這里對于 lock.acquire() 獲取鎖,有兩個參數,blocking 和 timeout。
blocking 表示是否阻塞,默認為 True,表示如果鎖沒有被釋放,則會一直阻塞到鎖被其他線程釋放,為 False 的話,則表示不阻塞地獲取鎖,獲取到返回為 True,沒有獲取到返回為 False
lock.acquire()
# 返回為 True,表示獲取到鎖
lock.acquire()
lock.acquire(blocking=True)
# 這兩個操作都是阻塞獲取鎖,因為前一個操作已經獲取到鎖,所以這一步會被一直阻塞
is_lock = lock.acquire(blocking=False)
# 不阻塞的獲取鎖,如果拿到了鎖并加鎖,則返回為 True,否則返回為 False,表示沒有拿到鎖
還有一個參數為 timeout,表示 blocking 為True,也就是阻塞的時候,等待的秒數之后,超時沒有拿到鎖,返回為 False 。
release() 表示為鎖的釋放,沒有返回值,當前面獲取鎖之后,可以通過 lock.release() 的方式釋放鎖。
locked() 返回為布爾型數據,判斷是否獲得了鎖。
7、線程池
我們可以通過線程池的方式來自動管理我們的線程,用到的模塊是 concurrent.futures.ThreadPoolExecutor
以下是一個使用示例:
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
def get_response(url):
return True
with ThreadPoolExecutor(max_workers=8) as executor:
future_list = [executor.submit(get_response, base_url) for _ in range(20)]
for future in concurrent.futures.as_completed(future_list):
print(future.result()
在這里,首先實例化一個線程池,然后輸入 max_workers 參數,表示線程池開啟的最大的線程數。
之后通過 submit() 方法向線程池提交兩個任務,并返回一個 Future 對象,我們可以通過這個 Future 對象獲取線程函數執行的各種情況,比如線程函數的返回結果,線程異常情況等。
在這里有一個 concurrent.futures.as_completed() 輸入的是一個 Future 列表,會按照 任務完成的順序 逐個返回已經完成的 Future 對象,這個完成,可以是線程函數執行完成,也可以是出現異常的結果。
接下來介紹一下 Future 對象的幾個方法,在此之前,我們設置一下用于試驗的基本數據:
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
import requests
import time
def get_response(url):
response = requests.get(url)
if url.endswith("2"):
1/0
return time.time()
base_url = "http://192.168.1.6:5000/test/"
executor = ThreadPoolExecutor(max_workers=2)
future_1 = executor.submit(get_response, base_url + "3")
future_2 = executor.submit(get_response, base_url + "2")
其中,future_1 線程是正常運行,future_2 在線程里執行報錯了。
1. result()
用于獲取線程執行的函數返回的結果,如果線程還未完成,那么調用這個方法會阻塞,直到返回結果。
而如果線程里函數執行異常了,調用 result() 方法會重新拋出異常,希望程序正常運行的話,可以加上一個 try-except 操作,或者先通過后面的 exception()方法進行判斷。
我們調用 future_1.result() 可以正常返回,而 future_2.result() 會重新報異常。
2. done()
返回一個布爾值,表示線程是否已經完成:
future_1.done() # True
future_2.done() # True
線程執行發生異常也屬于完成。
3. exception()
如果線程執行發生異常,可以用這個方法來獲取異常對象,如果沒有異常就會返回 None。
future_2.exception()
# ZeroDivisionError('division by zero')
4. cancel()
嘗試取消線程的執行,如果線程還沒有開始執行,線程會被標記為取消狀態,如果線程已經在執行中或者執行完畢,則不會被取消:
future.cancel()
判斷一個線程是否已經被取消,使用方法 cancelled(),返回布爾型數據
5. running()
判斷線程是否還在執行中,比如下面的操作:
future_3 = executor.submit(get_response, base_url + "65")
future_3.running() # True
8、如何探索出最佳的線程池線程數量
對于線程池中線程的數量需要指定多少個,是一個需要探索的問題。
比如需要判斷我們的任務是否是 IO 密集型的,比如網絡請求等,這種的話可以設置相對較高,但也并非無限高,因為等待的過程中,線程間的切換也是一部分開銷。
在執行真正的任務前,我們可以通過一小部分任務來進行性能測試,逐步調整線程池的線程數量,然后觀察服務器的內存啊,CPU 利用率啊,以及整個操作的消耗時間等,來綜合判斷出比較適合的線程數量作為最終的結果。
如果想獲取更多后端相關文章,可掃碼關注閱讀:
總結
以上是生活随笔為你收集整理的Python笔记二之多线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何清理Docker不用的Volume
- 下一篇: 用华为WS5200增强版路由器-华为52