LinkedBlockingQueue源码
目錄
介紹
數據結構
鏈表節點
屬性
方法實現
offer
poll
peek
put
take
enqueue,dequeue
signal
fullLock
注意
參考
介紹
java.util.concurrent.LinkedBlockingQueue 是一個基于單向鏈表的、范圍任意的(其實是有界的)、FIFO 阻塞隊列。訪問與移除操作是在隊頭進行,添加操作是在隊尾進行,并分別使用不同的鎖進行保護,只有在可能涉及多個節點的操作才同時對兩個鎖進行加鎖。
由于同時使用了兩把鎖,在需要同時使用兩把鎖時,加鎖順序與釋放順序是非常重要的:必須以固定的順序進行加鎖,再以與加鎖順序的相反的順序釋放鎖。鎖使用了ReentrantLock,參考:ReentrantLock源碼 。
頭結點和尾結點一開始總是指向一個哨兵的結點,它不持有實際數據,當隊列中有數據時,頭結點仍然指向這個哨兵,尾結點指向有效數據的最后一個結點。這樣做的好處在于,與計數器 count 結合后,對隊頭、隊尾的訪問可以獨立進行,而不需要判斷頭結點與尾結點的關系。
數據結構
鏈表節點
static class Node<E> {E item;/*** 后繼指針。值為下列之一:* 實際的后繼結點。* 自身,表示后繼是 head.next (用于在遍歷處理時判斷)* null,表示沒有后繼(這是尾結點)*/Node<E> next;Node(E x) { item = x; }}屬性
// 最大容量上限,默認是 Integer.MAX_VALUE private final int capacity;// 當前元素數量,這是個原子類。因為讀寫分別使用不同的鎖,但都會訪問這個屬性,所以它需要是線程安全的。 private final AtomicInteger count = new AtomicInteger(0);// 頭結點 private transient Node<E> head;// 尾結點 private transient Node<E> last;// 隊頭訪問鎖 private final ReentrantLock takeLock = new ReentrantLock();// 隊頭訪問等待條件、隊列(等待隊列非空) private final Condition notEmpty = takeLock.newCondition();// 隊尾訪問鎖 private final ReentrantLock putLock = new ReentrantLock();// 隊尾訪問等待條件、隊列(等待隊列未滿) private final Condition notFull = putLock.newCondition();方法實現
offer
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;//如果隊列滿,則直接退出if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {//節點入隊列enqueue(node);c = count.getAndIncrement();//隊列未滿,則喚醒其他put線程if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}/*當c=0時,即意味著之前的隊列是空隊列,出隊列的線程都處于等待狀態,現在新添加了一個新的元素,即隊列不再為空,因此它會喚醒正在等待獲取元素的線程。*/if (c == 0)signalNotEmpty();return c >= 0;}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//調用可中斷方法putLock.lockInterruptibly();try {//隊列滿while (count.get() == capacity) {//超時,則返回false。if (nanos <= 0)return false;//否則繼續等待notFullnanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//調用可中斷方法takeLock.lockInterruptibly();try {//隊列空,則等待while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}//出隊x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {//隊列空,則等待if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}peek
public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {//獲取第一個元素Node<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}put
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 在所有的 put/take/etc 等操作中預設值本地變量 c 為負數表示失敗。成功會設置為 >= 0 的值。int c = -1;Node<E> node = new Node(e);// 下面兩行是訪問優化final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** 注意,count用于等待監視,即使它沒有用鎖保護。這個可行是因為* count 只能在此刻(持有putLock)減小(其他put線程都被鎖拒之門外),* 當count對capacity發生變化時,當前線程(或其他put等待線程)將被通知。* 在其他等待監視的使用中也類似。*/while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();// 還有可添加空間則喚醒put等待線程。if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}/*當c=0時,即意味著之前的隊列是空隊列,出隊列的線程都處于等待狀態,現在新添加了一個新的元素,即隊列不再為空,因此它會喚醒正在等待獲取元素的線程。*/if (c == 0)signalNotEmpty(); }take
public E take() throws InterruptedException {E x;int c = -1;// 下面兩行是訪問優化final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 循環里等待直到有數據可獲取while (count.get() == 0) {notEmpty.await();}// 獲取第一個有效元素x = dequeue();// 如果還有可獲取元素,喚醒等待獲取的線程。c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 注意,c 是調用 getAndDecrement 返回的,如果 if 成立,// 表明前面的 count == capacity ,有put線程在等待,可以添加新元素,所以喚醒 添加線程。if (c == capacity)signalNotFull();return x; }enqueue,dequeue
public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;//head,last都指向哨兵節點last = head = new Node<E>(null); }// 在持有 putLock 鎖下執行 private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;//把node作為最后一個節點,更新last.next引用last = last.next = node; }// 在持有 takeLock 鎖下執行 private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null; // 出隊列后的結點作為新的哨兵結點return x; }signal
private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {notEmpty.signal();} finally {takeLock.unlock();}}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}fullLock
有些方法必須同時獲取take與put鎖
在需要同時使用兩把鎖時,加鎖順序與釋放順序是非常重要的:必須以固定的順序進行加鎖,再以與加鎖順序的相反的順序釋放鎖。
void fullyLock() {putLock.lock();takeLock.lock();}void fullyUnlock() {takeLock.unlock();putLock.unlock();}public boolean remove(Object o) {if (o == null) return false;fullyLock();try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}}void unlink(Node<E> p, Node<E> trail) {// assert isFullyLocked();// p.next is not changed, to allow iterators that are// traversing p to maintain their weak-consistency guarantee.p.item = null;trail.next = p.next;if (last == p)last = trail;if (count.getAndDecrement() == capacity)notFull.signal();}注意
- 為什么每次put()后總要喚醒其他put線程或者take()后要喚醒其他take線程
因為signalNotEmpty(),signalNotFull()方法僅喚醒一個線程,如果有多個等待線程,就會導致剩余線程一直掛起。
- 為什么signalNotEmpty(),signalNotFull()不喚醒多個線程
假設此時僅有1個空間,喚醒N個put線程后,僅有一個take線程會執行put操作,其他N-1個線程剛被喚醒,又要掛起,影響性能
?
參考
- ReentrantLock源碼
- Java并發包--阻塞隊列(BlockingQueue)
?
?
?
總結
以上是生活随笔為你收集整理的LinkedBlockingQueue源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java并发包--阻塞队列(Blocki
- 下一篇: ArrayBlockingQueue源码