Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析
文章目錄
- 概述
- ConcurrentLinkedQueue
- 核心方法&源碼解讀
- offer
- add
- poll
- peek
- size
- remove
- contains
- 總結(jié)
概述
JDK中提供了一系列場景的并發(fā)安全隊列。總的來說,按照實現(xiàn)方式的不同可分為阻塞隊列和非阻塞隊列,
- 阻塞隊列使用鎖實現(xiàn)
- 非阻塞隊列則使用CAS非阻塞算法實現(xiàn)
ConcurrentLinkedQueue
ConcurrentLinkedQueue是線程安全的無界非阻塞隊列,其底層數(shù)據(jù)結(jié)構(gòu)使用單向鏈表實現(xiàn),對于入隊和出隊操作使用CAS來實現(xiàn)線程安全。
【類圖】
ConcurrentLinkedQueue內(nèi)部的隊列使用單向鏈表方式實現(xiàn),
其中有兩個volatile類型的Node節(jié)點分別用來存放隊列的首、尾節(jié)點。
從下面的無參構(gòu)造函數(shù)可知,默認(rèn)頭、尾節(jié)點都是指向item為null的哨兵節(jié)點。 新元素會被插入隊列末尾,出隊時從隊列頭部獲取一個元素。
在Node節(jié)點內(nèi)部則維護(hù)一個使用volatile修飾的變量item,用來存放節(jié)點的值;next用來存放鏈表的下一個節(jié)點,從而鏈接為一個單向無界鏈表。其內(nèi)部則使用UNSafe工具類提供的CAS算法來保證出入隊時操作鏈表的原子性。
核心方法&源碼解讀
下面我們介紹ConcurrentLinkedQueue的幾個主要方法的實現(xiàn)原理。
offer
在鏈表末尾添加一個元素
/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never return {@code false}.** @return {@code true} (as specified by {@link Queue#offer})* @throws NullPointerException if the specified element is null*/ public boolean offer(E e) {//1 e為null則拋出空指針異常checkNotNull(e);//2 構(gòu)造Node節(jié)點構(gòu)造函數(shù)內(nèi)部調(diào)用unsafe.putObject,后面統(tǒng)一講final Node<E> newNode = new Node<E>(e);//3 從尾節(jié)點插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// 4 如果q=null說明p是尾節(jié)點則插入if (q == null) {// 5 使用cas設(shè)置p節(jié)點的next節(jié)點 if (p.casNext(null, newNode)) {// 6 cas成功說明新增節(jié)點已經(jīng)被放入鏈表,然后設(shè)置當(dāng)前尾節(jié)點(包含head,1,3,5.。。個節(jié)點為尾節(jié)點)if (p != t) // hop two nodes at a timecasTail(t, newNode); // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// 7 //多線程操作時候,由于poll時候會把老的head變?yōu)樽砸?#xff0c;然后head的next變?yōu)樾耯ead,所以這里需要//重新找新的head,因為新的head后面的節(jié)點才是激活的節(jié)點p = (t != (t = tail)) ? t : head;else// 8 尋找尾節(jié)點 p = (p != t && t != (t = tail)) ? t : q;} }- 首先看當(dāng)一個線程調(diào)用offer(item)時的情況。首先代碼(1)對傳參進(jìn)行空檢查,如果為null則拋出NPE異常,否則執(zhí)行代碼(2)并使用item作為構(gòu)造函數(shù)參數(shù)創(chuàng)建一個新的節(jié)點,然后代碼(3)從隊列尾部節(jié)點開始循環(huán),打算從隊列尾部添加元素,當(dāng)執(zhí)行到代碼(4)時隊列狀態(tài)如下所示。
這時候節(jié)點p、t、head、tail同時指向了item為null的哨兵節(jié)點,由于哨兵節(jié)點的next節(jié)點為null,所以這里q也指向null。
- q==null則執(zhí)行代碼(5),通過CAS原子操作判斷p節(jié)點的next節(jié)點是否為null,如果為null則使用節(jié)點newNode替換p的next節(jié)點,然后執(zhí)行代碼(6),這里由于p==t所以沒有設(shè)置尾部節(jié)點,然后退出offer方法,這時候隊列的狀態(tài)如下圖所示
(2)上面是一個線程調(diào)用offer方法的情況,如果多個線程同時調(diào)用,就會存在多個線程同時執(zhí)行到代碼(5)的情況。假設(shè)線程A調(diào)用offer(item1),線程B調(diào)用offer(item2),同時執(zhí)行到代碼(5)p.casNext(null, newNode)。
由于CAS的比較設(shè)置操作是原子性的,所以這里假設(shè)線程A先執(zhí)行了比較設(shè)置操作,發(fā)現(xiàn)當(dāng)前p的next節(jié)點確實是null,則會原子性地更新next節(jié)點為 item1,這時候線程B也會判斷p的next節(jié)點是否為null,結(jié)果發(fā)現(xiàn)不是null(因為線程A已經(jīng)設(shè)置了p的next節(jié)點為 item1),則會跳到代碼(3),然后執(zhí)行到代碼(4),這時候的隊列分布如下圖所示。
根據(jù)上面的狀態(tài)圖可知線程B接下來會執(zhí)行代碼(8),然后把q賦給了p,這時候隊列狀態(tài)如下圖所示。
然后線程B再次跳轉(zhuǎn)到代碼(3)執(zhí)行,當(dāng)執(zhí)行到代碼(4)時隊列狀態(tài)如下圖所示
由于這時候q==null,所以線程B會執(zhí)行代碼(5),通過CAS操作判斷當(dāng)前p的next節(jié)點是否是null,不是則再次循環(huán)嘗試,是則使用item2替換。假設(shè)CAS成功了,那么執(zhí)行代碼(6),由于p!=t,所以設(shè)置tail節(jié)點為item2,然后退出offer方法。這時候隊列分布如下圖所示。
分析到現(xiàn)在,就差代碼(7)還沒走過,其實這一步要在執(zhí)行poll操作后才會執(zhí)行。這里先來看一下執(zhí)行poll操作后可能會存在的一種情況,如下圖所示。
下面分析當(dāng)隊列處于這種狀態(tài)時調(diào)用offer添加元素,執(zhí)行到代碼(4)時的狀態(tài)圖,如下
這里由于q節(jié)點不為空并且pq所以執(zhí)行代碼(7),由于ttail所以p被賦值為head,然后重新循環(huán),循環(huán)后執(zhí)行到代碼(4),這時候隊列狀態(tài)如下圖所示。
這時候由于q==null,所以執(zhí)行代碼(5)進(jìn)行CAS操作,如果當(dāng)前沒有其他線程執(zhí)行offer操作,則CAS操作會成功,p的next節(jié)點被設(shè)置為新增節(jié)點。然后執(zhí)行代碼(6),由于p!=t所以設(shè)置新節(jié)點為隊列的尾部節(jié)點,現(xiàn)在隊列狀態(tài)如圖
需要注意的是,這里自引用的節(jié)點會被垃圾回收掉。
可見,offer操作中的關(guān)鍵步驟是代碼(5),通過原子CAS操作來控制某時只有一個線程可以追加元素到隊列末尾。進(jìn)行CAS競爭失敗的線程會通過循環(huán)一次次嘗試進(jìn)行CAS操作,直到CAS成功才會返回,也就是通過使用無限循環(huán)不斷進(jìn)行CAS嘗試方式來替代阻塞算法掛起調(diào)用線程。相比阻塞算法,這是使用CPU資源換取阻塞所帶來的開銷。
add
add操作是在鏈表末尾添加一個元素,其實在內(nèi)部調(diào)用的還是offer操作’\
/*** Inserts the specified element at the tail of this queue.* As the queue is unbounded, this method will never throw* {@link IllegalStateException} or return {@code false}.** @return {@code true} (as specified by {@link Collection#add})* @throws NullPointerException if the specified element is null*/public boolean add(E e) {return offer(e);}poll
poll操作是在隊列頭部獲取并移除一個元素,如果隊列為空則返回null。
public E poll() {// 1. goto標(biāo)記restartFromHead:// 2 無限循環(huán)for (;;) {for (Node<E> h = head, p = h, q;;) {// 3 保存當(dāng)前節(jié)點E item = p.item;// 4 當(dāng)前item有值,則CAS變?yōu)閚ullif (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.// 5 cas成功則標(biāo)記當(dāng)前節(jié)點并從鏈表中移除if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}// 6 當(dāng)前隊列為空則返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}// 7 如果當(dāng)前節(jié)點被自己引用,則重新查找新的隊列頭節(jié)點else if (p == q)continue restartFromHead;else // 8 p = q;}}}poll方法在移除一個元素時,只是簡單地使用CAS操作把當(dāng)前節(jié)點的item值設(shè)置為null,然后通過重新設(shè)置頭節(jié)點將該元素從隊列里面移除,被移除的節(jié)點就成了孤立節(jié)點,這個節(jié)點會在垃圾回收時被回收掉。另外,如果在執(zhí)行分支中發(fā)現(xiàn)頭節(jié)點被修改了,要跳到外層循環(huán)重新獲取新的頭節(jié)點。
peek
peek操作是獲取隊列頭部一個元素(只獲取不移除),如果隊列為空則返回null
public E peek() {// 1 restartFromHead:for (;;) {// 2 for (Node<E> h = head, p = h, q;;) {E item = p.item;// 3 if (item != null || (q = p.next) == null) {updateHead(h, p);return item;}// 4 else if (p == q)continue restartFromHead;else// 5 p = q;}}}Peek操作的代碼結(jié)構(gòu)與poll操作類似,不同之處在于代碼(3)中少了castItem操作。
其實這很正常,因為peek只是獲取隊列頭元素值,并不清空其值。根據(jù)前面的介紹我們知道第一次執(zhí)行offer后head指向的是哨兵節(jié)點(也就是item為null的節(jié)點),那么第一次執(zhí)行peek時在代碼(3)中會發(fā)現(xiàn)item==null,然后執(zhí)行q=p.next,這時候q節(jié)點指向的才是隊列里面第一個真正的元素,或者如果隊列為null則q指向null。
總結(jié):peek操作的代碼與poll操作類似,只是前者只獲取隊列頭元素但是并不從隊列里將它刪除,而后者獲取后需要從隊列里面將它刪除。
另外,在第一次調(diào)用peek操作時,會刪除哨兵節(jié)點,并讓隊列的head節(jié)點指向隊列里面第一個元素或者null。
size
計算當(dāng)前隊列元素個數(shù),在并發(fā)環(huán)境下不是很有用,因為CAS沒有加鎖,所以從調(diào)用size函數(shù)到返回結(jié)果期間有可能增刪元素,導(dǎo)致統(tǒng)計的元素個數(shù)不精確
/*** Returns the number of elements in this queue. If this queue* contains more than {@code Integer.MAX_VALUE} elements, returns* {@code Integer.MAX_VALUE}.** <p>Beware that, unlike in most collections, this method is* <em>NOT</em> a constant-time operation. Because of the* asynchronous nature of these queues, determining the current* number of elements requires an O(n) traversal.* Additionally, if elements are added or removed during execution* of this method, the returned result may be inaccurate. Thus,* this method is typically not very useful in concurrent* applications.** @return the number of elements in this queue*/public int size() {int count = 0;for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE)break;return count;} // 獲取第一個元素,哨兵元素不算,沒有則為nullNode<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;elsep = q;}}}remove
如果隊列里面存在該元素則刪除該元素,如果存在多個則刪除第一個,并返回true,否則返回false。
/*** Removes a single instance of the specified element from this queue,* if it is present. More formally, removes an element {@code e} such* that {@code o.equals(e)}, if this queue contains one or more such* elements.* Returns {@code true} if this queue contained the specified element* (or equivalently, if this queue changed as a result of the call).** @param o element to be removed from this queue, if present* @return {@code true} if this queue changed as a result of the call*/public boolean remove(Object o) {//查找元素為空,直接返回falseif (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//相等則使用cas值null,同時一個線程成功,失敗的線程循環(huán)查找隊列中其他元素是否有匹配的。if (item != null &&o.equals(item) &&p.casItem(item, null)) {//獲取next元素Node<E> next = succ(p);//如果有前驅(qū)節(jié)點,并且next不為空則鏈接前驅(qū)節(jié)點到next,if (pred != null && next != null)pred.casNext(p, next);return true;}pred = p;}return false; }contains
判斷隊列里面是否含有指定對象,由于是遍歷整個隊列,所以像size 操作一樣結(jié)果也不是那么精確,有可能調(diào)用該方法時元素還在隊列里面,但是遍歷過程中其他線程才把該元素刪除了,那么就會返回false。
/*** Returns {@code true} if this queue contains the specified element.* More formally, returns {@code true} if and only if this queue contains* at least one element {@code e} such that {@code o.equals(e)}.** @param o object to be checked for containment in this queue* @return {@code true} if this queue contains the specified element*/public boolean contains(Object o) {if (o == null) return false;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;if (item != null && o.equals(item))return true;}return false;}總結(jié)
ConcurrentLinkedQueue的底層使用單向鏈表數(shù)據(jù)結(jié)構(gòu)來保存隊列元素,每個元素被包裝成一個Node節(jié)點。隊列是靠頭、尾節(jié)點來維護(hù)的,創(chuàng)建隊列時頭、尾節(jié)點指向一個item為null的哨兵節(jié)點。
第一次執(zhí)行peek或者first操作時會把head指向第一個真正的隊列元素。由于使用非阻塞CAS算法,沒有加鎖,所以在計算size時有可能進(jìn)行了offer、poll或者remove操作,導(dǎo)致計算的元素個數(shù)不精確,所以在并發(fā)情況下size函數(shù)不是很有用。
如下圖所示,入隊、出隊都是操作使用volatile修飾的tail、head節(jié)點,要保證在多線程下出入隊線程安全,只需要保證這兩個Node操作的可見性和原子性即可。由于volatile本身可以保證可見性,所以只需要保證對兩個變量操作的原子性即可。
offer操作是在tail后面添加元素,也就是調(diào)用tail.casNext方法,而這個方法使用的是CAS操作,只有一個線程會成功,然后失敗的線程會循環(huán),重新獲取tail,再執(zhí)行casNext方法。poll操作也通過類似CAS的算法保證出隊時移除節(jié)點操作的原子性
總結(jié)
以上是生活随笔為你收集整理的Java Review - 并发编程_ConcurrentLinkedQueue原理源码剖析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_抽
- 下一篇: Java Review - 并发编程_S