JUC队列-ArrayBlockingQueue(一)
ArrayBlockingQueue介紹
ArrayBlockingAQueue是用數組實現的線程安全的有界的阻塞隊列。
線程安全是指通過“互斥鎖”保護競爭資源,實現了對線程對競爭資源的互斥訪問,有界是指ArrayB咯KingQueue對應的數組是有界限的,阻塞隊列是指當多個線程訪問競爭資源時,當競爭資源已經被某個線程獲取時,其他要獲取該線程的線程需要等待。
注意:ArrayBlockingQueue不同于LinkedBlockingQueue,ArrayBlockingQueue是數組實現的,并且是有界限的;而LinkedBlockingQueue是鏈表實現的,是無界限的。
ArrayBlokingQueue的uml圖:
說明:
ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實現“多線程對競爭資源的互斥訪問”。而且,ReentrantLock分為公平鎖和非公平鎖,關于具體使用公平鎖還是非公平鎖,在創建ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue默認會使用非公平鎖。
ArrayBlockingQueue與Condition是組合關系,ArrayBlockingQueue中包含兩個Condition對象(notEmpty和notFull)。而且,Condition又依賴于ArrayBlockingQueue而存在,通過Condition可以實現對ArrayBlockingQueue的更精確的訪問。
ArrayBlockingQueue源碼分析
構造方法
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition(); }初始化數組,獨占鎖和兩個“條件”,非空條件和滿條件。
添加元素
public void put(E e) throws InterruptedException {checkNotNull(e);//獲取隊列的獨占鎖final ReentrantLock lock = this.lock;//獲取鎖,如果鎖處于中斷狀態,則拋出InterruptedException異常lock.lockInterruptibly();try {//如果隊列已滿,則一直等待while (count == items.length)notFull.await();//入隊enqueue(e);} finally {//釋放鎖lock.unlock();}}說明:put(E e)的作用是將e插入阻塞隊列的尾部。如果隊列已滿,則等待;否則,插入元素。
在了解入隊enqueue操作時,我們先了解下面幾個成員的含義:
// 隊列中的元素個數 int takeIndex; // 下一個被取出元素的索引 int putIndex; // 下一個被添加元素的索引 int count;enqueue()的源碼如下:
private void enqueue(E x) {final Object[] items = this.items;//添加到隊列中items[putIndex] = x;//設置下一個被取出元素的索引if (++putIndex == items.length)putIndex = 0;count++;//喚醒notEmpty上的等待線程notEmpty.signal();}取出元素
取出元素的過程其實跟添加元素的過程,這里就直接貼出代碼:
public E take() throws InterruptedException {// 獲取“隊列的獨占鎖”final ReentrantLock lock = this.lock;// 獲取“鎖”,若當前線程是中斷狀態,則拋出InterruptedException異常lock.lockInterruptibly();try {// 若“隊列為空”,則一直等待。while (count == 0)notEmpty.await();// 取出元素return extract();} finally {// 釋放“鎖”lock.unlock();} } private E extract() {final Object[] items = this.items;// 強制將元素轉換為“泛型E”E x = this.<E>cast(items[takeIndex]);// 將第takeIndex元素設為null,即刪除。同時,幫助GC回收。items[takeIndex] = null;// 設置“下一個被取出元素的索引”takeIndex = inc(takeIndex);// 將“隊列中元素數量”-1--count;// 喚醒notFull上的等待線程。notFull.signal();return x; }刪除元素
這里我拿remove(Object o) 舉例
public boolean remove(Object o) {if (o == null) return false;final Object[] items = this.items;final ReentrantLock lock = this.lock;lock.lock();try {if (count > 0) {final int putIndex = this.putIndex;int i = takeIndex;do {if (o.equals(items[i])) {removeAt(i);return true;}//索引到了末尾,重置0if (++i == items.length)i = 0;//直到 i = takeIndex增長到putIndex} while (i != putIndex);}return false;} finally {lock.unlock();}}說明:remove(Object)實際上就是對數組的進行遍歷對比,只不過這個隊列數組有點特殊,它是環狀的數組,也就是可重用的數組,如果找到該元素,調用removeAt()
void removeAt(final int removeIndex) {final Object[] items = this.items;//如果remove的索引時隊列中的第一個元素,可以直接出隊。if (removeIndex == takeIndex) {// removing front item; just advanceitems[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//如果remove的不是第一個元素,那么直接從reomve//的那個索引開始,后面的元素全部前移一位} else {// an "interior" remove// slide over all others up through putIndex.final int putIndex = this.putIndex;for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {items[i] = null;this.putIndex = i;break;}}count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}總結
以上是生活随笔為你收集整理的JUC队列-ArrayBlockingQueue(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JUC锁-Semaphore(八)
- 下一篇: JUC队列-LinkedBlocking