java阻塞队列小结
【README】
1,本文介紹了java的7個阻塞隊列;
2,阻塞隊列的作用
- 做緩沖作用,如緩沖kafka消息,而不是直接發送給kafka,減少kafka集群的壓力;
【1】阻塞隊列 BlockingQueue 概述
1,隊列是一種數據結構,先進先出;
2,阻塞隊列的意思是:
- 當阻塞隊列為空時,線程1從隊列取出元素會阻塞;直到線程2向隊列添加了新值;
- 但阻塞隊列滿時,線程1向隊列添加元素會阻塞;直到線程2從隊列取走了值(頭部元素);
3,阻塞隊列封裝了 線程阻塞,線程喚醒的底層方法,不需要程序員編寫這些代碼,減少了開發量和代碼簡潔;
【2】阻塞隊列介紹
【2.1】ArrayBlockingQueue 數組阻塞隊列
0)類描述:
底層使用可重入鎖 ReentrantLock 實現,每個操作前都會加鎖,操作完成后解鎖;所以僅允許一個線程串行操作阻塞隊列中的任意一個方法;(線程1操作a方法,線程1退出前,線程2不能操作b方法,注意是b方法)
基于數組的有界阻塞隊列;
在創建該隊列時,給定隊列容量 capacity,一旦創建,capacity無法修改;
此類支持 排序等待生產者和消費者線程的可選公平策略。默認情況下,不保證此順序。但是,在公平性設置為 true 的情況下構造的隊列以 FIFO 順序授予線程訪問權限。公平通常會降低吞吐量,但會減少可變性并避免饑餓。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
1)類定義
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構造器
public ArrayBlockingQueue(int capacity) {this(capacity, false); } // 帶有公平性 public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException(); // 基于數組實現的阻塞隊列 this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition(); } // 給定初始隊列元素 public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();} }3)元素操作方法
4)其他方法
【2.2】LinkedBlockingQueue 鏈表阻塞隊列
0)類描述
基于鏈表的可選有界隊列, 可選的意思是,給定容量則有界,否則無界;
基于鏈接節點的可選有界阻塞隊列(基于鏈表的可選有界隊列)。 此隊列對元素 FIFO(先進先出)進行排序。 隊列的頭部是在隊列中停留時間最長的那個元素。 隊列的尾部是在隊列中停留時間最短的那個元素。 新元素插入隊列尾部,隊列檢索操作獲取隊列頭部元素。
鏈接隊列通常比基于數組的隊列具有更高的吞吐量,但在大多數并發應用程序中的可預測性較差。
可選的容量綁定構造函數參數是一種 防止隊列過度擴展的方法。 如果未指定,容量等于 Integer.MAX_VALUE。 鏈接節點在每次插入時動態創建,除非這會使隊列超過容量。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
此類是 Java 集合框架的成員。
1)類定義
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構造器
2.1)鏈表節點-靜態內部類
static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*/Node<E> next;Node(E x) { item = x; } } // 兩把鎖 /** 元素獲取鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 元素非空條件 */ private final Condition notEmpty = takeLock.newCondition(); /** 元素插入鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 元素非滿條件 */ private final Condition notFull = putLock.newCondition();2.2)構造器
// 構造器 public LinkedBlockingQueue() {this(Integer.MAX_VALUE); }// 給定隊列大小 public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null); // 頭節點 }// 創建大小為 Integer.Max_value 的,用給定容器初始化元素的 隊列 public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();} }3)元素操作方法
同 ArrayBlockingQueue的元素操作方法,包括
【2.3】PriorityBlockingQueue 優先級阻塞隊列 (底層使用數組)
0)類描述
它是一個基于數組的無界阻塞隊列,自動擴容保證無界;插入元素用不阻塞,獲取元素阻塞(干貨);
一個無界阻塞隊列,它使用與 PriorityQueue 類相同的排序規則并提供阻塞檢索操作。
雖然這個隊列在邏輯上是無界的,但由于資源耗盡(導致 OutOfMemoryError),嘗試添加可能會失敗。此類不允許空元素。依賴于自然排序的優先級隊列也不允許插入不可比較的對象(這樣做會導致 ClassCastException)。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。方法 iterator() 中提供的 Iterator 不能保證以任何特定順序遍歷 PriorityBlockingQueue 的元素。如果您需要有序遍歷,請考慮使用 Arrays.sort(pq.toArray())。此外,drainTo 方法可用于按優先級順序刪除部分或全部元素,并將它們放置在另一個集合中。
對此類的操作不保證具有相同優先級的元素的順序。如果您需要強制排序,您可以定義自定義類或比較器,它們使用輔助鍵來打破主要優先級值之間的聯系。例如,這是一個將先進先出打破平局應用于可比元素的類。要使用它,您需要插入一個新的 FIFOEntry(anEntry) 而不是普通的條目對象。
1)類定義
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構造器
// 創建優先級阻塞度列,默認大小 Integer.max_value, 比較器為null,默認為字典序 public PriorityBlockingQueue() {this(DEFAULT_INITIAL_CAPACITY, null); } // private static final int DEFAULT_INITIAL_CAPACITY = 11; 默認大小11 // 創建優先級阻塞度列,給定大小 , 比較器為null,默認為字典序 public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null); }// 創建優先級阻塞度列,給定大小 ,和比較器 public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity]; } // 創建優先級阻塞隊列,包含給定的集合c;若集合是 SortedSet 或 PriorityQueue,則創建后的隊列的元素順序不變,否則采用字典序 public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true; // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify(); }3)元素操作方法
【2.4】DelayQueue - 延遲隊列
0)類描述
Delayed元素的無界阻塞隊列,每個元素只能在其延遲到期時被取用;
其底層使用了優先級隊列來實現;
Delayed 元素的無界阻塞隊列,其中每個元素只能在其延遲到期時被取用。
隊列的頭部是過去延遲過期最遠的那個 Delayed 元素。 如果沒有延遲到期,則沒有 頭部且 poll() 方法返回 null。
當元素的 getDelay(TimeUnit.NANOOSECONDS) 方法返回值小于或等于零的值時,就會發生過期。
盡管無法使用 take 或 poll()方法 取走(刪除)未過期的元素,但它們仍被視為普通元素。 例如,size 方法返回過期和未過期元素的總個數。 此隊列不允許空元素。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
方法 iterator() 中提供的 Iterator 不保證以任何特定順序遍歷 DelayQueue 的元素。
1)類定義
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {// 延遲隊列底層使用 優先級隊列 private final PriorityQueue<E> q = new PriorityQueue<E>();....... }- 1.1)Delayed 接口定義?
一個混合風格的接口,用于標記在給定延遲后應采取行動的對象。
此接口的實現必須定義一個 compareTo 方法,以提供與其 getDelay 方法一致的排序。
- getDelay() 方法描述:
返回關聯對象剩余的延遲時間;單位為unit;
2)構造器
// 創建一個空的延遲隊列 public DelayQueue() {}// 創建延遲隊列,初始包含給定集合c public DelayQueue(Collection<? extends E> c) {this.addAll(c);}3)元素操作方法
【2.5】SynchronousQueue 同步隊列
0)類描述
一個阻塞隊列,其中每個插入操作都必須等待另一個線程執行相應的移除操作,反之亦然。同步隊列沒有任何內部容量。
- 您無法查看同步隊列,因為元素僅在您嘗試刪除它時才存在;
- 你不能插入一個元素(使用任何方法),除非另一個線程試圖刪除它;
- 你不能迭代,因為沒有什么可以迭代的。
隊列頭部是第一個排隊的插入線程試圖添加到隊列的元素;
如果沒有這樣的排隊線程,則沒有元素可用于刪除,poll() 將返回 null。對于其他集合方法(例如包含),SynchronousQueue 充當空集合。此隊列不允許空元素。
同步隊列類似于 CSP 和 Ada 中使用的集合通道。
它們非常適合切換設計,在這種設計中,在一個線程中運行的對象必須與在另一個線程中運行的對象同步,以便將某些信息、事件或任務交給它。
該類支持可選的公平策略,用于對等待的生產者線程和消費者線程進行排序。默認情況下,不保證此順序。但是,公平性設置為 true 的隊列會以 FIFO 順序授予線程訪問權限(公平性,誰最先等待,誰最先操作)。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
1)類定義
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {2)構造器
public SynchronousQueue() {this(false); }// 若公平性fair設置為true,則等待線程根據 FIFO 順序競爭訪問 public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }3)方法描述
【2.6】LinkedTransferQueue 鏈表傳輸隊列
0)類描述
基于鏈接節點的無界 TransferQueue。
該隊列根據任何給定的生產者對元素 FIFO(先進先出)進行排序。隊列的頭部是某個生產者在隊列中停留時間最長的元素。隊列的尾部是某個生產者在隊列中停留時間最短的那個元素。
請注意,與大多數集合不同,size 方法不是恒定時間操作。
?由于這些隊列的異步特性,確定當前元素的數量需要遍歷所有元素,因此如果在遍歷期間修改此集合,則可能會報告不準確的結果。
此外,批量操作 addAll、removeAll、retainAll、containsAll、equals 和 toArray 不能保證以原子方式執行。例如,與 addAll 操作同時運行的迭代器可能只查看一些添加的元素。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
與其他并發集合一樣,線程中的操作在將對象放入LinkedTransferQueue之前發生——在另一個線程中的LinkedTransferQueue中訪問或刪除該元素之后發生操作。
1)類定義
public class LinkedTransferQueue<E> extends AbstractQueue<E>implements TransferQueue<E>, java.io.Serializable {2)構造器
// 創建空鏈表傳輸隊列 public LinkedTransferQueue() { }// 創建包含給定集合c的鏈表傳輸隊列 public LinkedTransferQueue(Collection<? extends E> c) {this();addAll(c); }3)元素操作方法
/** Possible values for "how" argument in xfer method.*/ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer【2.7】LinkedBlockingDeque 鏈表阻塞雙端隊列(注意是雙端)
0)類描述
基于鏈表的可選有界阻塞雙端隊列。
可選的容量提供了防止隊列過度擴展的方法。
如果未指定,容量等于 Integer.MAX_VALUE(默認)。 鏈接節點在每次插入時動態創建,除非這會使雙端隊列超出容量。
大多數操作在恒定時間內運行(忽略阻塞所花費的時間)。 但 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove() 和批量操作,所有這些都在線性時間內運行。
此類及其迭代器實現了 Collection 和 Iterator 接口的所有可選方法。
1)類定義
public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, java.io.Serializable {2)構造器
// 默認容量的構造器 public LinkedBlockingDeque() {this(Integer.MAX_VALUE);}// 給定容量的構造器 public LinkedBlockingDeque(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;} // 創建一個包含 指定容器中所有元素 的雙端隊列 public LinkedBlockingDeque(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock lock = this.lock;lock.lock(); // Never contended, but necessary for visibilitytry {for (E e : c) {if (e == null)throw new NullPointerException();if (!linkLast(new Node<E>(e)))throw new IllegalStateException("Deque full");}} finally {lock.unlock();}}底層使用了 Node節點,包括 上一個 下一個指針;
static final class Node<E> { E item;// 節點值 // 上一個節點Node<E> prev;// 下一個節點 Node<E> next;// 節點構造器 Node(E x) {item = x;}}3)元素操作方法
總結
以上是生活随笔為你收集整理的java阻塞队列小结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ps的加深工具快捷键(ps加深简单工具)
- 下一篇: 头脑王者有多少个段位?头脑王者怎么升段位