queue double java_一文弄懂java中的Queue家族
java中Queue家族簡介
簡介
java中Collection集合有三大家族List,Set和Queue。當然Map也算是一種集合類,但Map并不繼承Collection接口。
List,Set在我們的工作中會經常使用,通常用來存儲結果數據,而Queue由于它的特殊性,通常用在生產者消費者模式中。
現在很火的消息中間件比如:Rabbit MQ等都是Queue這種數據結構的展開。
今天這篇文章將帶大家進入Queue家族。
Queue接口
先看下Queue的繼承關系和其中定義的方法:
Queue繼承自Collection,Collection繼承自Iterable。
Queue有三類主要的方法,我們用個表格來看一下他們的區別:
方法類型
方法名稱
方法名稱
區別
Insert
add
offer
兩個方法都表示向Queue中添加某個元素,不同之處在于添加失敗的情況,add只會返回true,如果添加失敗,會拋出異常。offer在添加失敗的時候會返回false。所以對那些有固定長度的Queue,優先使用offer方法。
Remove
remove
poll
如果Queue是空的情況下,remove會拋出異常,而poll會返回null。
Examine
element
peek
獲取Queue頭部的元素,但不從Queue中刪除。兩者的區別還是在于Queue為空的情況下,element會拋出異常,而peek返回null。
注意,因為對poll和peek來說null是有特殊含義的,所以一般來說Queue中禁止插入null,但是在實現中還是有一些類允許插入null比如LinkedList。
盡管如此,我們在使用中還是要避免插入null元素。
Queue的分類
一般來說Queue可以分為BlockingQueue,Deque和TransferQueue三種。
BlockingQueue
BlockingQueue是Queue的一種實現,它提供了兩種額外的功能:
當當前Queue是空的時候,從BlockingQueue中獲取元素的操作會被阻塞。
當當前Queue達到最大容量的時候,插入BlockingQueue的操作會被阻塞。
BlockingQueue的操作可以分為下面四類:
操作類型
Throws exception
Special value
Blocks
Times out
Insert
add(e)
offer(e)
put(e)
offer(e, time, unit)
Remove
remove()
poll()
take()
poll(time, unit)
Examine
element()
peek()
not applicable
not applicable
第一類是會拋出異常的操作,當遇到插入失敗,隊列為空的時候拋出異常。
第二類是不會拋出異常的操作。
第三類是會Block的操作。當Queue為空或者達到最大容量的時候。
第四類是time out的操作,在給定的時間里會Block,超時會直接返回。
BlockingQueue是線程安全的Queue,可以在生產者消費者模式的多線程中使用,如下所示:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
最后,在一個線程中向BlockQueue中插入元素之前的操作happens-before另外一個線程中從BlockQueue中刪除或者獲取的操作。
Deque
Deque是Queue的子類,它代表double ended queue,也就是說可以從Queue的頭部或者尾部插入和刪除元素。
同樣的,我們也可以將Deque的方法用下面的表格來表示,Deque的方法可以分為對頭部的操作和對尾部的操作:
方法類型
Throws exception
Special value
Throws exception
Special value
Insert
addFirst(e)
offerFirst(e)
addLast(e)
offerLast(e)
Remove
removeFirst()
pollFirst()
removeLast()
pollLast()
Examine
getFirst()
peekFirst()
getLast()
peekLast()
和Queue的方法描述基本一致,這里就不多講了。
當Deque以 FIFO (First-In-First-Out)的方法處理元素的時候,Deque就相當于一個Queue。
當Deque以LIFO (Last-In-First-Out)的方式處理元素的時候,Deque就相當于一個Stack。
TransferQueue
TransferQueue繼承自BlockingQueue,為什么叫Transfer呢?因為TransferQueue提供了一個transfer的方法,生產者可以調用這個transfer方法,從而等待消費者調用take或者poll方法從Queue中拿取數據。
還提供了非阻塞和timeout版本的tryTransfer方法以供使用。
我們舉個TransferQueue實現的生產者消費者的問題。
先定義一個生產者:
@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
private TransferQueue transferQueue;
private String name;
private Integer messageCount;
public static final AtomicInteger messageProduced = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
boolean added = transferQueue.tryTransfer( "第"+i+"個", 2000, TimeUnit.MILLISECONDS);
log.info("transfered {} 是否成功: {}","第"+i+"個",added);
if(added){
messageProduced.incrementAndGet();
}
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
log.info("total transfered {}",messageProduced.get());
}
}
在生產者的run方法中,我們調用了tryTransfer方法,等待2秒鐘,如果沒成功則直接返回。
再定義一個消費者:
@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {
private TransferQueue transferQueue;
private String name;
private int messageCount;
public static final AtomicInteger messageConsumed = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
String element = transferQueue.take();
log.info("take {}",element );
messageConsumed.incrementAndGet();
Thread.sleep(500);
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
log.info("total consumed {}",messageConsumed.get());
}
}
在run方法中,調用了transferQueue.take方法來取消息。
下面先看一下一個生產者,零個消費者的情況:
@Test
public void testOneProduceZeroConsumer() throws InterruptedException {
TransferQueue transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(10);
Producer producer = new Producer(transferQueue, "ProducerOne", 5);
exService.execute(producer);
exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
輸出結果:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4個 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0
可以看到,因為沒有消費者,所以消息并沒有發送成功。
再看下一個有消費者的情況:
@Test
public void testOneProduceOneConsumer() throws InterruptedException {
TransferQueue transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(10);
Producer producer = new Producer(transferQueue, "ProducerOne", 2);
Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);
exService.execute(producer);
exService.execute(consumer);
exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
輸出結果:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0個 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1個
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1個 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2
可以看到Producer和Consumer是一個一個來生產和消費的。
總結
本文介紹了Queue接口和它的三大分類,這三大分類又有非常多的實現類,我們將會在后面的文章中再詳細介紹。
歡迎關注我的公眾號:程序那些事,更多精彩等著您!
更多內容請訪問 www.flydean.com
總結
以上是生活随笔為你收集整理的queue double java_一文弄懂java中的Queue家族的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: server2019 sqlcmd命令安
- 下一篇: oracle12178错误,Oracle