Java多线程-BlockingQueue-ArrayBlockingQueue-LinkedBlockingQueue
前言:
? BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的兩個重要成員,包括他們各自的功能以及常見使用場景。
認識BlockingQueue
? 阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數(shù)據(jù)結(jié)構(gòu)中所起的作用大致如下圖所示:
? 從圖中我們可以很清楚看到,通過一個共享的隊列,可以使得數(shù)據(jù)由隊列的一端輸入,從另外一端輸出;常用的隊列主要有以下兩種:(當然通過不同的實現(xiàn)方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)
先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似于排隊的功能。從某種程度上來說這種隊列也體現(xiàn)了一種公平性。
后進先出(LIFO):后插入隊列的元素最先出隊列,這種隊列優(yōu)先處理最近發(fā)生的事件。
? 多線程環(huán)境中,通過隊列可以很容易實現(xiàn)數(shù)據(jù)共享,比如經(jīng)典的“生產(chǎn)者”和“消費者”模型中,通過隊列可以很便利地實現(xiàn)兩者之間的數(shù)據(jù)共享。假設我們有若干生產(chǎn)者線程,另外又有若干個消費者線程。如果生產(chǎn)者線程需要把準備好的數(shù)據(jù)共享給消費者線程,利用隊列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題。但如果生產(chǎn)者和消費者在某個時間段內(nèi),萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費者消費的速度,并且當生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程),以便等待消費者線程把累積的數(shù)據(jù)處理完畢,反之亦然。
? 然而,在concurrent包發(fā)布以前,在多線程環(huán)境下,我們每個程序員都必須去自己控制這些細節(jié),尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程,一旦條件滿足,被掛起的線程又會自動被喚醒)
? 下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
如圖所示:當隊列中沒有數(shù)據(jù)的情況下,消費者端的所有線程都會被自動阻塞(掛起),直到有數(shù)據(jù)放入隊列。
如圖所示:當隊列中填滿數(shù)據(jù)的情況下,生產(chǎn)者端的所有線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。
? 這也是我們在多線程環(huán)境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:
BlockingQueue的核心方法:
放入數(shù)據(jù):
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執(zhí)行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內(nèi),還不能往隊列中加入BlockingQueue,則返回失敗。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù).
獲取數(shù)據(jù):
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null;
poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內(nèi),隊列一旦有數(shù)據(jù)可取,則立即返回隊列中的數(shù)據(jù)。否則知道時間超時還沒有數(shù)據(jù)可取,返回失敗。
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù)),通過該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。
常見BlockingQueue
在了解了BlockingQueue的基本功能后,讓我們來看看BlockingQueue家庭大致有哪些成員?
1. ArrayBlockingQueue
? 基于數(shù)組的阻塞隊列實現(xiàn),在ArrayBlockingQueue內(nèi)部,維護了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象,這是一個常用的阻塞隊列,除了一個定長數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數(shù)組中的位置。
? ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費者獲取數(shù)據(jù),都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;
? 按照實現(xiàn)原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現(xiàn)生產(chǎn)者和消費者操作的完全并行運行。之所以沒這樣去做,猜測是因為ArrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時,我們還可以控制對象的內(nèi)部鎖是否采用公平鎖,默認采用非公平鎖。
2. LinkedBlockingQueue
? 基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內(nèi)部也維持著一個數(shù)據(jù)緩沖隊列(該隊列由一個鏈表構(gòu)成),當生產(chǎn)者往隊列中放入一個數(shù)據(jù)時,隊列會從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊列內(nèi)部,而生產(chǎn)者立即返回;只有當隊列緩沖區(qū)達到最大值緩存容量時(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值),才會阻塞生產(chǎn)者隊列,直到消費者從隊列中消費掉一份數(shù)據(jù),生產(chǎn)者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還因為其對于生產(chǎn)者端和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能。
? 作為開發(fā)者,我們需要注意的是,如果構(gòu)造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。
? ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產(chǎn)者消費者問題,使用這兩個類足以。
BlockingQueueTest.java
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue;public class BlockingQueueTest {public static void main(String[] args) throws InterruptedException {// 聲明一個容量為10的緩存隊列BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);Producer producer1 = new Producer(queue);Producer producer2 = new Producer(queue);Producer producer3 = new Producer(queue);Consumer consumer = new Consumer(queue);// 借助ExecutorsExecutorService service = Executors.newCachedThreadPool();// 啟動線程 service.execute(producer1);service.execute(producer2);service.execute(producer3);service.execute(consumer);// 執(zhí)行10sThread.sleep(10 * 1000);producer1.stop();producer2.stop();producer3.stop();Thread.sleep(2000);// 退出Executor service.shutdown();} }? 上面程序使用了Executors線程池的用法,詳細內(nèi)容可以參考:http://www.cnblogs.com/liqiu/p/3649360.html
Cosumer.java
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable {public Consumer(BlockingQueue<String> queue) {this.queue = queue;}public void run() {System.out.println("啟動消費者線程!");Random r = new Random();boolean isRunning = true;try {while (isRunning) {System.out.println("正從隊列獲取數(shù)據(jù)...");String data = queue.poll(2, TimeUnit.SECONDS);if (null != data) {System.out.println("拿到數(shù)據(jù):" + data);System.out.println("正在消費數(shù)據(jù):" + data);Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));} else {// 超過2s還沒數(shù)據(jù),認為所有生產(chǎn)線程都已經(jīng)退出,自動退出消費線程。isRunning = false;}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {System.out.println("退出消費者線程!");}}private BlockingQueue<String> queue;private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; }Producer.java
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable {public Producer(BlockingQueue queue) {this.queue = queue;}public void run() {String data = null;Random r = new Random();System.out.println("啟動生產(chǎn)者線程!");try {while (isRunning) {System.out.println("正在生產(chǎn)數(shù)據(jù)...");Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));data = "data:" + count.incrementAndGet();System.out.println("將數(shù)據(jù):" + data + "放入隊列...");if (!queue.offer(data, 2, TimeUnit.SECONDS)) {System.out.println("放入數(shù)據(jù)失敗:" + data);}}} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();} finally {System.out.println("退出生產(chǎn)者線程!");}}public void stop() {isRunning = false;}private volatile boolean isRunning = true;private BlockingQueue queue;private static AtomicInteger count = new AtomicInteger();private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;}?執(zhí)行結(jié)果:
。。。。。。。。。。。。。。。 正從隊列獲取數(shù)據(jù)... 拿到數(shù)據(jù):data:36 正在消費數(shù)據(jù):data:36 正從隊列獲取數(shù)據(jù)... 拿到數(shù)據(jù):data:37 正在消費數(shù)據(jù):data:37 正從隊列獲取數(shù)據(jù)... 拿到數(shù)據(jù):data:38 正在消費數(shù)據(jù):data:38 。。。。。。。。。。。。?
總結(jié)
以上是生活随笔為你收集整理的Java多线程-BlockingQueue-ArrayBlockingQueue-LinkedBlockingQueue的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java笔记之数组的概念、声明、初始化、
- 下一篇: linux 命令行字符终端termina