《七周七并发模型》笔记
《七周七并發模型》筆記
- 1 概述
- 1.1 并發并行
- 1.2 并行架構
- 1.3 并發:不只是多核
- 并發的世界,并發的軟件
- 分布式的世界,分布式的軟件
- 不可預測的世界,容錯性強的軟件
- 復雜的世界,簡單的軟件
- 1.4 七個模型
- 2 線程與鎖
- 2.1 互斥與內存模型
- 哲學家進餐問題
- 2.1 互斥與內存模型
- 2.2 超越內置鎖
- 可中斷的鎖
- 超時
- 交替鎖
- 條件變量
- 原子變量
- Tips:關于重入鎖和原子變量的總結
- 2.3 站在巨人的肩膀上
- 線程池
- 一個完整的程序(生產者-消費者 | 觀察者)
- Tips:
- 3 函數式編程
- 3.1 并行化一個函數式算法
- 3.2 Clojure的reducer框架
- 3.3 利用future模型和promise模型創建一個并發的函數式Web服務
- 4 Clojure之道--分離標識與狀態
- 5 Actor
- 消息和信箱
- 6 通信順序進程
- 7 數據并行
- GPU 編程
- OpenCL 程序:
- 8 Lambda架構
- 9 圓滿結束
- 參考
1 概述
1.1 并發并行
并發是同一時間應對(dealing with)多件事情的能力;
并行是同一時間動手做(doing)多件事情的能力。
1.2 并行架構
- 位級(bit-level)并行
兩個32位數加法,8位計算機必須進行多次8位計算,32位計算機可以一步完成。
由位升級帶來的性能改善是存在瓶頸的。 - 指令級(instruction-level)并行
現代CPU的并行度很高,其中使用的技術包括流水線、亂序執行和猜測執行等。 - 數據級(data)并行
數據級并行(亦稱“單指令多數據”,SIMD)架構,可以并行地在大量數據上施加同一操作。
圖形處理(GPU)中如此應用很多。 - 任務級(task-level)并行
多處理器級并行。多處理器架構最明顯的分類特征是其內存模型(共享內存模型或分布式內存模型)。
對于共享內存的多處理器系統,每個處理器都能訪問整個內存,處理器之間的通信主要通過內存進行,如圖1.1。
對于分布式內存的多處理器系統,每個處理器都有自己的內存,處理器之間的通信主要通過網絡進行。
通過內存通信比通過網絡通信更簡單快速。當處理器個數逐漸增多,共享內存就會遭遇性能瓶頸–需要轉向分布式內存。要開發一個容錯系統,就要使用多臺計算機以規避硬件故障對系統的影響,此時必須借助分布式內存。
1.3 并發:不只是多核
正確使用并發,程序將:及時響應、高效、容錯、簡單。
并發的世界,并發的軟件
世界是并發的,為了與其有效地交互,軟件也應是并發的。
分布式的世界,分布式的軟件
有時,我們要解決地理分布型問題。軟件在多臺計算機上分布式地運行,其本質是并發。
分布式軟件具有容錯性,如同大佬不做同一架飛機。
不可預測的世界,容錯性強的軟件
為了增強軟件的容錯性,并發代碼的關鍵是獨立性和故障檢測。
串行程序的容錯性遠不如并發程序。
復雜的世界,簡單的軟件
用串行方案解決一個并發問題往往需要付出額外的代價,而且解決方案會晦澀難懂。如果能并發,就不需要創建一個復雜的線程來處理問題中的多個任務,只需要用多個簡單的線程分別處理不同的任務即可。十倍圍之勝過以一敵百。
1.4 七個模型
- 線程與鎖
線程與鎖有很多眾所周知的不足,但仍是其他模型的技術基礎,也是很多并發軟件開發的首選。 - 函數式編程
函數式編程消除了可變狀態,所以從根本上是線程安全的,而且易于并行執行。 - Clojure之道–分離標識與狀態
編程語言Clojure是一種指令式編程和函數式編程的混搭方案。 - Actor
該模型適用性很廣,適用于共享內存模型和分布式內存模型,也適應解決地理分布型問題,能提供強大的容錯性。 - 通信順序進程(CSP)
CSP模型與actor模型很相似,兩者都基于消息傳遞。CSP側重于傳遞信息的通道,而actor側重于通道兩端的實體。 - 數據并行
GPU利用了數據級并行,不僅可以圖像處理,也可以進行有限元分析、流體力學計算或其他大量數學計算。 - Lambda架構
大數據時代的到來離不開并行–現在我們只需要增加計算資源,就能具有處理TB級數據的能力。Lambda架構綜合了MapReduce和流式處理的特點,是一種可以處理多種大數據問題的架構。
2 線程與鎖
線程與鎖模型有眾所周知的缺點,不過仍然是并發編程的首選技術,也是其他并發技術的支撐。
線程與鎖模型其實是對底層硬件運行過程的形式化。
2.1 互斥與內存模型
互斥指的是用鎖保證某一時間僅有一個線程可以訪問數據。不過副作用就是會帶來競態條件和死鎖。
在直覺上,編譯器、硬件等都不會改動代碼邏輯。實際在近幾年提升運行效率,尤其引入共享內存架構后,編譯器、硬件等都會通過亂序執行來優化性能。這就是優化的副作用。
讓多線程代碼安全運行的方法只能是讓所有的方法都同步。不過這樣會導致大多數線程頻繁阻塞,最終效率底下,程序失去并發的意義。多把鎖又會帶來死鎖風險。
哲學家進餐問題
- 5位哲學家(五個進程)圍坐圓桌,面對5支筷子(五個臨界資源),對應“思考”和“饑餓”2種狀態。同時進餐時,都拿起左手邊的筷子,都在等待右手邊哲學家放下筷子,這就是死鎖。
- 5只筷子為臨界資源,因此設置5個信號量即可。
- 解決辦法:
1)法一:至多只允許有四位哲學家同時去拿左邊的筷子,最終能保證至少有一位哲學家能夠進餐,并在用餐完畢后能釋放他占用的筷子,從而使別的哲學家能夠進餐;
2)法二:僅當哲學家的左、右兩支筷子可用時,才允許他拿起筷子;
3)法三:規定奇數號哲學家先拿他左邊的筷子,然后再去拿右邊的筷子;而偶數號哲學家則相反。
面對規模較大的程序,使用鎖的地方比較零散,各處都遵守這個順序就變得不太實際。
規模較大的程序常用監聽器模式(listener)來解耦模塊。【觀察者模式】
2.1 互斥與內存模型
2.2 超越內置鎖
- Java內置鎖的限制:
一個線程因為等待內置鎖而進入阻塞之后,就無法中斷該線程了;
嘗試獲取內置鎖時,無法設置超時;獲得內置鎖,必須使用synchronized塊。
這種用法的限制是獲取鎖和釋放鎖的代碼必須嚴格嵌在同一方法中。
ReentrantLock提供了顯式的lock和unlock方法,可以突破上述幾個限制。
可中斷的鎖
使用內置鎖時,由于阻塞的線程無法被中斷,程序不可能從死鎖中恢復。
終止線程的最終手段是讓run()函數返回(可能是通過拋出InterruptedException)。
不過,如果你的線程由于等待內置鎖而陷入死鎖,且不能中斷其等待鎖的狀態,那么要終止死鎖線程就只剩下終止JVM運行這條路了。
這一次Thread.interrupt()可以讓線程終止。
超時
ReentrantLock突破了內置鎖的另一個限制:可以為獲取鎖的操作設置超時時間。
tryLock()相比lock()在獲取鎖失敗時有超時機制。tryLock()避免了無盡地死鎖,不過依然不能避免死鎖。會產生活鎖現象,即所有死鎖線程同時超時,極有可能再次陷入死鎖。該死鎖不會永遠持續,但對資源的爭奪沒有改善。通過設置不同線程的超時時間,可減少同時超時的幾率。
交替鎖
在鏈表中插入一個節點,
方法一是用鎖保護整個鏈表,但是鏈表加鎖時其他使用者無法訪問鏈表。
方法二是只鎖住鏈表的一部分,允許不涉及被鎖部分的其他線程自由訪問鏈表。
條件變量
并發編程經常需要等待某個事件發生。如從隊列刪除元素前需要等待隊列非空、向緩存添加數據前
需要等待緩存有足夠的空間。條件變量就是為這種情況而設計的。
一個條件變量需要與一把鎖關聯,線程在開始等待條件之前必須獲取該鎖。獲鎖后,如果為真,線程解鎖并繼續執行。如果等待的條件不為真,線程會調用await(),來解鎖并阻塞等待。當另一個線程調用signal()或signalAll(),對應的條件可能為真,await()將恢復運行并重新加鎖。
//條件變量解決哲學家就餐問題 public class PhilosopherWithCondition extends Thread{private boolean eating;private PhilosopherWithCondition left,right;private ReentrantLock table;private Condition condition;private Random random;public PhilosopherWithCondition(ReentrantLock table){eating = false;this.table = table;condition = table.newCondition();random = new Random();}public void setLeft(PhilosopherWithCondition left){this.left = left;}public void setRight(PhilosopherWithCondition right){this.right = right;}@Overridepublic void run() {try {while (true){think();eat();}}catch (InterruptedException e){}}private void think()throws InterruptedException{table.lock();try {eating = false;left.condition.signal();right.condition.signal();}finally {table.unlock();}Thread.sleep(1000);}private void eat() throws InterruptedException{table.lock();try{while (left.eating || right.eating){condition.await();}eating = true;}finally {table.unlock();}Thread.sleep(1000);} }只使用一把鎖(table),且沒有Chopsticks類。僅當哲學家左右鄰居都沒有進餐時,才能進餐。
原子變量
與鎖相比,使用原子變量有諸多好處。不會引發死鎖,不會出現因鎖操作不對產生同步問題。
原子變量是無鎖(lock-free)非阻塞(non-blocking)算法的基礎,該算法可不用鎖和阻塞來達到同步的目的。
Java變量標記volatile。這樣可保證變量的讀寫不被亂序執行。隨著JVM的不斷優化,atomic可以代替volatile,
并開銷更小。
Tips:關于重入鎖和原子變量的總結
- 重入鎖可以在線程獲取鎖時中斷它
- 設置線程獲取鎖的超時時間
- 按照任意順序獲取和釋放鎖
- 用條件變量等待某個條件為真
- 使用原子變量來避免鎖的使用
2.3 站在巨人的肩膀上
java.util.concurrent包不僅提供了比內置鎖更好用的鎖,還提供了一些通用、高效、bug少的并發數據結構和工具。
線程池
影響線程池最優大小的因素有很多個,例如硬件的性能,線程任務是CPU密集型還是IO密集型,是否有其他任務在同時運行,還有很多其他原因也會產生影響。存在一個經驗法則:對于一個CPU密集型的任務,線程池大小應該接近于可用核個數,對于IO密集型的任務,線程池應該設置的更大一些。當然,最佳的方法還是建立一個真實環境下的壓力測試來衡量性能。
public class EchoServer {public static void main(String[] args) throws IOException {class ConnectionHandler implements Runnable {InputStream in; OutputStream out;ConnectionHandler(Socket socket) throws IOException {in = socket.getInputStream();out = socket.getOutputStream();}public void run() {try {int n;byte[] buffer = new byte[1024];while((n = in.read(buffer)) != -1) {out.write(buffer, 0, n);out.flush();}} catch (IOException e) {}}}ServerSocket server = new ServerSocket(4567);while (true) {Socket socket = server.accept();Thread handler = new Thread(new ConnectionHandler(socket));handler.start();}} }該代碼采用的設計是,接受一個連接請求并創建一個處理線程。
該設計隱患一:每個連接都花費一個線程代價;
該設計隱患二:請求連接速度高于處理速度時,線程數量激增,服務器將停止服務甚至崩潰。
可用線程池解決。
一個完整的程序(生產者-消費者 | 觀察者)
查找Wikipedia上出現頻率最高的詞,先下載XML dump文件,然后寫一個程序解析它們計算出詞頻。
public class WordCount {private static final HashMap<String, Integer> counts = new HashMap<String, Integer>();public static void main(String[] args) throws Exception {long start = System.currentTimeMillis();Iterable<Page> pages = new Pages(100000, "enwiki.xml");for(Page page: pages) {Iterable<String> words = new Words(page.getText());for (String word: words)countWord(word);}long end = System.currentTimeMillis();System.out.println("Elapsed time: " + (end - start) + "ms");// for (Map.Entry<String, Integer> e: counts.entrySet()) {// System.out.println(e);// }}private static void countWord(String word) {Integer currentCount = counts.get(word);if (currentCount == null)counts.put(word, 1);elsecounts.put(word, currentCount + 1);} }主循環的每一次循環都完成了兩個任務–首先解析XML并構造一個Page,然后“消費”這個Page,對Page中的內容統計詞頻。這是經典的生產者-消費者模式。相比只用一個線程自產自銷,我們可以創建兩個線程:一個生產者和一個消費者。Java.util.concurrent包中的ArrayBlockingQueue是一個并發隊列,非常適應實現生產者-消費者模式。其提供了高效的并發方法put()和take(),這些方法會必要時阻塞:當對一個空隊列調用take()時,程序會阻塞知道隊列變為非空;當對一個滿隊列調用put()時,程序會阻塞直到隊列有足夠空間。相對之前由105s提升到95s。
class Parser implements Runnable {private BlockingQueue<Page> queue;public Parser(BlockingQueue<Page> queue) {this.queue = queue;}public void run() {try {Iterable<Page> pages = new Pages(100000, "enwiki.xml");for (Page page: pages)queue.put(page);} catch (Exception e) { e.printStackTrace(); }} }class Counter implements Runnable {private BlockingQueue<Page> queue;private Map<String, Integer> counts;public Counter(BlockingQueue<Page> queue,Map<String, Integer> counts) {this.queue = queue;this.counts = counts;}public void run() {try {while(true) {Page page = queue.take();if (page.isPoisonPill())break;Iterable<String> words = new Words(page.getText());for (String word: words)countWord(word);}} catch (Exception e) { e.printStackTrace(); }}private void countWord(String word) {Integer currentCount = counts.get(word);if (currentCount == null)counts.put(word, 1);elsecounts.put(word, currentCount + 1);} }public class WordCount {public static void main(String[] args) throws Exception {ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);HashMap<String, Integer> counts = new HashMap<String, Integer>();Thread counter = new Thread(new Counter(queue, counts));Thread parser = new Thread(new Parser(queue));long start = System.currentTimeMillis();counter.start();parser.start();parser.join();queue.put(new PoisonPill());counter.join();long end = System.currentTimeMillis();System.out.println("Elapsed time: " + (end - start) + "ms");// for (Map.Entry<String, Integer> e: counts.entrySet()) {// System.out.println(e);// }} }
過多的線程嘗試同時使用一個共享資源會產生過渡競爭。如果使用HashMap,HashMap不提供原子的讀-改-寫的方法。最終統計時間會更長。如果使用ConcurrentHashMap,ConcurrentHashMap提供了原子的讀-改-寫方法,還使用了更高級的并發訪問(鎖分段(lock striping))技術。最終效果較好。
Tips:
- 使用線程池,而不是直接創建線程;
- 使用CopyOnWriteArrayList讓監聽器相關的代碼更簡單高效;
- 使用ArrayBlockingQueue讓生產者和消費者之間高效協作;
- ConcurrentHashMap提供了更好的并發訪問。
3 函數式編程
- 函數式編程與命令式編程不同。命令式編程的代碼由一系列改變全局狀態的語句構成,而函數式編程則是將計算過程抽象成表達式求值。這些表達式由純數學函數構成,而這些數學函數是第一類對象并且沒有副作用。由于沒有副作用,函數式編程可以更容易做到線程安全,因此特別適合于并發編程。
- 函數式編程沒有可變狀態,所以不會遇到由共享可變狀態帶來的種種問題。
- Clojure是動態類型語言,作為Ruby或Python程序員會有接觸。
3.1 并行化一個函數式算法
可變狀態的風險
- 隱藏的可變狀態 SimpleDateFormat
- 逃逸的可變狀態
java示例:accumulator是可變的,因此這段代碼不是函數式的
public int sum(int[] numbers){int accumulator = 0;for(int n : numbers){accumulator += n;}return accumulator; }clojure示例:
//方式一 (defn recursive-sum [numbers](if (empty? numbers)0(+ (first numbers) (recursive-sum (rest numbers)))) )user=> (recursive-sum [1,2,3]) 6方式二 (defn reduce-sum [numbers](reduce (fn [acc x] (+ acc x)) 0 numbers) ) user=> (reduce-sum [1,2,3]) 6方式三: (defn sum [numbers](reduce + numbers) ) user=> (sum [1,2,3]) 63.2 Clojure的reducer框架
3.3 利用future模型和promise模型創建一個并發的函數式Web服務
4 Clojure之道–分離標識與狀態
混合函數式編程和可變狀態,平衡兩者的優點,成為并發編程的利器。
Clojure 精心設計了用于并發的語義,從而保留了共享可變狀態。
- 優點:持久化數據結構將可變量的表示與狀態分離開,解決了使用鎖的方案的大部分缺點。
- 缺點:不支持分布式編程。
5 Actor
Actor 模型保留了可變狀態,只是不進行共享。Actor 對象是一個比線程還要輕量級的“進程”,通常不需要依賴線程池技術。
消息和信箱
- 異步地發送消息
消息發送到信箱,只需要關心發送消息時的并發問題,而不需要關心處理消息時的并發問題 - 函數是第一類對象
和數據相同:可以綁定到變量上,可以作為函數的參數
6 通信順序進程
7 數據并行
基于圖形處理器的通用計算(General-Purpose computing on the GPU)
GPU 編程
- 流水線
- 多 ALU
OpenCL 程序:
- 創建上下文,獲取設備信息,創建命令隊列,內核和命令隊列都運行在上下文中
- 編譯內核,將內核編譯成可以在設備上運行的代碼
- 創建緩沖區,內核使用的都是緩沖區中的數據,并將數據讀入到輸入緩沖區
- 設置內核參數,執行工作項,讓每個工作項都運行一次內核程序
- 獲取結果,并清理現場
8 Lambda架構
9 圓滿結束
參考
1、《七周七并發模型》Paul Butcher
2、經典的進程同步問題-----哲學家進餐問題詳解
3、七周七并發模型源碼
4、七周七并發讀書筆記 第三章 函數式編程
5、七周七并發之函數式編程
6、讀書筆記:《七周七并發模型》
總結
以上是生活随笔為你收集整理的《七周七并发模型》笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sudo spctl --master-
- 下一篇: Python爬虫之路-打码平台的使用