Java8 ArrayBlockingQueue 源码阅读
一、什么是 ArrayBlockingQueue
ArrayBlockingQueue 是 GUC(java.util.concurrent) 包下的一個線程安全的阻塞隊列,底層使用數(shù)組實現(xiàn)。
除了線程安全這個特性,ArrayBlockingQueue 還有其他的特點:
- 當(dāng)隊列已滿時,會阻塞后面添加元素 [put(E e)] 的線程,直到調(diào)用了移除元素的方法隊列不滿的情況下會喚醒前面添加元素的線程
- 當(dāng)隊列已空時,會阻塞后面移除元素 [take()] 的線程,直到調(diào)用了添加元素的方法隊列不為空的情況下會喚醒前面移除元素的線程
- 新添加的元素并不一定在數(shù)組的 0 下標(biāo)位置,因為其內(nèi)部維護了一個 putIndex 屬性
- 數(shù)組大小確定,通過構(gòu)造函數(shù)初始化阻塞隊列大小,沒有擴容機制,因為線程阻塞,不存在數(shù)組下標(biāo)越界異常
- 元素都是緊湊的,比如阻塞隊列中有兩個元素,那這兩個元素在數(shù)組中下標(biāo)之差一定是 1
- 插入的元素不允許為 null,所有的隊列都有這個特點
- 先進先出(FIFO (first-in-first-out))
二、相關(guān)結(jié)構(gòu)介紹
2.1 內(nèi)部屬性
了解了 ArrayBlockingQueue 內(nèi)部的屬性,可以幫助我們更好的理解阻塞隊列。
// 底層存儲元素的數(shù)組final Object[] items;// 出隊序號,如果有一個元素出隊,那么后面的元素不會向前移動,// 而是將 takeIndex + 1 表示后面要出隊的元素的角標(biāo)int takeIndex;// 入隊序號,表示后續(xù)插入的元素的角標(biāo),putIndex 不一定大于 takeIndexint putIndex;// 元素個數(shù)int count;// 內(nèi)部鎖final ReentrantLock lock;// notEmpty 條件private final Condition notEmpty;// notFull 條件private final Condition notFull;2.2 構(gòu)造函數(shù)
ArrayBlockingQueue 中共有 3 個構(gòu)造函數(shù),這里只看其中一個,拿出來簡單地分析一下。
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();// 初始化底層數(shù)組this.items = new Object[capacity];// 默認為非公平鎖lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}2.3 相關(guān)方法特性
| offer(E e) | 在隊列尾部插入元素,如果隊列已滿,則返回 false |
| put(E e) | 在隊列尾部插入元素,如果隊列已滿,則線程阻塞等待空間可用 |
| add(E e) | 底層調(diào)用了 offer(E e) 方法 |
| poll() | 出隊,如果隊列中沒有元素則返回 null |
| take() | 出隊,如果隊列中沒有元素,則線程阻塞,等待新元素插入 |
| peek() | 出隊,如果隊列中沒有元素則返回 null |
三、源碼分析
上面對 ArrayBlockingQueue 中主要的方法作了介紹,并指出了對應(yīng)特點,下面就簡單的來分析一下 put 與 take 的方法。
3.1 put 方法
public void put(E e) throws InterruptedException {// 插入的元素不允許為 nullcheckNotNull(e);// 獲取鎖final ReentrantLock lock = this.lock;/*** lock:調(diào)用后一直阻塞到獲得鎖* tryLock:嘗試是否能獲得鎖 如果不能獲得立即返回* lockInterruptibly:調(diào)用后一直阻塞到獲得鎖 但是接受中斷信號(比如:Thread、sleep)*/lock.lockInterruptibly();try {// 如果隊列數(shù)組已滿,則 notFull 阻塞,當(dāng)有元素被移除后才喚醒 notFullwhile (count == items.length)notFull.await();// 元素入隊enqueue(e);} finally {// 添加完元素后釋放鎖lock.unlock();}}上面的方法中調(diào)用了 enqueue 方法,這個 enqueue 用于在隊尾插入元素,下面是具體實現(xiàn)細節(jié)。
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;// 添加元素items[putIndex] = x; // 如果插入元素的位置是數(shù)組尾部則重置 putIndex 為 0if (++putIndex == items.length)putIndex = 0;count++;// 隊列中一定有元素,因此喚醒 notEmptynotEmpty.signal();}代碼實現(xiàn)還是比較簡單的,先加鎖,如果隊列沒有滿的情況下直接在 putIndex 的位置插入新元素,如果隊列已滿則阻塞當(dāng)前獲得鎖的添加元素的線程,直到有元素從隊列中被移除了,會喚醒 notFull,添加元素的線程才會被喚醒繼續(xù)執(zhí)行。
3.2 take 方法
take 方法與 put 方法類似。
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 如果隊列中沒有元素,則讓 notEmpty 阻塞,添加元素后會喚醒 notEmptywhile (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}出隊 dequeue 源碼。
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];// 元素置 nullitems[takeIndex] = null;// 如果出隊的是數(shù)組中的最后一個元素,則重置 takeIndex 為 0if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 喚醒 notFullnotFull.signal();出隊與入隊的原理都是類似的,同樣是先加鎖,如果隊列中沒有任何元素,則獲得鎖的出隊的線程阻塞 notEmpty.await(),直到有元素被添加到隊列中,會喚醒 notEmpty,移除元素的線程才會被喚醒繼續(xù)執(zhí)行,如果隊列中有元素,則直接把 takeIndex 位置上的元素出隊。
3.3 other
上面只簡單的分析了 4 個方法,但是上面 4 個方法足以讓我們了解 ArrayBlockingQueue 的實現(xiàn)原理。其中比較復(fù)雜的方法可能就是 remove 方法了,因為移除的元素可能在任意一個位置,為了使元素緊湊,會將后面的元素向前移動一個位置,然后重置 putIndex,大概的流程就是這樣,想看詳細的源碼可以點擊后面的鏈接進行進一步的了解。
jdk1.8 源碼閱讀:https://github.com/zchen96/jdk1.8-source-code-read
總結(jié)
以上是生活随笔為你收集整理的Java8 ArrayBlockingQueue 源码阅读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java8 LinkedHashMap
- 下一篇: Java8 Hashtable 源码阅读