一文弄懂java中的Queue家族
文章目錄
- 簡(jiǎn)介
- Queue接口
- Queue的分類
- BlockingQueue
- Deque
- TransferQueue
- 總結(jié)
java中Queue家族簡(jiǎn)介
簡(jiǎn)介
java中Collection集合有三大家族List,Set和Queue。當(dāng)然Map也算是一種集合類,但Map并不繼承Collection接口。
List,Set在我們的工作中會(huì)經(jīng)常使用,通常用來(lái)存儲(chǔ)結(jié)果數(shù)據(jù),而Queue由于它的特殊性,通常用在生產(chǎn)者消費(fèi)者模式中。
現(xiàn)在很火的消息中間件比如:Rabbit MQ等都是Queue這種數(shù)據(jù)結(jié)構(gòu)的展開(kāi)。
今天這篇文章將帶大家進(jìn)入Queue家族。
Queue接口
先看下Queue的繼承關(guān)系和其中定義的方法:
Queue繼承自Collection,Collection繼承自Iterable。
Queue有三類主要的方法,我們用個(gè)表格來(lái)看一下他們的區(qū)別:
| Insert | add | offer | 兩個(gè)方法都表示向Queue中添加某個(gè)元素,不同之處在于添加失敗的情況,add只會(huì)返回true,如果添加失敗,會(huì)拋出異常。offer在添加失敗的時(shí)候會(huì)返回false。所以對(duì)那些有固定長(zhǎng)度的Queue,優(yōu)先使用offer方法。 |
| Remove | remove | poll | 如果Queue是空的情況下,remove會(huì)拋出異常,而poll會(huì)返回null。 |
| Examine | element | peek | 獲取Queue頭部的元素,但不從Queue中刪除。兩者的區(qū)別還是在于Queue為空的情況下,element會(huì)拋出異常,而peek返回null。 |
注意,因?yàn)閷?duì)poll和peek來(lái)說(shuō)null是有特殊含義的,所以一般來(lái)說(shuō)Queue中禁止插入null,但是在實(shí)現(xiàn)中還是有一些類允許插入null比如LinkedList。
盡管如此,我們?cè)谑褂弥羞€是要避免插入null元素。
Queue的分類
一般來(lái)說(shuō)Queue可以分為BlockingQueue,Deque和TransferQueue三種。
BlockingQueue
BlockingQueue是Queue的一種實(shí)現(xiàn),它提供了兩種額外的功能:
BlockingQueue的操作可以分為下面四類:
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not applicable | not applicable |
第一類是會(huì)拋出異常的操作,當(dāng)遇到插入失敗,隊(duì)列為空的時(shí)候拋出異常。
第二類是不會(huì)拋出異常的操作。
第三類是會(huì)Block的操作。當(dāng)Queue為空或者達(dá)到最大容量的時(shí)候。
第四類是time out的操作,在給定的時(shí)間里會(huì)Block,超時(shí)會(huì)直接返回。
BlockingQueue是線程安全的Queue,可以在生產(chǎn)者消費(fèi)者模式的多線程中使用,如下所示:
class Producer implements Runnable {private final BlockingQueue queue;Producer(BlockingQueue q) { queue = q; }public void run() {try {while (true) { queue.put(produce()); }} catch (InterruptedException ex) { ... handle ...}}Object produce() { ... }}class Consumer implements Runnable {private final BlockingQueue queue;Consumer(BlockingQueue q) { queue = q; }public void run() {try {while (true) { consume(queue.take()); }} catch (InterruptedException ex) { ... handle ...}}void consume(Object x) { ... }}class Setup {void main() {BlockingQueue q = new SomeQueueImplementation();Producer p = new Producer(q);Consumer c1 = new Consumer(q);Consumer c2 = new Consumer(q);new Thread(p).start();new Thread(c1).start();new Thread(c2).start();}}最后,在一個(gè)線程中向BlockQueue中插入元素之前的操作happens-before另外一個(gè)線程中從BlockQueue中刪除或者獲取的操作。
Deque
Deque是Queue的子類,它代表double ended queue,也就是說(shuō)可以從Queue的頭部或者尾部插入和刪除元素。
同樣的,我們也可以將Deque的方法用下面的表格來(lái)表示,Deque的方法可以分為對(duì)頭部的操作和對(duì)尾部的操作:
| Insert | addFirst(e) | offerFirst(e) | addLast(e) | offerLast(e) |
| Remove | removeFirst() | pollFirst() | removeLast() | pollLast() |
| Examine | getFirst() | peekFirst() | getLast() | peekLast() |
和Queue的方法描述基本一致,這里就不多講了。
當(dāng)Deque以 FIFO (First-In-First-Out)的方法處理元素的時(shí)候,Deque就相當(dāng)于一個(gè)Queue。
當(dāng)Deque以LIFO (Last-In-First-Out)的方式處理元素的時(shí)候,Deque就相當(dāng)于一個(gè)Stack。
TransferQueue
TransferQueue繼承自BlockingQueue,為什么叫Transfer呢?因?yàn)門(mén)ransferQueue提供了一個(gè)transfer的方法,生產(chǎn)者可以調(diào)用這個(gè)transfer方法,從而等待消費(fèi)者調(diào)用take或者poll方法從Queue中拿取數(shù)據(jù)。
還提供了非阻塞和timeout版本的tryTransfer方法以供使用。
我們舉個(gè)TransferQueue實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者的問(wèn)題。
先定義一個(gè)生產(chǎn)者:
@Slf4j @Data @AllArgsConstructor class Producer implements Runnable {private TransferQueue<String> transferQueue;private String name;private Integer messageCount;public static final AtomicInteger messageProduced = new AtomicInteger();@Overridepublic void run() {for (int i = 0; i < messageCount; i++) {try {boolean added = transferQueue.tryTransfer( "第"+i+"個(gè)", 2000, TimeUnit.MILLISECONDS);log.info("transfered {} 是否成功: {}","第"+i+"個(gè)",added);if(added){messageProduced.incrementAndGet();}} catch (InterruptedException e) {log.error(e.getMessage(),e);}}log.info("total transfered {}",messageProduced.get());} }在生產(chǎn)者的run方法中,我們調(diào)用了tryTransfer方法,等待2秒鐘,如果沒(méi)成功則直接返回。
再定義一個(gè)消費(fèi)者:
@Slf4j @Data @AllArgsConstructor public class Consumer implements Runnable {private TransferQueue<String> transferQueue;private String name;private int messageCount;public static final AtomicInteger messageConsumed = new AtomicInteger();@Overridepublic void run() {for (int i = 0; i < messageCount; i++) {try {String element = transferQueue.take();log.info("take {}",element );messageConsumed.incrementAndGet();Thread.sleep(500);} catch (InterruptedException e) {log.error(e.getMessage(),e);}}log.info("total consumed {}",messageConsumed.get());}}在run方法中,調(diào)用了transferQueue.take方法來(lái)取消息。
下面先看一下一個(gè)生產(chǎn)者,零個(gè)消費(fèi)者的情況:
@Testpublic void testOneProduceZeroConsumer() throws InterruptedException {TransferQueue<String> transferQueue = new LinkedTransferQueue<>();ExecutorService exService = Executors.newFixedThreadPool(10);Producer producer = new Producer(transferQueue, "ProducerOne", 5);exService.execute(producer);exService.awaitTermination(50000, TimeUnit.MILLISECONDS);exService.shutdown();}輸出結(jié)果:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個(gè) 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個(gè) 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個(gè) 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個(gè) 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個(gè) 是否成功: false [pool-1-thread-1] INFO com.flydean.Producer - total transfered 0可以看到,因?yàn)闆](méi)有消費(fèi)者,所以消息并沒(méi)有發(fā)送成功。
再看下一個(gè)有消費(fèi)者的情況:
@Testpublic void testOneProduceOneConsumer() throws InterruptedException {TransferQueue<String> transferQueue = new LinkedTransferQueue<>();ExecutorService exService = Executors.newFixedThreadPool(10);Producer producer = new Producer(transferQueue, "ProducerOne", 2);Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);exService.execute(producer);exService.execute(consumer);exService.awaitTermination(50000, TimeUnit.MILLISECONDS);exService.shutdown();}輸出結(jié)果:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個(gè) [pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個(gè) 是否成功: true [pool-1-thread-2] INFO com.flydean.Consumer - take 第1個(gè) [pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個(gè) 是否成功: true [pool-1-thread-1] INFO com.flydean.Producer - total transfered 2 [pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2可以看到Producer和Consumer是一個(gè)一個(gè)來(lái)生產(chǎn)和消費(fèi)的。
總結(jié)
本文介紹了Queue接口和它的三大分類,這三大分類又有非常多的實(shí)現(xiàn)類,我們將會(huì)在后面的文章中再詳細(xì)介紹。
更多精彩內(nèi)容且看:
- 區(qū)塊鏈從入門(mén)到放棄系列教程-涵蓋密碼學(xué),超級(jí)賬本,以太坊,Libra,比特幣等持續(xù)更新
- Spring Boot 2.X系列教程:七天從無(wú)到有掌握Spring Boot-持續(xù)更新
- Spring 5.X系列教程:滿足你對(duì)Spring5的一切想象-持續(xù)更新
- java程序員從小工到專家成神之路(2020版)-持續(xù)更新中,附詳細(xì)文章教程
歡迎關(guān)注我的公眾號(hào):程序那些事,更多精彩等著您!
更多內(nèi)容請(qǐng)?jiān)L問(wèn) www.flydean.com
總結(jié)
以上是生活随笔為你收集整理的一文弄懂java中的Queue家族的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: SkipList和java中Concur
- 下一篇: SynchronousQueue详解