ArrayBlockingQueue源码
生活随笔
收集整理的這篇文章主要介紹了
ArrayBlockingQueue源码
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
目錄
介紹
數據結構
方法實現
offer,poll,peek
put,take
enqueue,dequeue
注意
參考
介紹
是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】。
鎖控制參考:ReentrantLock源碼
數據結構
/**隊列元素 */ final Object[] items;/** items index for next take, poll, peek or remove */ int takeIndex;/** items index for next put, offer, or add */ int putIndex;/** 以入隊數量 */ int count;/** 鎖控制 */ final ReentrantLock lock;/** Condition for waiting takes */ private final Condition notEmpty;/** Condition for waiting puts */ private final Condition notFull;/* iterator */transient Itrs itrs = null;方法實現
offer,poll,peek
public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(e);return true;} finally {lock.unlock();}}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}?
put,take
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}enqueue,dequeue
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;//在put index 放置元素items[putIndex] = x;//如果putIndex +1 == items.length,則設置為0,循環數組。if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;}void removeAt(final int removeIndex) {// assert lock.getHoldCount() == 1;// assert items[removeIndex] != null;// assert removeIndex >= 0 && removeIndex < items.length;final Object[] items = this.items;//是第一個元素。if (removeIndex == takeIndex) {// removing front item; just advanceitems[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();} else {// an "interior" remove// slide over all others up through putIndex.final int putIndex = this.putIndex;//i位置值設為null,后續節點全部前移,直到putIndex,并且把putIndex -1;for (int i = removeIndex;;) {int next = i + 1;if (next == items.length)next = 0;if (next != putIndex) {items[i] = items[next];i = next;} else {items[i] = null;this.putIndex = i;break;}}count--;if (itrs != null)itrs.removedAt(removeIndex);}notFull.signal();}注意
數組是個循環數組,當下標為item.length時則為0。
參考
- ReentrantLock源碼
- Java并發包--阻塞隊列(BlockingQueue)
總結
以上是生活随笔為你收集整理的ArrayBlockingQueue源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LinkedBlockingQueue源
- 下一篇: PriorityBlockingQueu