Python 09--多线程、进程
本節(jié)內(nèi)容
?
?
操作系統(tǒng)發(fā)展史
手工操作(無操作系統(tǒng))
1946年第一臺計算機(jī)誕生--20世紀(jì)50年代中期,還未出現(xiàn)操作系統(tǒng),計算機(jī)工作采用手工操作方式。
手工操作
程序員將對應(yīng)于程序和數(shù)據(jù)的已穿孔的紙帶(或卡片)裝入輸入機(jī),然后啟動輸入機(jī)把程序和數(shù)據(jù)輸入計算機(jī)內(nèi)存,接著通過控制臺開關(guān)啟動程序針對數(shù)據(jù)運行;計算完畢,打印機(jī)輸出計算結(jié)果;用戶取走結(jié)果并卸下紙帶(或卡片)后,才讓下一個用戶上機(jī)。
?
手工操作方式兩個特點:
(1)用戶獨占全機(jī)。不會出現(xiàn)因資源已被其他用戶占用而等待的現(xiàn)象,但資源的利用率低。
(2)CPU 等待手工操作。CPU的利用不充分。
? 20世紀(jì)50年代后期,出現(xiàn)人機(jī)矛盾:手工操作的慢速度和計算機(jī)的高速度之間形成了尖銳矛盾,手工操作方式已嚴(yán)重?fù)p害了系統(tǒng)資源的利用率(使資源利用率降為百分之幾,甚至更低),不能容忍。唯一的解決辦法:只有擺脫人的手工操作,實現(xiàn)作業(yè)的自動過渡。這樣就出現(xiàn)了成批處理。
?
批處理系統(tǒng)
批處理系統(tǒng):加載在計算機(jī)上的一個系統(tǒng)軟件,在它的控制下,計算機(jī)能夠自動地、成批地處理一個或多個用戶的作業(yè)(這作業(yè)包括程序、數(shù)據(jù)和命令)。
聯(lián)機(jī)批處理系統(tǒng)
首先出現(xiàn)的是聯(lián)機(jī)批處理系統(tǒng),即作業(yè)的輸入/輸出由CPU來處理。
主機(jī)與輸入機(jī)之間增加一個存儲設(shè)備——磁帶,在運行于主機(jī)上的監(jiān)督程序的自動控制下,計算機(jī)可自動完成:成批地把輸入機(jī)上的用戶作業(yè)讀入磁帶,依次把磁帶上的用戶作業(yè)讀入主機(jī)內(nèi)存并執(zhí)行并把計算結(jié)果向輸出機(jī)輸出。完成了上一批作業(yè)后,監(jiān)督程序又從輸入機(jī)上輸入另一批作業(yè),保存在磁帶上,并按上述步驟重復(fù)處理。
?
監(jiān)督程序不停地處理各個作業(yè),從而實現(xiàn)了作業(yè)到作業(yè)的自動轉(zhuǎn)接,減少了作業(yè)建立時間和手工操作時間,有效克服了人機(jī)矛盾,提高了計算機(jī)的利用率。
但是,在作業(yè)輸入和結(jié)果輸出時,主機(jī)的高速CPU仍處于空閑狀態(tài),等待慢速的輸入/輸出設(shè)備完成工作: 主機(jī)處于“忙等”狀態(tài)。
?
脫機(jī)批處理系統(tǒng)
為克服與緩解高速主機(jī)與慢速外設(shè)的矛盾,提高CPU的利用率,又引入了脫機(jī)批處理系統(tǒng),即輸入/輸出脫離主機(jī)控制。
這種方式的顯著特征是:增加一臺不與主機(jī)直接相連而專門用于與輸入/輸出設(shè)備打交道的衛(wèi)星機(jī)。
其功能是:
(1)從輸入機(jī)上讀取用戶作業(yè)并放到輸入磁帶上。
(2)從輸出磁帶上讀取執(zhí)行結(jié)果并傳給輸出機(jī)。
這樣,主機(jī)不是直接與慢速的輸入/輸出設(shè)備打交道,而是與速度相對較快的磁帶機(jī)發(fā)生關(guān)系,有效緩解了主機(jī)與設(shè)備的矛盾。主機(jī)與衛(wèi)星機(jī)可并行工作,二者分工明確,可以充分發(fā)揮主機(jī)的高速計算能力。
?
脫機(jī)批處理系統(tǒng):20世紀(jì)60年代應(yīng)用十分廣泛,它極大緩解了人機(jī)矛盾及主機(jī)與外設(shè)的矛盾。IBM-7090/7094:配備的監(jiān)督程序就是脫機(jī)批處理系統(tǒng),是現(xiàn)代操作系統(tǒng)的原型。
不足:每次主機(jī)內(nèi)存中僅存放一道作業(yè),每當(dāng)它運行期間發(fā)出輸入/輸出(I/O)請求后,高速的CPU便處于等待低速的I/O完成狀態(tài),致使CPU空閑。
為改善CPU的利用率,又引入了多道程序系統(tǒng)。
?
多道程序系統(tǒng)
多道程序設(shè)計技術(shù)
所謂多道程序設(shè)計技術(shù),就是指允許多個程序同時進(jìn)入內(nèi)存并運行。即同時把多個程序放入內(nèi)存,并允許它們交替在CPU中運行,它們共享系統(tǒng)中的各種硬、軟件資源。當(dāng)一道程序因I/O請求而暫停運行時,CPU便立即轉(zhuǎn)去運行另一道程序。
單道程序的運行過程:
在A程序計算時,I/O空閑, A程序I/O操作時,CPU空閑(B程序也是同樣);必須A工作完成后,B才能進(jìn)入內(nèi)存中開始工作,兩者是串行的,全部完成共需時間=T1+T2。
?
多道程序的運行過程:
將A、B兩道程序同時存放在內(nèi)存中,它們在系統(tǒng)的控制下,可相互穿插、交替地在CPU上運行:當(dāng)A程序因請求I/O操作而放棄CPU時,B程序就可占用CPU運行,這樣 CPU不再空閑,而正進(jìn)行A I/O操作的I/O設(shè)備也不空閑,顯然,CPU和I/O設(shè)備都處于“忙”狀態(tài),大大提高了資源的利用率,從而也提高了系統(tǒng)的效率,A、B全部完成所需時間<<T1+T2。
?
?
多道程序設(shè)計技術(shù)不僅使CPU得到充分利用,同時改善I/O設(shè)備和內(nèi)存的利用率,從而提高了整個系統(tǒng)的資源利用率和系統(tǒng)吞吐量(單位時間內(nèi)處理作業(yè)(程序)的個數(shù)),最終提高了整個系統(tǒng)的效率。
單處理機(jī)系統(tǒng)中多道程序運行時的特點:
(1)多道:計算機(jī)內(nèi)存中同時存放幾道相互獨立的程序;
(2)宏觀上并行:同時進(jìn)入系統(tǒng)的幾道程序都處于運行過程中,即它們先后開始了各自的運行,但都未運行完畢;
(3)微觀上串行:實際上,各道程序輪流地用CPU,并交替運行。
多道程序系統(tǒng)的出現(xiàn),標(biāo)志著操作系統(tǒng)漸趨成熟的階段,先后出現(xiàn)了作業(yè)調(diào)度管理、處理機(jī)管理、存儲器管理、外部設(shè)備管理、文件系統(tǒng)管理等功能。
多道批處理系統(tǒng)
20世紀(jì)60年代中期,在前述的批處理系統(tǒng)中,引入多道程序設(shè)計技術(shù)后形成多道批處理系統(tǒng)(簡稱:批處理系統(tǒng))。
它有兩個特點:
(1)多道:系統(tǒng)內(nèi)可同時容納多個作業(yè)。這些作業(yè)放在外存中,組成一個后備隊列,系統(tǒng)按一定的調(diào)度原則每次從后備作業(yè)隊列中選取一個或多個作業(yè)進(jìn)入內(nèi)存運行,運行作業(yè)結(jié)束、退出運行和后備作業(yè)進(jìn)入運行均由系統(tǒng)自動實現(xiàn),從而在系統(tǒng)中形成一個自動轉(zhuǎn)接的、連續(xù)的作業(yè)流。
(2)成批:在系統(tǒng)運行過程中,不允許用戶與其作業(yè)發(fā)生交互作用,即:作業(yè)一旦進(jìn)入系統(tǒng),用戶就不能直接干預(yù)其作業(yè)的運行。
?
批處理系統(tǒng)的追求目標(biāo):提高系統(tǒng)資源利用率和系統(tǒng)吞吐量,以及作業(yè)流程的自動化。
批處理系統(tǒng)的一個重要缺點:不提供人機(jī)交互能力,給用戶使用計算機(jī)帶來不便。
雖然用戶獨占全機(jī)資源,并且直接控制程序的運行,可以隨時了解程序運行情況。但這種工作方式因獨占全機(jī)造成資源效率極低。
一種新的追求目標(biāo):既能保證計算機(jī)效率,又能方便用戶使用計算機(jī)。 20世紀(jì)60年代中期,計算機(jī)技術(shù)和軟件技術(shù)的發(fā)展使這種追求成為可能。
?
分時系統(tǒng)
由于CPU速度不斷提高和采用分時技術(shù),一臺計算機(jī)可同時連接多個用戶終端,而每個用戶可在自己的終端上聯(lián)機(jī)使用計算機(jī),好象自己獨占機(jī)器一樣。
分時技術(shù):把處理機(jī)的運行時間分成很短的時間片,按時間片輪流把處理機(jī)分配給各聯(lián)機(jī)作業(yè)使用。
若某個作業(yè)在分配給它的時間片內(nèi)不能完成其計算,則該作業(yè)暫時中斷,把處理機(jī)讓給另一作業(yè)使用,等待下一輪時再繼續(xù)其運行。由于計算機(jī)速度很快,作業(yè)運行輪轉(zhuǎn)得很快,給每個用戶的印象是,好象他獨占了一臺計算機(jī)。而每個用戶可以通過自己的終端向系統(tǒng)發(fā)出各種操作控制命令,在充分的人機(jī)交互情況下,完成作業(yè)的運行。
具有上述特征的計算機(jī)系統(tǒng)稱為分時系統(tǒng),它允許多個用戶同時聯(lián)機(jī)使用計算機(jī)。
?
特點:
(1)多路性。若干個用戶同時使用一臺計算機(jī)。微觀上看是各用戶輪流使用計算機(jī);宏觀上看是各用戶并行工作。
(2)交互性。用戶可根據(jù)系統(tǒng)對請求的響應(yīng)結(jié)果,進(jìn)一步向系統(tǒng)提出新的請求。這種能使用戶與系統(tǒng)進(jìn)行人機(jī)對話的工作方式,明顯地有別于批處理系統(tǒng),因而,分時系統(tǒng)又被稱為交互式系統(tǒng)。
(3)獨立性。用戶之間可以相互獨立操作,互不干擾。系統(tǒng)保證各用戶程序運行的完整性,不會發(fā)生相互混淆或破壞現(xiàn)象。
(4)及時性。系統(tǒng)可對用戶的輸入及時作出響應(yīng)。分時系統(tǒng)性能的主要指標(biāo)之一是響應(yīng)時間,它是指:從終端發(fā)出命令到系統(tǒng)予以應(yīng)答所需的時間。
分時系統(tǒng)的主要目標(biāo):對用戶響應(yīng)的及時性,即不至于用戶等待每一個命令的處理時間過長。
分時系統(tǒng)可以同時接納數(shù)十個甚至上百個用戶,由于內(nèi)存空間有限,往往采用對換(又稱交換)方式的存儲方法。即將未“輪到”的作業(yè)放入磁盤,一旦“輪到”,再將其調(diào)入內(nèi)存;而時間片用完后,又將作業(yè)存回磁盤(俗稱“滾進(jìn)”、“滾出“法),使同一存儲區(qū)域輪流為多個用戶服務(wù)。
多用戶分時系統(tǒng)是當(dāng)今計算機(jī)操作系統(tǒng)中最普遍使用的一類操作系統(tǒng)。
?
實時系統(tǒng)
雖然多道批處理系統(tǒng)和分時系統(tǒng)能獲得較令人滿意的資源利用率和系統(tǒng)響應(yīng)時間,但卻不能滿足實時控制與實時信息處理兩個應(yīng)用領(lǐng)域的需求。于是就產(chǎn)生了實時系統(tǒng),即系統(tǒng)能夠及時響應(yīng)隨機(jī)發(fā)生的外部事件,并在嚴(yán)格的時間范圍內(nèi)完成對該事件的處理。
實時系統(tǒng)在一個特定的應(yīng)用中常作為一種控制設(shè)備來使用。
實時系統(tǒng)可分成兩類:
(1)實時控制系統(tǒng)。當(dāng)用于飛機(jī)飛行、導(dǎo)彈發(fā)射等的自動控制時,要求計算機(jī)能盡快處理測量系統(tǒng)測得的數(shù)據(jù),及時地對飛機(jī)或?qū)椷M(jìn)行控制,或?qū)⒂嘘P(guān)信息通過顯示終端提供給決策人員。當(dāng)用于軋鋼、石化等工業(yè)生產(chǎn)過程控制時,也要求計算機(jī)能及時處理由各類傳感器送來的數(shù)據(jù),然后控制相應(yīng)的執(zhí)行機(jī)構(gòu)。
(2)實時信息處理系統(tǒng)。當(dāng)用于預(yù)定飛機(jī)票、查詢有關(guān)航班、航線、票價等事宜時,或當(dāng)用于銀行系統(tǒng)、情報檢索系統(tǒng)時,都要求計算機(jī)能對終端設(shè)備發(fā)來的服務(wù)請求及時予以正確的回答。此類對響應(yīng)及時性的要求稍弱于第一類。
實時操作系統(tǒng)的主要特點:
(1)及時響應(yīng)。每一個信息接收、分析處理和發(fā)送的過程必須在嚴(yán)格的時間限制內(nèi)完成。
(2)高可靠性。需采取冗余措施,雙機(jī)系統(tǒng)前后臺工作,也包括必要的保密措施等。
?
操作系統(tǒng)發(fā)展圖譜
?
?
?
?
進(jìn)程與線程
什么是進(jìn)程(process)?
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
程序并不能單獨運行,只有將程序裝載到內(nèi)存中,系統(tǒng)為它分配資源才能運行,而這種執(zhí)行的程序就稱之為進(jìn)程。程序和進(jìn)程的區(qū)別就在于:程序是指令的集合,它是進(jìn)程運行的靜態(tài)描述文本;進(jìn)程是程序的一次執(zhí)行活動,屬于動態(tài)概念。
在多道編程中,我們允許多個程序同時加載到內(nèi)存中,在操作系統(tǒng)的調(diào)度下,可以實現(xiàn)并發(fā)地執(zhí)行。這是這樣的設(shè)計,大大提高了CPU的利用率。進(jìn)程的出現(xiàn)讓每個用戶感覺到自己獨享CPU,因此,進(jìn)程就是為了在CPU上實現(xiàn)多道編程而提出的。
有了進(jìn)程為什么還要線程?
進(jìn)程有很多優(yōu)點,它提供了多道編程,讓我們感覺我們每個人都擁有自己的CPU和其他資源,可以提高計算機(jī)的利用率。很多人就不理解了,既然進(jìn)程這么優(yōu)秀,為什么還要線程呢?其實,仔細(xì)觀察就會發(fā)現(xiàn)進(jìn)程還是有很多缺陷的,主要體現(xiàn)在兩點上:
-
進(jìn)程只能在一個時間干一件事,如果想同時干兩件事或多件事,進(jìn)程就無能為力了。
-
進(jìn)程在執(zhí)行的過程中如果阻塞,例如等待輸入,整個進(jìn)程就會掛起,即使進(jìn)程中有些工作不依賴于輸入的數(shù)據(jù),也將無法執(zhí)行。
例如,我們在使用qq聊天, qq做為一個獨立進(jìn)程如果同一時間只能干一件事,那他如何實現(xiàn)在同一時刻 即能監(jiān)聽鍵盤輸入、又能監(jiān)聽其它人給你發(fā)的消息、同時還能把別人發(fā)的消息顯示在屏幕上呢?你會說,操作系統(tǒng)不是有分時么?但我的親,分時是指在不同進(jìn)程間的分時呀, 即操作系統(tǒng)處理一會你的qq任務(wù),又切換到word文檔任務(wù)上了,每個cpu時間片分給你的qq程序時,你的qq還是只能同時干一件事呀。
再直白一點, 一個操作系統(tǒng)就像是一個工廠,工廠里面有很多個生產(chǎn)車間,不同的車間生產(chǎn)不同的產(chǎn)品,每個車間就相當(dāng)于一個進(jìn)程,且你的工廠又窮,供電不足,同一時間只能給一個車間供電,為了能讓所有車間都能同時生產(chǎn),你的工廠的電工只能給不同的車間分時供電,但是輪到你的qq車間時,發(fā)現(xiàn)只有一個干活的工人,結(jié)果生產(chǎn)效率極低,為了解決這個問題,應(yīng)該怎么辦呢?。。。。沒錯,你肯定想到了,就是多加幾個工人,讓幾個人工人并行工作,這每個工人,就是線程!
?
什么是線程(thread)?
線程是操作系統(tǒng)能夠進(jìn)行運算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實際運作單位。一條線程指的是進(jìn)程中一個單一順序的控制流,一個進(jìn)程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.
Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
?
進(jìn)程與線程的區(qū)別?
?
Python GIL(Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執(zhí)行的時候會淡定的在同一時刻只允許一個線程運行,擦。。。,那這還叫什么多線程呀?莫如此早的下結(jié)結(jié)論,聽我現(xiàn)場講。
?
?
首先需要明確的一點是GIL并不是Python的特性,它是在實現(xiàn)Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行。像其中的JPython就沒有GIL。然而因為CPython是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。所以這里要先明確一點:GIL并不是Python的特性,Python完全可以不依賴于GIL
這篇文章透徹的剖析了GIL對python多線程的影響,強(qiáng)烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf?
?
Python threading模塊
線程有2種調(diào)用方式,如下:
直接調(diào)用
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import?threading import?time def?sayhi(num):?#定義每個線程要運行的函數(shù) ????print("running on number:%s"?%num) ????time.sleep(3) if?__name__?==?'__main__': ????t1?=?threading.Thread(target=sayhi,args=(1,))?#生成一個線程實例 ????t2?=?threading.Thread(target=sayhi,args=(2,))?#生成另一個線程實例 ????t1.start()?#啟動線程 ????t2.start()?#啟動另一個線程 ????print(t1.getName())?#獲取線程名 ????print(t2.getName()) |
繼承式調(diào)用
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import?threading import?time class?MyThread(threading.Thread): ????def?__init__(self,num): ????????threading.Thread.__init__(self) ????????self.num?=?num ????def?run(self):#定義每個線程要運行的函數(shù) ????????print("running on number:%s"?%self.num) ????????time.sleep(3) if?__name__?==?'__main__': ????t1?=?MyThread(1) ????t2?=?MyThread(2) ????t1.start() ????t2.start() |
Join & Daemon
Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.
Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #_*_coding:utf-8_*_ __author__?=?'Alex Li' import?time import?threading def?run(n): ????print('[%s]------running----\n'?%?n) ????time.sleep(2) ????print('--done--') def?main(): ????for?i?in?range(5): ????????t?=?threading.Thread(target=run,args=[i,]) ????????t.start() ????????t.join(1) ????????print('starting thread', t.getName()) m?=?threading.Thread(target=main,args=[]) m.setDaemon(True)?#將main線程設(shè)置為Daemon線程,它做為程序主線程的守護(hù)線程,當(dāng)主線程退出時,m線程也會退出,由m啟動的其它子線程會同時退出,不管是否執(zhí)行完任務(wù) m.start() m.join(timeout=2) print("---main thread done----") |
Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an?Event.
?
?
線程鎖(互斥鎖Mutex)
一個進(jìn)程下可以啟動多個線程,多個線程共享父進(jìn)程的內(nèi)存空間,也就意味著每個線程可以訪問同一份數(shù)據(jù),此時,如果2個線程同時要修改同一份數(shù)據(jù),會出現(xiàn)什么狀況?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import?time import?threading def?addNum(): ????global?num?#在每個線程中都獲取這個全局變量 ????print('--get num:',num ) ????time.sleep(1) ????num??-=1?#對此公共變量進(jìn)行-1操作 num?=?100??#設(shè)定一個共享變量 thread_list?=?[] for?i?in?range(100): ????t?=?threading.Thread(target=addNum) ????t.start() ????thread_list.append(t) for?t?in?thread_list:?#等待所有線程執(zhí)行完畢? ????t.join() print('final num:', num ) |
正常來講,這個num結(jié)果應(yīng)該是0, 但在python 2.7上多運行幾次,會發(fā)現(xiàn),最后打印出來的num結(jié)果不總是0,為什么每次運行的結(jié)果不一樣呢? 哈,很簡單,假設(shè)你有A,B兩個線程,此時都 要對num 進(jìn)行減1操作, 由于2個線程是并發(fā)同時運行的,所以2個線程很有可能同時拿走了num=100這個初始變量交給cpu去運算,當(dāng)A線程去處完的結(jié)果是99,但此時B線程運算完的結(jié)果也是99,兩個線程同時CPU運算的結(jié)果再賦值給num變量后,結(jié)果就都是99。那怎么辦呢? 很簡單,每個線程在要修改公共數(shù)據(jù)時,為了避免自己在還沒改完的時候別人也來修改此數(shù)據(jù),可以給這個數(shù)據(jù)加一把鎖, 這樣其它線程想修改此數(shù)據(jù)時就必須等待你修改完畢并把鎖釋放掉后才能再訪問此數(shù)據(jù)。?
*注:不要在3.x上運行,不知為什么,3.x上的結(jié)果總是正確的,可能是自動加了鎖
加鎖版本
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | import?time import?threading def?addNum(): ????global?num?#在每個線程中都獲取這個全局變量 ????print('--get num:',num ) ????time.sleep(1) ????lock.acquire()?#修改數(shù)據(jù)前加鎖 ????num??-=1?#對此公共變量進(jìn)行-1操作 ????lock.release()?#修改后釋放 num?=?100??#設(shè)定一個共享變量 thread_list?=?[] lock?=?threading.Lock()?#生成全局鎖 for?i?in?range(100): ????t?=?threading.Thread(target=addNum) ????t.start() ????thread_list.append(t) for?t?in?thread_list:?#等待所有線程執(zhí)行完畢 ????t.join() print('final num:', num ) |
?
GIL VS Lock?
機(jī)智的同學(xué)可能會問到這個問題,就是既然你之前說過了,Python已經(jīng)有一個GIL來保證同一時間只能有一個線程來執(zhí)行了,為什么這里還需要lock? 注意啦,這里的lock是用戶級的lock,跟那個GIL沒關(guān)系 ,具體我們通過下圖來看一下+配合我現(xiàn)場講給大家,就明白了。
那你又問了, 既然用戶程序已經(jīng)自己有鎖了,那為什么C python還需要GIL呢?加入GIL主要的原因是為了降低程序的開發(fā)的復(fù)雜度,比如現(xiàn)在的你寫python不需要關(guān)心內(nèi)存回收的問題,因為Python解釋器幫你自動定期進(jìn)行內(nèi)存回收,你可以理解為python解釋器里有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內(nèi)存數(shù)據(jù)是可以被清空的,此時你自己的程序 里的線程和 py解釋器自己的線程是并發(fā)運行的,假設(shè)你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內(nèi)存空間賦值了,結(jié)果就有可能新賦值的數(shù)據(jù)被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當(dāng)一個線程運行時,其它人都不能動,這樣就解決了上述的問題, ?這可以說是Python早期版本的遺留問題。
?
?
RLock(遞歸鎖)
說白了就是在一個大鎖中還要再包含子鎖
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | import?threading,time def?run1(): ????print("grab the first part data") ????lock.acquire() ????global?num ????num?+=1 ????lock.release() ????return?num def?run2(): ????print("grab the second part data") ????lock.acquire() ????global??num2 ????num2+=1 ????lock.release() ????return?num2 def?run3(): ????lock.acquire() ????res?=?run1() ????print('--------between run1 and run2-----') ????res2?=?run2() ????lock.release() ????print(res,res2) if?__name__?==?'__main__': ????num,num2?=?0,0 ????lock?=?threading.RLock() ????for?i?in?range(10): ????????t?=?threading.Thread(target=run3) ????????t.start() while?threading.active_count() !=?1: ????print(threading.active_count()) else: ????print('----all threads done---') ????print(num,num2) |
Semaphore(信號量)
互斥鎖 同時只允許一個線程更改數(shù)據(jù),而Semaphore是同時允許一定數(shù)量的線程更改數(shù)據(jù) ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進(jìn)去。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | import?threading,time def?run(n): ????semaphore.acquire() ????time.sleep(1) ????print("run the thread: %s\n"?%n) ????semaphore.release() if?__name__?==?'__main__': ????num=?0 ????semaphore??=?threading.BoundedSemaphore(5)?#最多允許5個線程同時運行 ????for?i?in?range(20): ????????t?=?threading.Thread(target=run,args=(i,)) ????????t.start() while?threading.active_count() !=?1: ????pass?#print threading.active_count() else: ????print('----all threads done---') ????print(num) |
?
Timer ?
This class represents an action that should be run only after a certain amount of time has passed?
Timers are started, as with threads, by calling their?start()?method. The timer can be stopped (before its action has begun) by calling thecancel()?method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.
| 1 2 3 4 5 | def?hello(): ????print("hello, world") t?=?Timer(30.0, hello) t.start()??# after 30 seconds, "hello, world" will be printed |
?
Events
An event is a simple synchronization object;
the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
通過Event來實現(xiàn)兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規(guī)則。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import?threading,time import?random def?light(): ????if?not?event.isSet(): ????????event.set()?#wait就不阻塞 #綠燈狀態(tài) ????count?=?0 ????while?True: ????????if?count <?10: ????????????print('\033[42;1m--green light on---\033[0m') ????????elif?count <13: ????????????print('\033[43;1m--yellow light on---\033[0m') ????????elif?count <20: ????????????if?event.isSet(): ????????????????event.clear() ????????????print('\033[41;1m--red light on---\033[0m') ????????else: ????????????count?=?0 ????????????event.set()?#打開綠燈 ????????time.sleep(1) ????????count?+=1 def?car(n): ????while?1: ????????time.sleep(random.randrange(10)) ????????if??event.isSet():?#綠燈 ????????????print("car [%s] is running.."?%?n) ????????else: ????????????print("car [%s] is waiting for the red light.."?%n) if?__name__?==?'__main__': ????event?=?threading.Event() ????Light?=?threading.Thread(target=light) ????Light.start() ????for?i?in?range(3): ????????t?=?threading.Thread(target=car,args=(i,)) ????????t.start() |
這里還有一個event使用的例子,員工進(jìn)公司門要刷卡, 我們這里設(shè)置一個線程是“門”, 再設(shè)置幾個線程為“員工”,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就可以通過。
?View Code?
queue隊列?
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
class?queue.Queue(maxsize=0) #先入先出class?queue.LifoQueue(maxsize=0) #last in fisrt out?class?queue.PriorityQueue(maxsize=0) #存儲數(shù)據(jù)時可設(shè)置優(yōu)先級的隊列Constructor for a priority queue.?maxsize?is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If?maxsize?is less than or equal to zero, the queue size is infinite.
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by?sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form:?(priority_number,?data).
Exception raised when non-blocking?get()?(or?get_nowait()) is called on a?Queue?object which is empty.
Exception raised when non-blocking?put()?(or?put_nowait()) is called on a?Queue?object which is full.
Put?item?into the queue. If optional args?block?is true and?timeout?is None (the default), block if necessary until a free slot is available. If?timeout?is a positive number, it blocks at most?timeout?seconds and raises the?Full?exception if no free slot was available within that time. Otherwise (block?is false), put an item on the queue if a free slot is immediately available, else raise the?Full?exception (timeout?is ignored in that case).
Equivalent to?put(item,?False).
Remove and return an item from the queue. If optional args?block?is true and?timeout?is None (the default), block if necessary until an item is available. If?timeout?is a positive number, it blocks at most?timeout?seconds and raises the?Empty?exception if no item was available within that time. Otherwise (block?is false), return an item if one is immediately available, else raise the?Empty?exception (timeout?is ignored in that case).
Equivalent to?get(False).
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Queue.task_done()Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each?get()?used to fetch a task, a subsequent call to?task_done()?tells the queue that the processing on the task is complete.
If a?join()?is currently blocking, it will resume when all items have been processed (meaning that a?task_done()?call was received for every item that had been?put()?into the queue).
Raises a?ValueError?if called more times than there were items placed in the queue.
生產(chǎn)者消費者模型
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強(qiáng)耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
?
下面來學(xué)習(xí)一個最基本的生產(chǎn)者消費者模型的例子
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | import?threading import?queue def?producer(): ????for?i?in?range(10): ????????q.put("骨頭 %s"?%?i ) ????print("開始等待所有的骨頭被取走...") ????q.join() ????print("所有的骨頭被取完了...") def?consumer(n): ????while?q.qsize() >0: ????????print("%s 取到"?%n? , q.get()) ????????q.task_done()?#告知這個任務(wù)執(zhí)行完了 q?=?queue.Queue() p?=?threading.Thread(target=producer,) p.start() c1?=?consumer("李闖") |
?
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | import?time,random import?queue,threading q?=?queue.Queue() def?Producer(name): ??count?=?0 ??while?count <20: ????time.sleep(random.randrange(3)) ????q.put(count) ????print('Producer %s has produced %s baozi..'?%(name, count)) ????count?+=1 def?Consumer(name): ??count?=?0 ??while?count <20: ????time.sleep(random.randrange(4)) ????if?not?q.empty(): ????????data?=?q.get() ????????print(data) ????????print('\033[32;1mConsumer %s has eat %s baozi...\033[0m'?%(name, data)) ????else: ????????print("-----no baozi anymore----") ????count?+=1 p1?=?threading.Thread(target=Producer, args=('A',)) c1?=?threading.Thread(target=Consumer, args=('B',)) p1.start() c1.start() |
?
多進(jìn)程multiprocessing
multiprocessing?is a package that supports spawning processes using an API similar to the?threading?module. The?multiprocessing?package offers both local and remote concurrency,?effectively side-stepping the?Global Interpreter Lock?by using subprocesses instead of threads. Due to this, the?multiprocessing?module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
| 1 2 3 4 5 6 7 8 9 10 | from?multiprocessing?import?Process import?time def?f(name): ????time.sleep(2) ????print('hello', name) if?__name__?==?'__main__': ????p?=?Process(target=f, args=('bob',)) ????p.start() ????p.join() |
To show the individual process IDs involved, here is an expanded example:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from?multiprocessing?import?Process import?os def?info(title): ????print(title) ????print('module name:', __name__) ????print('parent process:', os.getppid()) ????print('process id:', os.getpid()) ????print("\n\n") def?f(name): ????info('\033[31;1mfunction f\033[0m') ????print('hello', name) if?__name__?==?'__main__': ????info('\033[32;1mmain process line\033[0m') ????p?=?Process(target=f, args=('bob',)) ????p.start() ????p.join() |
?
進(jìn)程間通訊
不同進(jìn)程間內(nèi)存是不共享的,要想實現(xiàn)兩個進(jìn)程間的數(shù)據(jù)交換,可以用以下方法:
Queues
使用方法跟threading里的queue差不多
| 1 2 3 4 5 6 7 8 9 10 11 | from?multiprocessing?import?Process, Queue def?f(q): ????q.put([42,?None,?'hello']) if?__name__?==?'__main__': ????q?=?Queue() ????p?=?Process(target=f, args=(q,)) ????p.start() ????print(q.get())????# prints "[42, None, 'hello']" ????p.join() |
Pipes
The?Pipe()?function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
| 1 2 3 4 5 6 7 8 9 10 11 12 | from?multiprocessing?import?Process, Pipe def?f(conn): ????conn.send([42,?None,?'hello']) ????conn.close() if?__name__?==?'__main__': ????parent_conn, child_conn?=?Pipe() ????p?=?Process(target=f, args=(child_conn,)) ????p.start() ????print(parent_conn.recv())???# prints "[42, None, 'hello']" ????p.join() |
The two connection objects returned by?Pipe()?represent the two ends of the pipe. Each connection object has?send()?and?recv()?methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the?same?end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
?
ManagersA manager object returned by?Manager()?controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by?Manager()?will support types?list,?dict,?Namespace,?Lock,?RLock,?Semaphore,?BoundedSemaphore,?Condition,?Event,?Barrier,?Queue,?Value?and?Array. For example,
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | from?multiprocessing?import?Process, Manager def?f(d, l): ????d[1]?=?'1' ????d['2']?=?2 ????d[0.25]?=?None ????l.append(1) ????print(l) if?__name__?==?'__main__': ????with Manager() as manager: ????????d?=?manager.dict() ????????l?=?manager.list(range(5)) ????????p_list?=?[] ????????for?i?in?range(10): ????????????p?=?Process(target=f, args=(d, l)) ????????????p.start() ????????????p_list.append(p) ????????for?res?in?p_list: ????????????res.join() ????????print(d) ????????print(l) |
進(jìn)程同步
Without using the lock output from the different processes is liable to get all mixed up.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from?multiprocessing?import?Process, Lock def?f(l, i): ????l.acquire() ????try: ????????print('hello world', i) ????finally: ????????l.release() if?__name__?==?'__main__': ????lock?=?Lock() ????for?num?in?range(10): ????????Process(target=f, args=(lock, num)).start() |
?
進(jìn)程池
進(jìn)程池內(nèi)部維護(hù)一個進(jìn)程序列,當(dāng)使用時,則去進(jìn)程池中獲取一個進(jìn)程,如果進(jìn)程池序列中沒有可供使用的進(jìn)進(jìn)程,那么程序就會等待,直到進(jìn)程池中有可用進(jìn)程為止。
進(jìn)程池中有兩個方法:
- apply
- apply_async
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | from??multiprocessing?import?Process,Pool import?time def?Foo(i): ????time.sleep(2) ????return?i+100 def?Bar(arg): ????print('-->exec done:',arg) pool?=?Pool(5) for?i?in?range(10): ????pool.apply_async(func=Foo, args=(i,),callback=Bar) ????#pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join()#進(jìn)程池中進(jìn)程執(zhí)行完畢后再關(guān)閉,如果注釋,那么程序直接關(guān)閉。 |
轉(zhuǎn)載于:https://www.cnblogs.com/tlios/p/7637477.html
總結(jié)
以上是生活随笔為你收集整理的Python 09--多线程、进程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux虚拟机安装oracle全过程(
- 下一篇: 编写高质量代码:改善Java的151个建