JUC中的阻塞队列
JUC中的阻塞隊列
阻塞情況
阻塞隊列中,線程阻塞有這樣的兩種情況:
- 當隊列中沒有數據的情況下,消費者端的所有線程都會被自動阻塞,直到有數據放入隊列。
- 當隊列中填滿數據的情況下,生產者端的所有線程都會被自動阻塞,直到隊列中有空的位置,線程被自動喚醒。
如下兩圖:
空的情況
滿的情況
BlockingQueue接口
BlockingQueue的主要方法
官方提供的方法表格
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not applicable | not applicable |
插入
- public abstract boolean add(E paramE):將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回 true,如果當前沒有可用的空間,則拋出 IllegalStateException。如果該元素是 NULL,則會拋出 NullPointerException 異常。
- public abstract boolean offer(E paramE):將指定元素插入此隊列中(如果立即可行且不會違反容量限制),成功時返回 true,如果當前沒有可用的空間,則返回 false。
- public abstract void put(E paramE) throws InterruptedException: 將指定元素插入此隊列中,將等待可用的空間(如果有必要)
- offer(E o, long timeout, TimeUnit unit):可以設定等待的時間,如果在指定的時間內,還不能往隊列中加入 BlockingQueue,則返回失敗。
獲取
- poll(time):取走 BlockingQueue 里排在首位的對象,若不能立即取出,則可以等 time 參數規定的時間,取不到時返回 null;
- poll(long timeout, TimeUnit unit):從 BlockingQueue 取出一個隊首的對象,如果在指定時間內,隊列一旦有數據可取,則立即返回隊列中的數據。否則直到時間超時還沒有數據可取,返回失敗。
- take():取走 BlockingQueue 里排在首位的對象,若 BlockingQueue 為空,阻斷進入等待狀態直到 BlockingQueue 有新的數據被加入。
- drainTo():一次性從 BlockingQueue 獲取所有可用的數據對象(還可以指定獲取數據的個數),通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
BlockingQueue接口的實現類
- ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue :由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue :支持優先級排序的無界阻塞隊列。
- DelayQueue:使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:不存儲元素的阻塞隊列。
- LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列
- ArrayBlockingQueue :用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。
- LinkedBlockingQueue(雙鎖提高并發) :基于鏈表的阻塞隊列,同 ArrayListBlockingQueue 類似,此隊列按照先進先出(FIFO)的原則對元素進行排序。 LinkedBlockingQueue 之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。
- PriorityBlockingQueue :是一個支持優先級的無界隊列。默認情況下元素采取自然順序升序排列。可以自定義實現compareTo()方法來指定元素進行排序規則,或者初始化 PriorityBlockingQueue 時,指定構造參數 Comparator 來對元素進行排序。
- DelayQueue:是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。隊列中的元素必須實現 Delayed 接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。
- SynchronousQueue:是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。SynchronousQueue 可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。
- LinkedTransferQueue:相 對 于 其 他 阻 塞 隊 列 ,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
- LinkedBlockingDeque:是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。
ArrayBlockingQueue
ArrayBlockingQueue實現了BlockingQueue接口,所以擁有BlockingQueue的所有方法 ,看下面代碼時對照著上面的表看即可。
用ArrayBlockingQueue模仿生成者和消費者,生產者分別是A,B,C,其中A,C使用了add()方法添加元素,add()會拋出異常,所以做了處理,而B使用put()方法添加元素,隊列滿了也不會拋出隊列滿的異常,自會進行阻塞。消費者D負責那東西,使用remove()方法,remove()方法會拋異常,所以我這里在拿之前判斷檢測了一下隊列是否為空。
當然你也可以試試其他方法獲取隊列元素,或者填充隊列,就看你是那種需求,要阻塞的還是拋出異常的,還是放回true或false的。
public class ArrayblockingTest {//阻塞隊列private ArrayBlockingQueue<String> queue;public ArrayblockingTest(){queue = new ArrayBlockingQueue<String>(8);}//添加元素,隊列滿了的話,會拋異常的public void additem(String str){this.queue.add(str);}//put本身不拋異常,但有個InterruptedException要捕獲public void putitem(String str){try {this.queue.put(str);//這里處理一下中斷異常} catch (InterruptedException e) {e.printStackTrace();}}//會拋異常public String removeitem(){return this.queue.remove();}//不拋異常public boolean isempty(){return this.queue.peek() == null;}public static void main(String[] args) {ArrayblockingTest arrayblocking = new ArrayblockingTest();//生產者Anew Thread(()->{int i = 5;while (i>0)try {//這里捕獲拋出的異常,并處理arrayblocking.additem(Thread.currentThread().getName());i--;} catch (Exception e) {System.out.println("隊列已經滿了無法填充");continue;}},"A").start();//生產者Bnew Thread(()->{int i = 5;while (i>0){i--;//無異常拋出但會阻塞arrayblocking.putitem(Thread.currentThread().getName());}},"B").start();//生產者Cnew Thread(()->{int i = 5;while (i>0)try {//這里捕獲拋出的異常,并處理arrayblocking.additem(Thread.currentThread().getName());i--;} catch (Exception e) {System.out.println("隊列已經滿了無法填充");continue;}},"C").start();//消費者Dnew Thread(()->{while (true) {if (!arrayblocking.isempty()) {System.out.println(arrayblocking.removeitem());}else {System.out.println("當前隊列空");try {//睡一會,等填滿Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}},"D").start();} }SynchronousQueue
是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。SynchronousQueue 可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。
public class SynblockingTest {private SynchronousQueue<String> synqueue;public SynblockingTest(){synqueue = new SynchronousQueue<>();}public void put(String str){try {synqueue.put(str);} catch (InterruptedException e) {e.printStackTrace();}}public String take(){try {return this.synqueue.take();} catch (InterruptedException e) {e.printStackTrace();}return null;}public static void main(String[] args) {SynblockingTest syn = new SynblockingTest();//生產,每1秒生產一個new Thread(()->{while (true){syn.put(Thread.currentThread().getName());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}},"A").start();//消費,不斷消費,但因為生產者每一秒生產一個,//所以只能能生產者生產一個才能拿一個new Thread(()->{while (true)System.out.println(Thread.currentThread().getName()+"獲得:"+syn.take());},"B").start();} }總結
- 上一篇: 操作系统--中断和异常
- 下一篇: 儿童心跳每分钟多少次正常