Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析
轉載自??Java阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue實現原理分析
?
Java中的阻塞隊列接口BlockingQueue繼承自Queue接口。
BlockingQueue接口提供了3個添加元素方法。
3個刪除方法。
常用的阻塞隊列具體類有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、LinkedBlockingDeque等。
本文以ArrayBlockingQueue和LinkedBlockingQueue為例,分析它們的實現原理。
?
ArrayBlockingQueue
ArrayBlockingQueue的原理就是使用一個可重入鎖和這個鎖生成的兩個條件對象進行并發控制(classic two-condition algorithm)。
ArrayBlockingQueue是一個帶有長度的阻塞隊列,初始化的時候必須要指定隊列長度,且指定長度之后不允許進行修改。
它帶有的屬性如下:?
// 存儲隊列元素的數組,是個循環數組 final Object[] items;// 拿數據的索引,用于take,poll,peek,remove方法 int takeIndex;// 放數據的索引,用于put,offer,add方法 int putIndex;// 元素個數 int count;// 可重入鎖 final ReentrantLock lock; // notEmpty條件對象,由lock創建 private final Condition notEmpty; // notFull條件對象,由lock創建 private final Condition notFull;數據的添加
ArrayBlockingQueue有不同的幾個數據添加方法,add、offer、put方法。
add方法:
public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full"); }add方法內部調用offer方法如下:
public boolean offer(E e) {checkNotNull(e); // 不允許元素為空final ReentrantLock lock = this.lock;lock.lock(); // 加鎖,保證調用offer方法的時候只有1個線程try {if (count == items.length) // 如果隊列已滿return false; // 直接返回false,添加失敗else {insert(e); // 數組沒滿的話調用insert方法return true; // 返回true,添加成功}} finally {lock.unlock(); // 釋放鎖,讓其他線程可以調用offer方法} }insert方法如下:
private void insert(E x) {items[putIndex] = x; // 元素添加到數組里putIndex = inc(putIndex); // 放數據索引+1,當索引滿了變成0++count; // 元素個數+1notEmpty.signal(); // 使用條件對象notEmpty通知,比如使用take方法的時候隊列里沒有數據,被阻塞。這個時候隊列insert了一條數據,需要調用signal進行通知 }put方法:
public void put(E e) throws InterruptedException {checkNotNull(e); // 不允許元素為空final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 加鎖,保證調用put方法的時候只有1個線程try {while (count == items.length) // 如果隊列滿了,阻塞當前線程,并加入到條件對象notFull的等待隊列里notFull.await(); // 線程阻塞并被掛起,同時釋放鎖insert(e); // 調用insert方法} finally {lock.unlock(); // 釋放鎖,讓其他線程可以調用put方法} }ArrayBlockingQueue的添加數據方法有add,put,offer這3個方法,總結如下:
add方法內部調用offer方法,如果隊列滿了,拋出IllegalStateException異常,否則返回true
offer方法如果隊列滿了,返回false,否則返回true
add方法和offer方法不會阻塞線程,put方法如果隊列滿了會阻塞線程,直到有線程消費了隊列里的數據才有可能被喚醒。
這3個方法內部都會使用可重入鎖保證原子性。
數據的刪除
ArrayBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。
poll方法:
public E poll() {final ReentrantLock lock = this.lock;lock.lock(); // 加鎖,保證調用poll方法的時候只有1個線程try {return (count == 0) ? null : extract(); // 如果隊列里沒元素了,返回null,否則調用extract方法} finally {lock.unlock(); // 釋放鎖,讓其他線程可以調用poll方法} }poll方法內部調用extract方法:
private E extract() {final Object[] items = this.items;E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素items[takeIndex] = null; // 對應取索引上的數據清空takeIndex = inc(takeIndex); // 取數據索引+1,當索引滿了變成0--count; // 元素個數-1notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知return x; // 返回元素 }take方法:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 加鎖,保證調用take方法的時候只有1個線程try {while (count == 0) // 如果隊列空,阻塞當前線程,并加入到條件對象notEmpty的等待隊列里notEmpty.await(); // 線程阻塞并被掛起,同時釋放鎖return extract(); // 調用extract方法} finally {lock.unlock(); // 釋放鎖,讓其他線程可以調用take方法} }remove方法:
public boolean remove(Object o) {if (o == null) return false;final Object[] items = this.items;final ReentrantLock lock = this.lock;lock.lock(); // 加鎖,保證調用remove方法的時候只有1個線程try {for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍歷元素if (o.equals(items[i])) { // 兩個對象相等的話removeAt(i); // 調用removeAt方法return true; // 刪除成功,返回true}}return false; // 刪除成功,返回false} finally {lock.unlock(); // 釋放鎖,讓其他線程可以調用remove方法} }removeAt方法:
void removeAt(int i) {final Object[] items = this.items;if (i == takeIndex) { // 如果要刪除數據的索引是取索引位置,直接刪除取索引位置上的數據,然后取索引+1即可items[takeIndex] = null;takeIndex = inc(takeIndex);} else { // 如果要刪除數據的索引不是取索引位置,移動元素元素,更新取索引和放索引的值for (;;) {int nexti = inc(i);if (nexti != putIndex) {items[i] = items[nexti];i = nexti;} else {items[i] = null;putIndex = i;break;}}}--count; // 元素個數-1notFull.signal(); // 使用條件對象notFull通知,比如使用put方法放數據的時候隊列已滿,被阻塞。這個時候消費了一條數據,隊列沒滿了,就需要調用signal進行通知 }ArrayBlockingQueue的刪除數據方法有poll,take,remove這3個方法,總結如下:
poll方法對于隊列為空的情況,返回null,否則返回隊列頭部元素。
remove方法取的元素是基于對象的下標值,刪除成功返回true,否則返回false。
poll方法和remove方法不會阻塞線程。
take方法對于隊列為空的情況,會阻塞并掛起當前線程,直到有數據加入到隊列中。
這3個方法內部都會調用notFull.signal方法通知正在等待隊列滿情況下的阻塞線程。
LinkedBlockingQueue
LinkedBlockingQueue是一個使用鏈表完成隊列操作的阻塞隊列。鏈表是單向鏈表,而不是雙向鏈表。
內部使用放鎖和拿鎖,這兩個鎖實現阻塞(“two lock queue” algorithm)。
它帶有的屬性如下:
// 容量大小 private final int capacity;// 元素個數,因為有2個鎖,存在競態條件,使用AtomicInteger private final AtomicInteger count = new AtomicInteger(0);// 頭結點 private transient Node<E> head;// 尾節點 private transient Node<E> last;// 拿鎖 private final ReentrantLock takeLock = new ReentrantLock();// 拿鎖的條件對象 private final Condition notEmpty = takeLock.newCondition();// 放鎖 private final ReentrantLock putLock = new ReentrantLock();// 放鎖的條件對象 private final Condition notFull = putLock.newCondition();ArrayBlockingQueue只有1個鎖,添加數據和刪除數據的時候只能有1個被執行,不允許并行執行。
而LinkedBlockingQueue有2個鎖,放鎖和拿鎖,添加數據和刪除數據是可以并行進行的,當然添加數據和刪除數據的時候只能有1個線程各自執行。
數據的添加
LinkedBlockingQueue有不同的幾個數據添加方法,add、offer、put方法。
add方法內部調用offer方法:
public boolean offer(E e) {if (e == null) throw new NullPointerException(); // 不允許空元素final AtomicInteger count = this.count;if (count.get() == capacity) // 如果容量滿了,返回falsereturn false;int c = -1;Node<E> node = new Node(e); // 容量沒滿,以新元素構造節點final ReentrantLock putLock = this.putLock;putLock.lock(); // 放鎖加鎖,保證調用offer方法的時候只有1個線程try {if (count.get() < capacity) { // 再次判斷容量是否已滿,因為可能拿鎖在進行消費數據,沒滿的話繼續執行enqueue(node); // 節點添加到鏈表尾部c = count.getAndIncrement(); // 元素個數+1if (c + 1 < capacity) // 如果容量還沒滿notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數據了,隊列還沒滿}} finally {putLock.unlock(); // 釋放放鎖,讓其他線程可以調用offer方法}if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費數據,count會變化。這里的if條件表示如果隊列中還有1條數據signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數據,可以進行消費return c >= 0; // 添加成功返回true,否則返回false }put方法:
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException(); // 不允許空元素int c = -1;Node<E> node = new Node(e); // 以新元素構造節點final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly(); // 放鎖加鎖,保證調用put方法的時候只有1個線程try {while (count.get() == capacity) { // 如果容量滿了notFull.await(); // 阻塞并掛起當前線程}enqueue(node); // 節點添加到鏈表尾部c = count.getAndIncrement(); // 元素個數+1if (c + 1 < capacity) // 如果容量還沒滿notFull.signal(); // 在放鎖的條件對象notFull上喚醒正在等待的線程,表示可以再次往隊列里面加數據了,隊列還沒滿} finally {putLock.unlock(); // 釋放放鎖,讓其他線程可以調用put方法}if (c == 0) // 由于存在放鎖和拿鎖,這里可能拿鎖一直在消費數據,count會變化。這里的if條件表示如果隊列中還有1條數據signalNotEmpty(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的1個線程,表示隊列里還有1條數據,可以進行消費 }LinkedBlockingQueue的添加數據方法add,put,offer跟ArrayBlockingQueue一樣,不同的是它們的底層實現不一樣。
ArrayBlockingQueue中放入數據阻塞的時候,需要消費數據才能喚醒。
而LinkedBlockingQueue中放入數據阻塞的時候,因為它內部有2個鎖,可以并行執行放入數據和消費數據,不僅在消費數據的時候進行喚醒插入阻塞的線程,同時在插入的時候如果容量還沒滿,也會喚醒插入阻塞的線程。
數據的刪除
LinkedBlockingQueue有不同的幾個數據刪除方法,poll、take、remove方法。
poll方法:
public E poll() {final AtomicInteger count = this.count;if (count.get() == 0) // 如果元素個數為0return null; // 返回nullE x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock(); // 拿鎖加鎖,保證調用poll方法的時候只有1個線程try {if (count.get() > 0) { // 判斷隊列里是否還有數據x = dequeue(); // 刪除頭結點c = count.getAndDecrement(); // 元素個數-1if (c > 1) // 如果隊列里還有元素notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數據,可以再次消費}} finally {takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用poll方法}if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數據,count會變化。這里的if條件表示如果隊列中還可以再插入數據signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數據return x; }take方法:
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly(); // 拿鎖加鎖,保證調用take方法的時候只有1個線程try {while (count.get() == 0) { // 如果隊列里已經沒有元素了notEmpty.await(); // 阻塞并掛起當前線程}x = dequeue(); // 刪除頭結點c = count.getAndDecrement(); // 元素個數-1if (c > 1) // 如果隊列里還有元素notEmpty.signal(); // 在拿鎖的條件對象notEmpty上喚醒正在等待的線程,表示隊列里還有數據,可以再次消費} finally {takeLock.unlock(); // 釋放拿鎖,讓其他線程可以調用take方法}if (c == capacity) // 由于存在放鎖和拿鎖,這里可能放鎖一直在添加數據,count會變化。這里的if條件表示如果隊列中還可以再插入數據signalNotFull(); // 在放鎖的條件對象notFull上喚醒正在等待的1個線程,表示隊列里還能再次添加數據return x; }remove方法:
public boolean remove(Object o) {if (o == null) return false;fullyLock(); // remove操作要移動的位置不固定,2個鎖都需要加鎖try {for (Node<E> trail = head, p = trail.next; // 從鏈表頭結點開始遍歷p != null;trail = p, p = p.next) {if (o.equals(p.item)) { // 判斷是否找到對象unlink(p, trail); // 修改節點的鏈接信息,同時調用notFull的signal方法return true;}}return false;} finally {fullyUnlock(); // 2個鎖解鎖} }LinkedBlockingQueue的take方法對于沒數據的情況下會阻塞,poll方法刪除鏈表頭結點,remove方法刪除指定的對象。
需要注意的是remove方法由于要刪除的數據的位置不確定,需要2個鎖同時加鎖。
?
總結
以上是生活随笔為你收集整理的Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2500元的电脑配置(电脑2500配置)
- 下一篇: 逆战的电脑配置要求高吗(逆战的电脑配置)