python是如何实现进程池和线程池的_进程、线程、线程池和协程如何理解?
1、進程、線程、線程池的概念
進程是一個動態的過程,是一個活動的實體。簡單來說,一個應用程序的運行就可以被看做是一個進程,而線程,是運行中的實際的任務執行者。可以說,進程中包含了多個可以同時運行的線程。
線程,程序執行流的最小執行單位,是進程中的實際運作單位。
線程池:Java中開辟出了一種管理線程的概念,這個概念叫做線程池,從概念以及應用場景中,我們可以看出,線程池的好處,就是可以方便的管理線程,也可以減少內存的消耗,那為什么我們要使用線程池,主要解決如下幾個問題:
創建/銷毀線程伴隨著系統開銷,過于頻繁的創建/銷毀線程,會很大程度上影響處理效率
線程并發數量過多,搶占系統資源,從而導致系統阻塞
能夠容易的管理線程,比如:線程延遲執行、執行策略等
2、線程的 生命周期
線程的生命周期,線程的生命周期可以利用以下的圖解來更好的理解:
首先使用new Thread()的方法新建一個線程,在線程創建完成之后,線程就進入了就緒(Runnable)狀態,此時創建出來的線程進入搶占CPU資源的狀態,當線程搶到了CPU的執行權之后,線程就進入了運行狀態(Running),當該線程的任務執行完成之后或者是非常態的調用的stop()方法之后,線程就進入了死亡狀態。而我們在圖解中可以看出,線程還具有一個則色的過程,這是怎么回事呢?當面對以下幾種情況的時候,容易造成線程阻塞,第一種,當線程主動調用了sleep()方法時,線程會進入則阻塞狀態,除此之外,當線程中主動調用了阻塞時的IO方法時,這個方法有一個返回參數,當參數返回之前,線程也會進入阻塞狀態,還有一種情況,當線程進入正在等待某個通知時,會進入阻塞狀態。那么,為什么會有阻塞狀態出現呢?我們都知道,CPU的資源是十分寶貴的,所以,當線程正在進行某種不確定時長的任務時,Java就會收回CPU的執行權,從而合理應用CPU的資源。我們根據圖可以看出,線程在阻塞過程結束之后,會重新進入就緒狀態,重新搶奪CPU資源。這時候,我們可能會產生一個疑問,如何跳出阻塞過程呢?又以上幾種可能造成線程阻塞的情況來看,都是存在一個時間限制的,當sleep()方法的睡眠時長過去后,線程就自動跳出了阻塞狀態,第二種則是在返回了一個參數之后,在獲取到了等待的通知時,就自動跳出了線程的阻塞過程。
文末超強干貨分享
3、單線程和多線程概念
單線程,顧名思義即是只有一個線程在執行任務,這種情況在我們日常的工作學習中很少遇到,所以我們只是簡單做一下了解
多線程,創建多個線程同時執行任務,這種方式在我們的日常生活中比較常見。但是,在多線程的使用過程中,還有許多需要我們了解的概念。比如,在理解上并行和并發的區別,以及在實際應用的過程中多線程的安全問題,對此,我們需要進行詳細的了解。
并行和并發:在我們看來,都是可以同時執行多種任務,那么,到底他們二者有什么區別呢?
并發:從宏觀方面來說,并發就是同時進行多種時間,實際上,這幾種時間,并不是同時進行的,而是交替進行的,而由于CPU的運算速度非常的快,會造成我們的一種錯覺,就是在同一時間內進行了多種事情
并行:則是真正意義上的同時進行多種事情。這種只可以在多核CPU的基礎上完成。
還有就是多線程的安全問題?為什么會造成多線程的安全問題呢?我們可以想象一下,如果多個線程同時執行一個任務,意味著他們共享同一種資源,由于線程CPU的資源不一定可以被誰搶占到,這是,第一條線程先搶占到CPU資源,他剛剛進行了第一次操作,而此時第二條線程搶占到了CPU的資源,共享資源還來不及發生變化,就同時有兩個線程使用了同一條資源,會造成數據不一致性,導致線程執行錯誤發生。
有造成問題的原因我們可以看出,這個問題主要的矛盾在于,CPU的使用權搶占和資源的共享發生了沖突,解決時,我們只需要讓一條線程占用了CPU的資源時,阻止第二條線程同時搶占CPU的執行權,在代碼中,我們只需要在方法中使用同步代碼塊即可。
4、JAVA中線程池的實現
在Java中,線程池的概念是Executor這個接口,具體實現為ThreadPoolExecutor類,學習Java中的線程池,就可以直接學習它。對線程池的配置,就是對ThreadPoolExecutor構造函數的參數的配置,既然這些參數這么重要,就來看看構造函數的各個參數吧
ThreadPoolExecutor提供了四個構造函數
//五個參數的構造函數 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)//六個參數的構造函數-1 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)//六個參數的構造函數-2 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)//七個參數的構造函數 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)其它這四個構造函數,一共牽涉到7個參數類型,下面主要講解七個參數。
- int corePoolSize => 該線程池中核心線程數最大值
int maximumPoolSize
該線程池中線程總數最大值線程總數 = 核心線程數 + 非核心線程數。核心線程在上面解釋過了,這里說下非核心線程:不是核心線程的線程(別激動,把刀放下…),其實在上面解釋過了long keepAliveTime
該線程池中非核心線程閑置超時時長一個非核心線程,如果不干活(閑置狀態)的時長超過這個參數所設定的時長,就會被銷毀掉如果設置allowCoreThreadTimeOut = true,則會作用于核心線程TimeUnit unit
keepAliveTime的單位,TimeUnit是一個枚舉類型,其包括:NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小時 DAYS : 天BlockingQueue workQueue
該線程池中的任務隊列:維護著等待執行的Runnable對象當所有的核心線程都在干活時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務常用的workQueue類型:SynchronousQueue:這個隊列接收到任務的時候,會直接提交給線程處理,而不保留它,如果所有線程都在工作怎么辦?那就新建一個線程來處理這個任務!所以為了保證不出現<線程數達到了maximumPoolSize而不能新建線程>的錯誤,使用這個類型隊列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大LinkedBlockingQueue:這個隊列接收到任務的時候,如果當前線程數小于核心線程數,則新建線程(核心線程)處理任務;如果當前線程數等于核心線程數,則進入隊列等待。由于這個隊列沒有最大值限制,即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了maximumPoolSize的設定失效,因為總線程數永遠不會超過corePoolSizeArrayBlockingQueue:可以限定隊列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了maximumPoolSize,并且隊列也滿了,則發生錯誤DelayQueue:隊列內元素必須實現Delayed接口,這就意味著你傳進去的任務必須先實現Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務ThreadFactory threadFactory
創建線程的方式,這是一個接口,你new他的時候需要實現他的Thread newThread(Runnable r)方法,一般用不上。小伙伴應該知道AsyncTask是對線程池的封裝吧?那就直接放一個AsyncTask新建線程池的threadFactory參數源碼吧:new ThreadFactory() {private final AtomicInteger mCount = new AtomicInteger(1);public Thread new Thread(Runnable r) {return new Thread(r,"AsyncTask #" + mCount.getAndIncrement());} }RejectedExecutionHandler handler
這玩意兒就是拋出異常專用的,比如上面提到的兩個錯誤發生了,就會由這個handler拋出異常,你不指定他也有個默認的拋異常能拋出什么花樣來?一般情況下根本用不上。新建一個線程池的時候,一般只用5個參數的構造函數。
向ThreadPoolExecutor添加任務
那說了這么多,你可能有疑惑,我知道new一個ThreadPoolExecutor,大概知道各個參數是干嘛的,可是我new完了,怎么向線程池提交一個要執行的任務啊?
通過ThreadPoolExecutor.execute(Runnable command)方法即可向線程池內添加一個任務
ThreadPoolExecutor的策略
上面介紹參數的時候其實已經說到了ThreadPoolExecutor執行的策略,這里給總結一下,當一個任務被添加進線程池時:
線程數量未達到corePoolSize,則新建一個線程(核心線程)執行任務
線程數量達到了corePools,則將任務移入隊列等待
隊列已滿,新建線程(非核心線程)執行任務
隊列已滿,總線程數又達到了maximumPoolSize,就會由上面那位星期天(RejectedExecutionHandler)拋出異常
5、常見四種線程池
如果你不想自己寫一個線程池,那么你可以從下面看看有沒有符合你要求的(一般都夠用了),如果有,那么很好你直接用就行了,如果沒有,那你就老老實實自己去寫一個吧
Java通過Executors提供了四種線程池,這四種線程池都是直接或間接配置ThreadPoolExecutor的參數實現的,下面我都會貼出這四種線程池構造函數的源碼,各位大佬們一看便知!
來,走起:
CachedThreadPool()
可緩存線程池:
線程數無限制(沒有核心線程,全部是非核心線程)
有空閑線程則復用空閑線程,若無空閑線程則新建線程
一定程序減少頻繁創建/銷毀線程,減少系統開銷
適用場景:適用于耗時少,任務量大的情況
創建方法:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
源碼:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }FixedThreadPool()
定長線程池:
創建方法:
//nThreads => 最大線程數即maximumPoolSize ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);//threadFactory => 創建線程的方法! ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory);源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }2個參數的構造方法源碼,不用我貼你也知道他把星期六放在了哪個位置!所以我就不貼了,省下篇幅給我扯皮
ScheduledThreadPool()
定長線程池:
支持定時及周期性任務執行。
有核心線程,也有非核心線程
非核心線程數量為無限大
適用場景:適用于執行周期性任務
創建方法:
//nThreads => 最大線程數即maximumPoolSize ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);源碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize); }//ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue()); }SingleThreadExecutor()
單線程化的線程池:
有且僅有一個工作線程執行任務
所有任務按照指定順序執行,即遵循隊列的入隊出隊規則
適用場景:適用于有順序的任務應用場景
創建方法:
ExecutorService singleThreadPool = Executors.newSingleThreadPool();源碼:
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }還有一個Executors.newSingleThreadScheduledExecutor()結合了3和4,就不介紹了,基本不用。
6、什么是協程?
問題:協程存在的原因?協程能夠解決哪些問題?
在我們現在CS,BS開發模式下,服務器的吞吐量是一個很重要的參數。其實吞吐量是IO處理時間加上業務處理。為了簡單起見,比如,客戶端與服務器之間是長連接的,客戶端定期給服務器發送心跳包數據。客戶端發送一次心跳包到服務器,服務器更新該新客戶端狀態的。心跳包發送的過程,業務處理時長等于IO讀取(RECV系統調用)加上業務處理(更新客戶狀態)。吞吐量等于1s業務處理次數。
業務處理(更新客戶端狀態)時間,業務不一樣的,處理時間不一樣,我們就不做討論。
那如何提升recv的性能。若只有一個客戶端,recv的性能也沒有必要提升,也不能提升。若在有百萬計的客戶端長連接的情況,我們該如何提升。以Linux為例,在這里需要介紹一個“網紅”就是epoll。服務器使用epoll管理百萬計的客戶端長連接,代碼框架如下:
hile (1) {int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);for (i = 0;i < nready;i ++) {int sockfd = events[i].data.fd;if (sockfd == listenfd) {int connfd = accept(listenfd, xxx, xxxx);setnonblock(connfd);ev.events = EPOLLIN | EPOLLET;ev.data.fd = connfd;epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);} else {handle(sockfd);}} }對于響應式服務器,所有的客戶端的操作驅動都是來源于這個大循環。來源于epoll_wait的反饋結果。
對于服務器處理百萬計的IO。Handle(sockfd)實現方式有兩種。
第一種,handle(sockfd)函數內部對sockfd進行讀寫動作。代碼如下
int handle(int sockfd) {recv(sockfd, rbuffer, length, 0);parser_proto(rbuffer, length);send(sockfd, sbuffer, length, 0);}handle的io操作(send,recv)與epoll_wait是在同一個處理流程里面的。這就是IO同步操作。
優點:
1. sockfd管理方便。
2. 操作邏輯清晰。
缺點:
1. 服務器程序依賴epoll_wait的循環響應速度慢。
2. 程序性能差
第二種,handle(sockfd)函數內部將sockfd的操作,push到線程池中,代碼如下:
int thread_cb(int sockfd) {// 此函數是在線程池創建的線程中運行。// 與handle不在一個線程上下文中運行recv(sockfd, rbuffer, length, 0);parser_proto(rbuffer, length);send(sockfd, sbuffer, length, 0); }int handle(int sockfd) {//此函數在主線程 main_thread 中運行//在此處之前,確保線程池已經啟動。push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運行。 }Handle函數是將sockfd處理方式放到另一個已經其他的線程中運行,如此做法,將io操作(recv,send)與epoll_wait 不在一個處理流程里面,使得io操作(recv,send)與epoll_wait實現解耦。這就叫做IO異步操作。
優點:
1. 子模塊好規劃。
2. 程序性能高。
缺點:
正因為子模塊好規劃,使得模塊之間的sockfd的管理異常麻煩。每一個子線程都需要管理好sockfd,避免在IO操作的時候,sockfd出現關閉或其他異常。
上文有提到IO同步操作,程序響應慢,IO異步操作,程序響應快。
下面來對比一下IO同步操作與IO異步操作。
代碼如下:
https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c
在這份代碼的486行,#if 1, 打開的時候,為IO異步操作。關閉的時候,為IO同步操作。
接下來把我測試接入量的結果粘貼出來。
IO異步操作,每1000個連接接入的服務器響應時間(900ms左右)。
IO同步操作,每1000個連接接入的服務器響應時間(6500ms左右)。
IO異步操作與IO同步操作
有沒有一種方式,有異步性能,同步的代碼邏輯。來方便編程人員對IO操作的組件呢? 有,采用一種輕量級的協程來實現。在每次send或者recv之前進行切換,再由調度器來處理epoll_wait的流程。
就是采用了基于這樣的思考,寫了NtyCo,實現了一個IO異步操作與協程結合的組件。https://github.com/wangbojing/NtyCo,
線程、進程了解懂的人應該不少,但是什么是協程,純C寫的協程框架有了解過嗎?
不急,掃一掃
https://m.ke.qq.com/course/2705727?flowToken=1023499 (二維碼自動識別)
為你解密協程以下的內容:
協程框架實現,調度器模式實現,底層原理,多核模式,性能分析,ntyco作者親講
協程起源 — 存在的原因?協程能夠解決哪些問題?
協程案例 — 如何使用?與線程使用有何區別?
協程實現之工作流程 — 內部是如何工作的?
協程實現之原語操作 — 原語操作有哪些?分別如何實現?
協程實現之切換 — 上下文如何切換?代碼如何實現?
協程實現之定義 — 運行體如何定義?調度器如何定義?
協程實現之調度器 — 協程如何被調度?
協程多核模式 — 多核實現
協程性能測試 — 實戰性能測試
就等你的加入!
總結
以上是生活随笔為你收集整理的python是如何实现进程池和线程池的_进程、线程、线程池和协程如何理解?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: c语言字符串截取_笔记 | 自学Pyth
- 下一篇: 台积电第一大客户(预计为苹果)去年贡献营