Java 下实现锁无关数据结构--转载
介紹
通常在一個多線程環境下,我們需要共享某些數據,但為了避免競爭條件引致數據出現不一致的情況,某些代碼段需要變成原子操作去執行。這時,我們便需要利用各種同步機制如互斥(Mutex)去為這些代碼段加鎖,讓某一線程可以獨占共享數據,避免競爭條件,確保數據一致性。但可惜的是,這屬于阻塞性同步,所有其他線程唯一可以做的就是等待。基于鎖(Lock based)的多線程設計更可能引發死鎖、優先級倒置、饑餓等情況,令到一些線程無法繼續其進度。
鎖無關(Lock free)算法,顧名思義,即不牽涉鎖的使用。這類算法可以在不使用鎖的情況下同步各個線程。對比基于鎖的多線程設計,鎖無關算法有以下優勢:
- 對死鎖、優先級倒置等問題免疫:它屬于非阻塞性同步,因為它不使用鎖來協調各個線程,所以對死鎖、優先級倒置等由鎖引起的問題免疫;
- 保證程序的整體進度:由于鎖無關算法避免了死鎖等情況出現,所以它能確保線程是在運行當中,從而確保程序的整體進度;
- 性能理想:因為不涉及使用鎖,所以在普遍的負載環境下,使用鎖無關算法可以得到理想的性能提升。
自 JDK 1.5 推出之后,當中的?java.util.concurrent.atomic?的一組類為實現鎖無關算法提供了重要的基礎。本文介紹如何將鎖無關算法應用到基本的數據結構中,去避免競爭條件,允許多個線程同時存取和使用集合中的共享數據。如果一個數據結構本身并非是線程安全的,一旦在多線程環境下使用這個數據結構,必須施加某種同步機制,否則很可能會出現競爭條件。我們即將設計的鎖無關數據結構是線程安全的,所以使用時無需再編寫額外代碼去確保競爭條件不會出現。
數據結構的設計
本文會由淺入深,先提出鎖無關棧(Stack)的實現方法,為讀者提供必須的基礎知識,棧是一個先入后出(Last in first out)的基本數據結構。當讀者掌握必要的技術之后,我們便會著手設計相對復雜的鏈表(Linked List)數據結構,鏈表是很多其他數據結構的基礎組成部分。不過,對比起棧,鏈表可以面對更棘手的線程同步問題。
在開始設計之前,我們需要理解一個十分重要的原語?Compare-and-swap (CAS)?,Herlihy 證明了?CAS?是實現鎖無關數據結構的通用原語,?CAS可以原子地比較一個內存位置的內容及一個期望值,如果兩者相同,則用一個指定值取替這個內存位罝里的內容,并且提供結果指示這個操作是否成功。很多現代的處理器已經提供了?CAS?的硬件實現,例如在 x86 架構下的?CMPXCHG8?指令。而在 Java 下,位于java.util.concurrent.atomic?內的?AtomicReference<V>?類亦提供了?CAS?原語的實現,并且有很多其他的擴展功能。?CAS?操作將會是稍后實現的鎖無關數據算法無可缺少的指令。
棧
棧能以數組或者鏈表作為底下的儲存結構,雖然采取鏈表為基礎的實現方式會占用多一點空間去儲存代表元素的節點,但卻可避免處理數組溢出的問題。故此我們將以鏈表作為棧的基礎。
首先,我們分析一下一個非線程安全的版本。為了清楚表達和集中于文章的主題,代碼沒有包含對異常及不正當操作的處理,讀者請留意。它的代碼如下:
清單 1. 非線程安全的棧實現
class Node<T> { Node<T> next; T value; public Node(T value, Node<T> next) { this.next = next; this.value = value; } } public class Stack<T> { Node<T> top; public void push(T value) { Node<T> newTop = new Node<T>(value, top); top = newTop; } public T pop() { Node<T> node = top; top = top.next; return node.value; } public T peek() { return top.value; } }數據成員?top?儲存著棧頂的節點,它的類型為?Node<T>?,這是因為我們的棧是基于鏈表的。?Node<T>?代表一個節點,它有兩個數據成員,value?儲存著入棧的元素,而?next?儲存下一個節點。這個類有三個方法,分別是?push?、?pop?和?peek?,它們是基本的棧操作。除了?peek?方法是線程安全之外,其余兩個方法在多線程環境之下都有可能引發競爭條件。
push 方法
讓我們先考慮一下?push?方法,它能將一個元素入棧。調用?push?時,它首先建立一個新的節點,并將?value?數據成員設定為傳入的參數,而next?數據成員則被賦值為當前的棧頂。然后它把?top?數據成員設定成新建立的節點。假設有兩個線程 A 和 B 同時調用?push?方法,線程 A 獲取當前棧頂的節點去建立新的節點(?push?方法代碼第一行),但由于時間片用完,線程 A 暫時掛起。此時,線程 B 獲取當前棧頂的節點去建立新的節點,并把?top?設定成新建立的節點(?push?方法代碼第二行)。然后,線程 A 恢復執行,更新棧頂。當線程 B 對?push?的調用完成后,線程 A 原本獲得的棧頂已經「過期」,因為線程 B 用新的節點取代了原本的棧頂。
pop 方法
至于?pop?方法,它把棧頂的元素彈出。?pop?方法把棧頂暫存在一個本地變量?node?,然后用下一個節點去更新棧頂,最后返回變量?node?的value?數據成員。如果兩個線程同時調用這個方法,可能會引起競爭條件。當一個線程將當前棧頂賦值到變量?node?,并準備用下一個節點更新棧頂時,這個線程掛起。另一個線程亦調用?pop?方法,完成并返回結果。剛剛被掛起的線程恢復執行,但由于棧頂被另一個線程變更了,所以繼續執行的話會引起同步問題。
peek 方法
而?peek?方法只是簡單地返回當前位于棧頂的元素,這個方法是線程安全的,沒有同步問題要解決。
在 Java 要解決?push?和?pop?方法的同步問題,可以用?synchronized?這個關鍵詞,這是基于鎖的解決方案?,F在我們看看鎖無關的解決方案,以下是鎖無關棧實現的代碼:
清單 2. 鎖無關的棧實現
import java.util.concurrent.atomic.*; class Node<T> { Node<T> next; T value; public Node(T value, Node<T> next) { this.next = next; this.value = value; } } public class Stack<T> { AtomicReference<Node<T>> top = new AtomicReference<Node<T>>(); public void push(T value) { boolean sucessful = false; while (!sucessful) { Node<T> oldTop = top.get(); Node<T> newTop = new Node<T>(value, oldTop); sucessful = top.compareAndSet(oldTop, newTop); }; } public T peek() { return top.get().value; } public T pop() { boolean sucessful = false; Node<T> newTop = null; Node<T> oldTop = null; while (!sucessful) { oldTop = top.get(); newTop = oldTop.next; sucessful = top.compareAndSet(oldTop, newTop); } return oldTop.value; } }這個新的實現方式和剛剛的很不同,看似比較復雜。成員數據?top?的類型由?Node<T>?改為?AtomicReference<Node<T>>?,AtomicReference<V>?這個類可以對?top?數據成員施加?CAS?操作,亦即是可以允許?top?原子地和一個期望值比較,兩者相同的話便用一個指定值取代。從上文可知,我們需要解決遇到棧頂「過期」的問題。
push 方法
現在我們先分析新的?push?方法如何處理這個問題,確保競爭條件不會出現。在?while?循環中,通過在?top?數據成員調用AtomicReference.get()?,?oldTop?持有當前棧頂節點,這個棧頂稍后會被取替。變量?newTop?則被初始化為新的節點。最重要的一步,top.compareAndSet(oldTop, newTop)?,它比較?top?和?oldTop?這兩個引用是否相同,去確保?oldTop?持有的棧頂并未「過期」,亦即未被其他線程變更。假如沒有過期,則用?newTop?去更新?top?,使之成為新的棧頂,并返回?boolean?值?true?。否則,?compareAndSet?方法便返回false?,并且令到循環繼續執行,直至成功。因為?compareAndSet?是原子操作,所以可以保證數據一致。
pop 方法
pop?方法把棧頂的元素彈出,它的實現方式和?push?方法十分類同。在?while?循環內,?compareAndSet?檢查棧頂有沒有被其他線程改變,數據一致的話便更新?top?數據成員并把原本棧頂彈出。如果失敗,便重新嘗試,直至成功。
push?和?pop?都沒有使用任何鎖,所以全部線程都不用停下來等待。
鏈表
棧是一個相當簡單的數據結構,要解決的同步問題亦比較直接和容易。但很多環境下,棧并不能滿足我們的需求。我們將介紹鏈表,它有更廣泛的應用范圍。為了保持簡潔,這個鏈表所提供的方法較少。以下是鏈表的非線程安全版本:
清單 3. 非線程安全的鏈表實現
class Node<T> { Node<T> next; T value; public Node(T value, Node<T> next) { this.value = value; this.next = next; } } class LinkedList<T> { Node<T> head; public LinkedList() { head = new Node<T>(null, null); } public void addFirst(T value) { addAfter(head.value, value); } public boolean addAfter(T after, T value) { for (Node<T> node = head; node != null; node = node.next) { if (isEqual(node.value, after)) { Node<T> newNode = new Node<T>(value, node.next); node.next = newNode; return true; } } return false; } public boolean remove(T value) { for (Node<T> node = head; node.next != null; node = node.next) { if (isEqual(node.next.value, value)) { node.next = node.next.next; return true; } } return false; } boolean isEqual(T arg0, T arg1) { if (arg0 == null) { return arg0 == arg1; } else { return arg0.equals(arg1); } } }數據成員?head?是鏈表的頭,它沒有存儲任何元素,而是直接指向第一個元素,這可以令稍后的?remove?方法較易實現。這個鏈表有三個公用方法,其中?addAfter?和?remove?比較重要。
addAfter 方法
先考慮一下?addAfter?方法,這個方法把一個新元素加入到集合內指定元素之后的位置,并返回一個?boolean?值指示元素有沒有被加入到集合中,元素沒有被加入的原因是因為集合內沒有所指定的元素。它首先在一個?for?循環中尋找指定元素的節點,成功發現指定的節點后,便建立一個新節點。這個新節點的?next?數據成員連接到指定節點原本的?next?,指定節點的?next?則連到新節點上。另一方面,?remove?方法尋找指定元素并從集合中移除,并且返回一個?boolean?值指示元素有沒有被移除,返回?false?代表集合中沒有指定的元素。這個方法在一個循環尋找要移除的元素,并且將左右兩邊的元素重新連接。
在多線程環境下,如果兩個線程同時調用?addAfter?或?remove?方法,或者一個線程調用?addAfter?方法而同一時間另一個線程調用?remove?方法均有機會引發競爭條件。
試想像現在鏈表內有三個元素,分別是 A、B 和 C 。如果一個線程準備把一個元素加到 A 之后,它首先確定 A 節點的位置,然后新建一個節點 A1,這個節點的?value?數據成員儲存著新加入的元素,而?next?數據成員則儲存著 B 節點的引用。當這個線程準備把 A 和 A1 通過?next?成員連接起來時,此時線程因為時間片用完而被掛起。另一個線程亦準備在 A 之后加入一個新元素,它建立 A2 節點,并解除 A 和 B 原本的連結,然后依著 A-A2-B 的次序重新連接三個節點,操作完成?,F在,剛剛被掛起的線程恢復執行,并依著 A-A1-B 的次序去重新連接三個節點。這時問題出現,剛剛新加入的 A2 節點遺失了。解決方法是,每一次準備把 A 元素和新建立的節點連接時,檢查 A 節的?next?有否被其他線程改動過,沒有改動過才進行連接,這是通過?CAS?操作原子地進行的。
addAfter 和 remove 的沖突
同時間執行?addAfter?和?remove?亦有可能引起競爭條件。同樣地,現在一個鏈表內有三個元素 A、B 和 C 。當一個線程準備調用?remove?方法從這個集合中移除 B 元素,它首先獲得 A 節點,然后準備通過改變 A 節點的?next?成員,把 A 和 C 互相連接時,這個線程突然被掛起。此時另一個線程調用?addAfter?方法在 B 節點后插入一個新的元素 B2 。插入操作完成后,剛才被掛起的線程恢復執行,并且通過改變?next?成員把 A 和 C 互相連接,完成移除操作。可是,剛剛被加入的 B2 元素則遺失了,因為 A 節點跳過了 B 節點,直接連接著 C 節點。故此,我們要有一個解決方案。 Timothy L. Harris 提供了一個方法,他把整個移除過程分成兩個步驟,邏輯刪除和物理刪除。邏輯刪除并非真正地移除一個節點,而是把要移除的節點標記為已刪除。另一方面,物理刪除則真實從集合左移除一個節點。每次要加入新元素到指定節點之后,都必先檢查該節點有沒有被標記為刪除,沒有的話才把新的節點連接到集合中。這是通過?AtomicMarkableReference<V>?類中的?compareAndSet?方法原子地進行的。
remove 方法
鏈表有可能發生的沖突比較多,另一個問題便是兩個線程同時間執行?remove?方法。這個問題和同時間執行?addAfter?有點類同?,F在假設一個集合內有四個元素 A、B、C 和 D,一個線程調用?remove?方法去移除元素 B 。它首先確定了 A 和 C 的位置,然后準備解除 A 和 B 的連結,再將 A 和 C 連接起來,實際的移除還未實行,這時這個線程被掛起了。另一個線程亦調用?remove?方法移除 C 元素,它解除 B 和 C 的連結,并把 B 和 D 連接起來,移除操作完成。之后剛才的線程恢復運行,繼續執行余下的操作,把 A 和 C 連接起來,這樣之前的移除 C 的操作便受到了破壞。最終鏈表中的元素變成 A-C-D,C 元素沒有被移除。所以,我們?remove?方法需要確定要移除的元素的?next?有沒有被改變。例如移除 B 的時候,檢查 A 的?next?有沒有被其他線程更動,以及有沒有被標記為已經邏輯地刪除。這亦是透過?CAS?操作去完成的。
從上文的各種情況可見,我們必須原子地施加某些檢查機制,確保數據的一致性。我們現在看看解決這些問題的鎖無關鏈表是如何實現的,這些代碼應該和讀者在算法書上看到的很不同。以下是它的代碼:
清單 4. 鎖無關的鏈表實現
import java.util.concurrent.atomic.*; class Node<T> { AtomicMarkableReference<Node<T>> next; T value; public Node(T value, Node<T> next) { this.next = new AtomicMarkableReference<Node<T>>(next, false); this.value = value; } } class LinkedList<T> { AtomicMarkableReference<Node<T>> head; public LinkedList() { Node<T> headNode = new Node<T>(null, null); head = new AtomicMarkableReference<Node<T>>(headNode, false); } public void addFirst(T value) { addAfter(head.getReference().value, value); } public boolean addAfter(T after, T value) { boolean sucessful = false; while (!sucessful) { boolean found = false; for (Node<T> node = head.getReference(); node != null && !isRemoved(node); node = node.next.getReference()) { if (isEqual(node.value, after) && !node.next.isMarked()) { found = true; Node<T> nextNode = node.next.getReference(); Node<T> newNode = new Node<T>(value, nextNode); sucessful = node.next.compareAndSet(nextNode, newNode, false, false); break; } } if (!found) { return false; } } return true; } public boolean remove(T value) { boolean sucessful = false; while (!sucessful) { boolean found = false; for (Node<T> node = head.getReference(), nextNode = node.next.getReference(); nextNode != null; node = nextNode, nextNode = nextNode.next.getReference()) { if (!isRemoved(nextNode) && isEqual(nextNode.value, value)) { found = true; logicallyRemove(nextNode); sucessful = physicallyRemove(node, nextNode); break; } } if (!found) { return false; } } return true; } void logicallyRemove(Node<T> node) { while (!node.next.attemptMark(node.next.getReference(), true)) { } } boolean physicallyRemove(Node<T> leftNode, Node<T> node) { Node<T> rightNode = node; do { rightNode = rightNode.next.getReference(); } while (rightNode != null && isRemoved(rightNode)); return leftNode.next.compareAndSet(node, rightNode, false, false); } boolean isRemoved(Node<T> node) { return node.next.isMarked(); } boolean isEqual(T arg0, T arg1) { if (arg0 == null) { return arg0 == arg1; } else { return arg0.equals(arg1); } } }和之前不同,?Node?類中的?next?成員數據屬于?AtomicMarkableReference<V>?類,不是?Node<T>?,亦不是?AtomicReference<V>?。這是因為我們不但需要在?next?成員進行?CAS?操作,也需要在 next 中加上標記。?AtomicMarkableReference<V>?上的標記是以一個?boolean?表示的。我們會以設定它為?true?來代表一個節點已被邏輯刪除,?false?則代表這個節點未被邏輯刪除。當一個節點被標記為已經邏輯地刪除,它的next?數據成員的標記位會被設定成?boolean?值?true?。另一方面,鏈表中的?head?亦屬?AtomicMarkableReference<V>?這類型,因為我們也需要進行同樣的操作。
addAfter 方法
首先考慮一下?addAfter?方法,?addAfter?方法的設計必須顧慮到兩個線程同時間插入及同時間一個線程插入和一個線程移除的沖突。在一個for?循環中,它遍歷集合去尋找?after?所位于的節點,并通過調用?getReference()?把?after?的下一個節點賦值到?nextNode?變量中。然后,建立一個新的節點去容納新加入的元素,它通過?next?數據成員和?nextNode?變量進行連接。下一步,在?node?變量上調用?compareAndSet?。不同之處在于它不但比較兩個引用是否相同去確保?next?數據成員沒有被其他線程改變過,它亦會比較?boolean?標記位和期望值是否相同。上文提到,?AtomicMarkableReference<V>?類擁有一個標記位,我們利用它去檢查一個節點是否已被邏輯地刪除了。如果我們發現那個節點已被logicallyRemove?方法標記為已經被邏輯地刪除,?compareAndSet?方法便會失敗,并繼續循環尋找下一個符合的節點。若果循環終結后仍未尋找到指定的元素,?addAfter?方法便會返回?false?以表示由于集合內不存在指定的元素,所以元素無法被加入到集合中。?compareAndSet?返回?true?便代表元素插入成功,方法便會返回并結束。
addFirst 方法
addFirst?方法只是簡單地調用?addAfter?方法去把一個新的元素插入到集合的開端。
Remove 方法
remove?方法在一個循環內尋找要移除元素的前一個節點,然后確定這個節點未被邏輯地移除。確定后,它首先調用?logicallyRemove?邏輯地刪除節點,然后調用?physicallyRemove?物理地刪除節點。在?logicallyRemove?中,我們調用?AtomicMarkableReference<V>?中的attemptMark?來設定標記位。由于?attemptMark?可能會失敗,所以要將它放進一個?while?循環中,經過?attemptMark?設定標記的節點代表該節點已被邏輯地刪除。在?physicallyRemove?方法中,它首先檢查鄰近的其他節點是否都已經被標記為邏輯刪除,若果已被標記則順道物理地移除它們。這是通過?compareAndSet?完成,?compareAndSet?會檢查節點是否已被邏輯地刪除,以及上一個節點的?next?成員未被其他線程更改。這樣可以確保兩個線程同時調用?remove?方法,或者分別同時間調用?remove?方法和?addAfter?方法的時候,競爭條件不會發生。
ABA 問題
因為?CAS?操作比較一個內存位置的內容及一個期望值是否相同,但如果一個內存位置的內容由 A 變成 B,再由 B 變成 A,?CAS?仍然會看作兩者相同。不過,一些算法因為需求的關系無法容忍這種行為。當一些內存位置被重用的時候,這個問題便可能會發生。在沒有垃圾回收機制的環境下,ABA 問題需要一些機制例如標記去解決。但由于 JVM 會為我們處理內存管理的問題,故此以上的實現足以避免 ABA 問題的出現。
結束語
以往很多的鎖無關數據結構都以 Immutable object 的方式去達致線程安全,這很像 Java 中的?String?,但因為涉及過多的復制操作,令性能低下。但經過十多年,鎖無關數據結構已發展得十分成熟,性能并不遜色于傳統的實現方式。編寫鎖無關算法是十分困難的,但因為數據結構是經常被重用的部分,首先把這個概念應用到數據結構中,可以輕易讓程序進入鎖無關的世界,體驗它所帶來的好處。
原文:http://www.ibm.com/developerworks/cn/java/j-lo-lockfree/
轉載于:https://www.cnblogs.com/davidwang456/p/4065700.html
總結
以上是生活随笔為你收集整理的Java 下实现锁无关数据结构--转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Understanding Extens
- 下一篇: 通过扩展RandomAccessFile