Java并发编程笔记之LinkedBlockingQueue源码探究
LinkedBlockingQueue的實現是使用獨占鎖實現的阻塞隊列。首先看一下LinkedBlockingQueue 的類圖結構,如下圖所示:
如類圖所示:LinkedBlockingQueue是使用單向鏈表實現,有兩個Node分別來存放首尾節點,并且里面有個初始值為0 的原子變量count,它用來記錄隊列元素個數。
另外里面有兩個ReentrantLock的實例,分別用來控制元素入隊和出隊的原子性,其中takeLock用來控制同時只有一個線程可以從隊列獲取元素,其他線程必須等待,
putLock控制同時只能有一個線程可以獲取鎖去添加元素,其他線程必須等待。另外notEmpty 和 notFull 是信號量,內部分別有一個條件隊列用來存放進隊和出隊的時候被阻塞的線程,
說白了,這其實就是一個生產者 - 消費者模型。
我們首先看一下獨占鎖的源碼,如下所示:
/** 執行take, poll等操作時候需要獲取該鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 當隊列為空時候執行出隊操作(比如take)的線程會被放入這個條件隊列進行等待 */
private final Condition notEmpty = takeLock.newCondition();
/** 執行put, offer等操作時候需要獲取該鎖*/
private final ReentrantLock putLock = new ReentrantLock();
/**當隊列滿時候執行進隊操作(比如put)的線程會被放入這個條件隊列進行等待 */
private final Condition notFull = putLock.newCondition();
/** 當前隊列元素個數 */
private final AtomicInteger count = new AtomicInteger(0);
接著我們要進入LinkedBlockingQueue 無參構造函數,源碼如下:
public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾節點,指向哨兵節點
last = head = new Node<E>(null);
}
從源碼中可以看到,默認隊列的容量為0x7fffffff; 用戶也可以自己指定容量,所以一定程度上 LinkedBlockingQueue 可以說是有界阻塞隊列。
接下來我們主要看LinkedBlockingQueue 的幾個主要方法的源碼,如下:
1.offer操作,向隊列尾部插入一個元素,如果隊列有空閑容量則插入成功后返回true,如果隊列已滿則丟棄當前元素然后返回false,如果 e元素為null,則拋出空指針異常(NullPointerException ),還有一點就是,該方法是非阻塞的。源碼如下:
public boolean offer(E e) {
//(1)空元素拋空指針異常
if (e == null) throw new NullPointerException();
//(2) 如果當前隊列滿了則丟棄將要放入的元素,然后返回false
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
//(3) 構造新節點,獲取putLock獨占鎖
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//(4)如果隊列不滿則進隊列,并遞增元素計數
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
//(5)
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//(6)釋放鎖
putLock.unlock();
}
//(7)
if (c == 0)
signalNotEmpty();
//(8)
return c >= 0;
}
private void enqueue(Node<E> node) {?
last = last.next = node;
}
代碼(2)判斷的是如果當前隊列已滿則丟棄當前元素并返回false。
代碼(3)獲取到putLock鎖,當前線程獲取到該鎖后,則其他調用put 和 offer 的線程將會被阻塞(阻塞的線程被放到 putLock 鎖的 AQS 阻塞隊列)。
代碼(4)這里又重新判斷了一下當前隊列是否滿了,這是因為在執行代碼(2)和獲取到putLock鎖期間,有可能其他線程通過put 或者 offer方法想隊列里面添加了新的元素。重新判斷隊列確實不滿則新元素入隊,并遞增計數器。
代碼(5)判斷的是如果新元素入隊后還有空閑空間,則喚醒notFull的條件隊列里面因為調用了notFull 的 await 操作(比如執行put方法而隊列滿了的時候)而被阻塞的一個線程,因為隊列現在有空閑,所以這里可以提前喚醒一個入隊線程。
代碼(6)則釋放獲取的putLock鎖,這里要注意鎖的釋放一定要在finally里面做,因為即使try塊拋出異常了,finally也是會被執行到的。另外釋放鎖后其他因為調用put和offer而被阻塞的線程將會有一個獲取到改鎖。
代碼(7)c == 0說明在執行代碼(6)釋放鎖的時候隊列里面至少有一個元素,隊列里面有元素則執行signalNotEmpty,signalNotEmpty的源碼如下:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
通過上面代碼可以看到其作用是激活notEmpty 的條件隊列中因為調用notEmpty的await方法(比如調用 take 方法并且隊列為空的時候)而被阻塞的一個線程,這里也說明了調用條件變量的方法前,要首先獲取對應的鎖。
offer的總結:offer方法中通過使用putLock鎖保證了在隊尾新增元素的原子性和隊列元素個數的比較和遞增操作的原子性。
2.put操作,向隊列尾部插入一個元素,如果隊列有空閑則插入后直接返回true,如果隊列已經滿則阻塞當前線程知道隊列有空閑插入成功后返回true,如果在阻塞的時候被其他線程設置了中斷標志,
則被阻塞線程會拋出InterruptedException 異常而返回,另外如果 e 元素為 null 則拋出 NullPointerException 異常。源碼如下:
public void put(E e) throws InterruptedException {
//(1)空元素拋空指針異常
if (e == null) throw new NullPointerException();
//(2) 構建新節點,并獲取獨占鎖putLock
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//(3)如果隊列滿則等待
while (count.get() == capacity) {
notFull.await();
}
//(4)進隊列并遞增計數
enqueue(node);
c = count.getAndIncrement();
//(5)
if (c + 1 < capacity)
notFull.signal();
} finally {
//(6)
putLock.unlock();
}
//(7)
if (c == 0)
signalNotEmpty();
}
代碼(2)中使用 putLock.lockInterruptibly() 獲取獨占鎖,相比 offer 方法中這個獲取獨占鎖方法意味著可以被中斷,具體說是當前線程在獲取鎖的過程中,如果被其它線程設置了中斷標志則當前線程會拋出 InterruptedException 異常,
所以put操作在獲取 鎖過程中是可被中斷的。
代碼(3)如果當前隊列已經滿,則notFull 的 await() 把當前線程放入 notFull 的條件隊列,當前線程被阻塞掛起并釋放獲取到的 putLock 鎖,由于putLock鎖被釋放了,所以現在其他線程就有機會獲取到putLock鎖了。
代碼(3)判斷隊列是否為空為何使用 while 循環而不是 if 語句呢?
這是因為考慮到當前線程被虛假喚醒的問題,也就是其它線程沒有調用 notFull 的 singal 方法時候,notFull.await() 在某種情況下會自動返回。
如果使用if語句簡單判斷一下,那么虛假喚醒后會執行代碼(4),元素入隊,并且遞增計數器,而這時候隊列已經是滿了的,導致隊列元素個數大于了隊列設置的容量,導致程序出錯。
而使用使用 while 循環假如 notFull.await() 被虛假喚醒了,那么循環在檢查一下當前隊列是否是滿的,如果是則再次進行等待。
3.poll操作,從隊列頭部獲取并移除一個元素,如果隊列為空則返回 null,該方法是不阻塞的。源碼如下:
public E poll() {
//(1)隊列為空則返回null
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
//(2)獲取獨占鎖
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//(3)隊列不空則出隊并遞減計數
if (count.get() > 0) {//3.1
x = dequeue();//3.2
c = count.getAndDecrement();//3.3
//(4)
if (c > 1)
notEmpty.signal();
}
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)返回
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
代碼(1) 如果當前隊列為空,則直接返回 null。
代碼(2)獲取獨占鎖 takeLock,當前線程獲取該鎖后,其它線程在調用 poll 或者 take 方法會被阻塞掛起。
代碼 (3) 如果當前隊列不為空則進行出隊操作,然后遞減計數器。
代碼(4)如果 c>1 則說明當前線程移除掉隊列里面的一個元素后隊列不為空(c 是刪除元素前隊列元素個數),那么這時候就可以激活因為調用 poll 或者 take 方法而被阻塞到notEmpty 的條件隊列里面的一個線程。
代碼(5)釋放鎖,一定要在finally里面釋放鎖。
代碼(6)說明當前線程移除隊頭元素前當前隊列是滿的,移除隊頭元素后隊列當前至少有一個空閑位置,那么這時候就可以調用signalNotFull激活因為調用put 或者 offer 而被阻塞放到 notFull 的條件隊列里的一個線程,signalNotFull 源碼如下:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
poll 代碼邏輯比較簡單,值得注意的是獲取元素時候只操作了隊列的頭節點。
4.peek 操作,獲取隊列頭部元素但是不從隊列里面移除,如果隊列為空則返回 null,該方法是不阻塞的。源碼如下:
public E peek() {
//(1)
if (count.get() == 0)
return null;
//(2)
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
//(3)
if (first == null)
return null;
else
//(4)
return first.item;
} finally {
//(5)
takeLock.unlock();
}
}
可以看到代碼(3)這里還是需要判斷下 first 是否為 null 的,不能直接執行代碼(4)。
正常情況下執行到代碼(2)說明隊列不為空,但是代碼(1)和(2)不是原子性操作,也就是在執行代碼(1)判斷隊列不為空后,
在代碼(2)獲取到鎖前,有可能其他線程執行了poll 或者 take 操作導致隊列變為了空,然后當前線程獲取鎖后,直接執行 first.item 會拋出空指針異常。
5.take 操作,獲取當前隊列頭部元素并從隊列里面移除,如果隊列為空則阻塞調用線程。如果隊列為空則阻塞當前線程知道隊列不為空,然后返回元素,如果在阻塞的時候被其他線程設置了中斷標志,則被阻塞線程會拋出InterruptedException 異常而返回。源碼如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
//(1)獲取鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//(2)當前隊列為空則阻塞掛起
while (count.get() == 0) {
notEmpty.await();
}
//(3)出隊并遞減計數
x = dequeue();
c = count.getAndDecrement();
//(4)
if (c > 1)
notEmpty.signal();
} finally {
//(5)
takeLock.unlock();
}
//(6)
if (c == capacity)
signalNotFull();
//(7)
return x;
}
代碼(1)當前線程獲取到獨占鎖,其他調用take 或者 poll的線程將會被阻塞掛起。
代碼(2)如果隊列為空則阻塞掛起當前線程,并把當前線程放入 notEmpty 的條件隊列。
代碼(3)進行出隊操作并遞減計數。
代碼(4)如果 c > 1 說明當前隊列不為空,則喚醒notEmpty 的條件隊列的條件隊列里面的一個因為調用 take 或者 poll 而被阻塞的線程。
代碼(5)釋放鎖。
代碼(6)如果 c == capacity 則說明當前隊列至少有一個空閑位置,則激活條件變量 notFull 的條件隊列里面的一個因為調用 put 或者 offer 而被阻塞的線程。
6.remove操作,刪除隊列里面指定元素,有則刪除返回 true,沒有則返回 false,源碼如下:
public boolean remove(Object o) {
if (o == null) return false;
//(1)雙重加鎖
fullyLock();
try {
//(2)遍歷隊列找則刪除返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//(3)
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
//(4)找不到返回false
return false;
} finally {
//(5)解鎖
fullyUnlock();
}
}
代碼(1)通過fullyLock獲取雙重鎖,當前線程獲取后,其他線程進行入隊或者出隊的操作就會被阻塞掛起。雙重鎖方法fullyLock的源碼如下:
void fullyLock() {
putLock.lock();
takeLock.lock();
}
代碼(2)遍歷隊列尋找要刪除的元素,找不到則直接返回false,找到則執行unlink操作,unlink的源碼如下:
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
如果當前隊列滿,刪除后,也不忘記喚醒等待的線程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
可以看到刪除元素后,如果發現當前隊列有空閑空間,則喚醒 notFull 的條件隊列中一個因為調 用 put 或者 offer 方法而被阻塞的線程。
代碼(5)調用 fullyUnlock 方法使用與加鎖順序相反的順序釋放雙重鎖,源碼如下:
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
7.size操作,獲取當前隊列元素個數。源碼如下:
public int size() {
return count.get();
}
總結:由于在操作出隊入隊的時候操作Count的時候加了鎖,因此相比ConcurrentLinkedQueue 的size方法比較準確。
最后用一張圖來加深LinkedBlockingQueue的理解,如下圖:
因此我們要思考一個問題:為何 ConcurrentLinkedQueue 中需要遍歷鏈表來獲取 size 而不適用一個原子變量呢?
這是因為使用原子變量保存隊列元素個數需要保證入隊出隊操作和操作原子變量是原子操作,而ConcurrentLinkedQueue 是使用 CAS 無鎖算法的,所以無法做到這個。
原文鏈接總結
以上是生活随笔為你收集整理的Java并发编程笔记之LinkedBlockingQueue源码探究的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 无锁数据结构三:无锁数据结构的两大问题
- 下一篇: JAVA并发编程: CAS和AQS