高并发之并发容器,了解多少(从入门到超神)
點(diǎn)擊上方“好好學(xué)java”,選擇“置頂公眾號(hào)”
優(yōu)秀學(xué)習(xí)資源、干貨第一時(shí)間送達(dá)!
?精彩內(nèi)容?
java實(shí)戰(zhàn)練習(xí)項(xiàng)目教程
2018微服務(wù)資源springboot、springcloud、docker、dubbo實(shí)戰(zhàn)等傾心分享
2018年java架構(gòu)師全套學(xué)習(xí)教程
最新大數(shù)據(jù)培訓(xùn)完整視頻教程
2018年java最新全套培訓(xùn)學(xué)習(xí)教程
一、ConcurrentHashMap
在上面已經(jīng)提到過(guò)ConcurrentHashMap,ConcurrentHashMap相比Hashtable能夠進(jìn)一步提高并發(fā)性,其原理圖如下:
HashMap,Hashtable與ConcurrentHashMap都是實(shí)現(xiàn)的哈希表數(shù)據(jù)結(jié)構(gòu),在隨機(jī)讀取的時(shí)候效率很高。Hashtable實(shí)現(xiàn)同步是利用synchronized關(guān)鍵字進(jìn)行鎖定的,其是針對(duì)整張哈希表進(jìn)行鎖定的,即每次鎖住整張表讓線程獨(dú)占,在線程安全的背后是巨大的浪費(fèi)。ConcurrentHashMap和Hashtable主要區(qū)別就是圍繞著鎖的粒度進(jìn)行區(qū)別以及如何區(qū)鎖定。
上圖中,左邊是Hashtable的實(shí)現(xiàn)方式,可以看到鎖住整個(gè)哈希表;而右邊則是ConcurrentHashMap的實(shí)現(xiàn)方式,單獨(dú)鎖住每一個(gè)桶(segment).ConcurrentHashMap將哈希表分為16個(gè)桶(默認(rèn)值),諸如get(),put(),remove()等常用操作只鎖當(dāng)前需要用到的桶,而size()才鎖定整張表。原來(lái)只能一個(gè)線程進(jìn)入,現(xiàn)在卻能同時(shí)接受16個(gè)寫(xiě)線程并發(fā)進(jìn)入(寫(xiě)線程需要鎖定,而讀線程幾乎不受限制),并發(fā)性的提升是顯而易見(jiàn)的。
而在迭代時(shí),ConcurrentHashMap使用了不同于傳統(tǒng)集合的快速失敗迭代器(fast-fail iterator)的另一種迭代方式,稱(chēng)為弱一致迭代器。在這種迭代方式中,當(dāng)iterator被創(chuàng)建后集合再發(fā)生改變就不再是拋出ConcurrentModificationException,取而代之的是在改變時(shí)實(shí)例化出新的數(shù)據(jù)從而不影響原有的數(shù)據(jù),iterator完成后再將頭指針替換為新的數(shù)據(jù),這樣iterator線程可以使用原來(lái)老的數(shù)據(jù),而寫(xiě)線程也可以并發(fā)的完成改變,更重要的,這保證了多個(gè)線程并發(fā)執(zhí)行的連續(xù)性和擴(kuò)展性,是性能提升的關(guān)鍵。
我們?cè)谏厦骊U述了ConcurrentHashMap的使用特點(diǎn)和原理,分別在同樣的一個(gè)高并發(fā)場(chǎng)景下,測(cè)試不同的方式產(chǎn)生的延時(shí)(ms):
Map<String,?String>?map?=?new?ConcurrentHashMap<>();//483 Map<String,?String>?map?=?new?ConcurrentSkipListMap<>();?//高并發(fā)并且排序?559 Map<String,?String>?map?=?new?Hashtable<>();?//499? Map<String,?String>?map?=Collections.synchronizedMap(new?HashMap<>());?//?530 Map<String,?String>?map?=Collections.synchronizedMap(new?TreeMap());?//905以ConcurrentLinkedQueue為例,他實(shí)現(xiàn)了Queue接口,實(shí)例化方式如下:
Queue<String>?strs?=?new?ConcurrentLinkedQueue<>();添加元素的方法:offer()
取出隊(duì)頭的方法:poll()
判斷隊(duì)列長(zhǎng)度:size()
對(duì)于雙端隊(duì)列,使用ConcurrentLinkedDeque類(lèi)型來(lái)實(shí)現(xiàn).
下面我們?cè)倏匆粋€(gè)具體的實(shí)例:
public?class?T01_ConcurrentMap?{public?static?void?main(String[]?args)?{Map<String,?String>?map?=?new?ConcurrentHashMap<String,?String>();//Map<String,?String>?map?=?new?ConcurrentSkipListMap<String,?String>();?//高并發(fā)并且排序//Map<String,?String>?map?=?new?Hashtable<>();//Map<String,?String>?map?=?new?HashMap<String,?String>();Random?random?=?new?Random();Thread[]?threads?=?new?Thread[100];CountDownLatch?latch?=?new?CountDownLatch(threads.length);long?start?=?System.currentTimeMillis();for?(int?i?=?0;?i?<?threads.length;?i++)?{threads[i]?=?new?Thread(()->{for(int?j=0;?j<10000;j++)?map.put("a"?+?random.nextInt(100000),?"a"?+?random.nextInt(100000));latch.countDown();});}Arrays.asList(threads).forEach(t->t.start());try?{latch.await();}?catch?(InterruptedException?e)?{e.printStackTrace();}long?end?=?System.currentTimeMillis();System.out.println(end-start);} }啟動(dòng)100個(gè)線程,向圖中添加100000個(gè)元素,分別使用Hashtable,HashMap,ConcurrentHashMap,ConcurrentSkipListMap定義map,判斷程序完成的時(shí)間。最終發(fā)現(xiàn),ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是將大鎖分成若干小鎖,實(shí)現(xiàn)多個(gè)線程共同運(yùn)行,所以,效率有很大差距。ConcurrentSkipListMap較ConcurrentHashMap除了實(shí)現(xiàn)高并發(fā)外還能夠排序。
參考:
http://blog.csdn.net/sunxianghuang/article/details/52221913 http://www.educity.cn/java/498061.html二、ConcurrentQueue
與ConcurrentHashMap相同,ConcurrentQueue也是通過(guò)同樣的方式來(lái)提高并發(fā)性能的。
我們?cè)谕饺萜髦刑岬竭^(guò)火車(chē)票問(wèn)題:
有N張火車(chē)票,每張票都有一個(gè)編號(hào),同時(shí)有10個(gè)窗口對(duì)外售票,寫(xiě)一個(gè)模擬程序。
在上述問(wèn)題中,也可以使用ConcurrentQueue進(jìn)一步提高并發(fā)性:
static?Queue<String>?tickets?=?new?ConcurrentLinkedQueue<>();具體的代碼是這樣的:
public?class?TicketSeller4?{static?Queue<String>?tickets?=?new?ConcurrentLinkedQueue<>();static?{for(int?i=0;?i<1000;?i++)?tickets.add("票編號(hào):"?+?i);}public?static?void?main(String[]?args)?{for(int?i=0;?i<10;?i++)?{new?Thread(()->{while(true)?{String?s?=?tickets.poll();if(s?==?null)?break;else?System.out.println("銷(xiāo)售了--"?+?s);}}).start();}} }這里面通過(guò)ConcurrentLinkedQueue的poll()方法來(lái)實(shí)現(xiàn)獲取容器成員的。用這個(gè)類(lèi)型可以進(jìn)一步提高并發(fā)性。
具體基本操作實(shí)例
public?class?T04_ConcurrentQueue?{public?static?void?main(String[]?args)?{Queue<String>?strings?=?new?ConcurrentLinkedQueue<String>();for?(int?i?=?0;?i?<?10;?i++)?{strings.offer("a"?+?i);?//相當(dāng)于add,?放進(jìn)隊(duì)列}System.out.println(strings);System.out.println(strings.size());System.out.println(strings.poll());?//取出并移除掉System.out.println(strings.size());System.out.println(strings.peek());?//取出,不會(huì)移除。相當(dāng)于get(0)System.out.println(strings.size());} }三、CopyOnWriteArrayList
寫(xiě)時(shí)復(fù)制容器,即copy-on-write,在多線程環(huán)境下,寫(xiě)時(shí)效率低,讀時(shí)效率高,適合寫(xiě)少讀多的環(huán)境。對(duì)比測(cè)試幾種情況:
List<String>?lists?=?new?ArrayList<>(); //這個(gè)會(huì)出并發(fā)問(wèn)題!報(bào)錯(cuò):ArrayIndexOutOfBoundsException List<String>?lists?=?new?Vector();//111?ms List<String>?lists?=?new?CopyOnWriteArrayList<>();//5230?ms //測(cè)試核心代碼: Runnable?task?=?new?Runnable()?{ @Override public?void?run()?{for(int?i=0;?i<1000;?i++)?lists.add("a"?+?????r.nextInt(10000)); } }; //多線程向該容器中不斷加入數(shù)據(jù)。從JDK 5開(kāi)始Java并發(fā)包里提供了兩個(gè)使用CopyOnWrite機(jī)制實(shí)現(xiàn)的并發(fā)容器,它們是CopyOnWriteArrayList和CopyOnWriteArraySet。
當(dāng)我們往一個(gè)容器添加元素的時(shí)候,不直接往當(dāng)前容器添加,而是先將當(dāng)前容器進(jìn)行Copy,復(fù)制出一個(gè)新的容器,然后向新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對(duì)CopyOnWrite容器進(jìn)行并發(fā)的讀,而不需要加鎖,因?yàn)樵诋?dāng)前讀的容器中不會(huì)添加任何元素。所以CopyOnWrite容器是一種讀寫(xiě)分離的思想,讀和寫(xiě)對(duì)應(yīng)不同的容器。
四、BlockingQueue
這種并發(fā)容器,會(huì)自動(dòng)實(shí)現(xiàn)阻塞式的生產(chǎn)者/消費(fèi)者模式。使用隊(duì)列解耦合,在實(shí)現(xiàn)異步事物的時(shí)候很有用。下面的例子,實(shí)現(xiàn)了阻塞隊(duì)列:
LinkedBlockingQueue static?BlockingQueue<String>?strs?=?new?LinkedBlockingQueue<>(10); strs.put("a"?+?i);?//加入隊(duì)列,如果滿了,就會(huì)等待 strs.take();?//取出隊(duì)列元素,如果空了,就會(huì)等待在實(shí)例化時(shí),可以指定具體的隊(duì)列容量。
在加入成員的時(shí)候,除了使用put方法還可以使用其他方法:
五、ArrayBlockingQueue
static?BlockingQueue<String>?strs?=?new?ArrayBlockingQueue<>(10);對(duì)象的方法和上面的BlockingQueue是一樣的,用法也是一樣的。
二者的區(qū)別主要是:
LinkedBlockingQueue是一個(gè)單向鏈表實(shí)現(xiàn)的阻塞隊(duì)列,在鏈表一頭加入元素,如果隊(duì)列滿,就會(huì)阻塞,另一頭取出元素,如果隊(duì)列為空,就會(huì)阻塞。
LinkedBlockingQueue內(nèi)部使用ReentrantLock實(shí)現(xiàn)插入鎖(putLock)和取出鎖(takeLock)。
相比于數(shù)組實(shí)現(xiàn)的ArrayBlockingQueue的有界情況,我們稱(chēng)之為有界隊(duì)列,LinkedBlockingQueue可認(rèn)為是無(wú)界隊(duì)列。當(dāng)然,也可以向上面那樣指定隊(duì)列容量,但是這個(gè)參數(shù)常常是省略的,多用于任務(wù)隊(duì)列。
六、LinkedBlockingQueue實(shí)例
public?class?T05_LinkedBlockingQueue?{private?static?BlockingQueue<String>?strings?=?new?LinkedBlockingQueue<String>();private?static?Random?r?=?new?Random();public?static?void?main(String[]?args)?{new?Thread(()->{for?(int?i?=?0;?i?<?100;?i++)?{try?{strings.put("a"?+?i);?//如果滿了,就會(huì)等待TimeUnit.SECONDS.sleep(r.nextInt(10));}?catch?(Exception?e)?{e.printStackTrace();}}},?"p1").start();for?(int?i?=?0;?i?<?5;?i++)?{new?Thread(()->{for(;;){try?{System.out.println(Thread.currentThread().getName()?+?"take?-"?+?strings.take());?//如果空了,就會(huì)等待}?catch?(Exception?e)?{e.printStackTrace();}?}},"c"?+?i).start();}} }LinkedBlockingQueue是使用鏈表是實(shí)現(xiàn)的阻塞式容器。
七、DelayQueue
DelayQueue也是一個(gè)BlockingQueue,其特化的參數(shù)是Delayed。
Delayed擴(kuò)展了Comparable接口,比較的基準(zhǔn)為延時(shí)的時(shí)間值,Delayed接口的實(shí)現(xiàn)類(lèi)getDelay()的返回值應(yīng)為固定值(final).DelayQueue內(nèi)部是使用PriorityQueue實(shí)現(xiàn)的,即:
可以說(shuō),DelayQueue是一個(gè)使用優(yōu)先隊(duì)列(PriorityQueue)實(shí)現(xiàn)的BlockingQueue,優(yōu)先隊(duì)列的比較基準(zhǔn)值是時(shí)間。這是一個(gè)無(wú)界的BlockingQueue,用于放置實(shí)現(xiàn)了Delayed接口的對(duì)象,其中的對(duì)象只能在其到期時(shí)才能從隊(duì)列中取走。這種隊(duì)列是有序的,即隊(duì)頭對(duì)象的延遲到期時(shí)間最長(zhǎng)。但是要注意的是,不能將null元素放置到這種隊(duì)列中。
Delayed,一種混合風(fēng)格的接口,用來(lái)標(biāo)記那些應(yīng)該在給定延遲時(shí)間之后執(zhí)行的對(duì)象。此接口的實(shí)現(xiàn)類(lèi)必須重寫(xiě)一個(gè) compareTo() 方法,該方法提供與此接口的 getDelay()方法一致的排序。
DelayQueue存儲(chǔ)的對(duì)象是實(shí)現(xiàn)了Delayed接口的對(duì)象,在這個(gè)對(duì)象中,需要重寫(xiě)compareTo()和getDelay()方法,例如:
因此,當(dāng)我們?cè)趍ain()函數(shù)中,向該隊(duì)列加入元素后再取出元素的過(guò)程,就會(huì)存在延時(shí),可以這樣驗(yàn)證:
long?now?=?System.currentTimeMillis(); MyTask?t1?=?new?MyTask(now?+?1000); MyTask?t2?=?new?MyTask(now?+?2000); MyTask?t3?=?new?MyTask(now?+?1500); MyTask?t4?=?new?MyTask(now?+?2500); MyTask?t5?=?new?MyTask(now?+?500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int?i=0;?i<5;?i++)?{ System.out.println(tasks.take()); }注意:為了方便查看到效果,可以重寫(xiě)toString()函數(shù),來(lái)保證打印出來(lái)的結(jié)果有意義:
例如:
DelayQueue可以用在諸如用監(jiān)控線程來(lái)輪詢(xún)是否有超時(shí)任務(wù)出現(xiàn),來(lái)處理某些具有等待時(shí)延的情況,這樣,可以避免由于數(shù)量巨大造成的輪詢(xún)效率差的問(wèn)題。例如:
關(guān)閉空閑連接:服務(wù)器中,有很多客戶端的連接,空閑一段時(shí)間之后需要關(guān)閉他們。
緩存:緩存中的對(duì)象,超過(guò)了空閑時(shí)間,需要從緩存中移出。
任務(wù)超時(shí)處理:在網(wǎng)絡(luò)協(xié)議滑動(dòng)窗口請(qǐng)求應(yīng)答式交互時(shí),處理超時(shí)未響應(yīng)的請(qǐng)求。
實(shí)例:
public?class?T07_DelayQueue?{private?static?BlockingQueue<MyTask>?tasks?=?new?DelayQueue<>();private?static?Random?r?=?new?Random();static?class?MyTask?implements?Delayed{long?runningTime;public?MyTask(long?rt)?{this.runningTime?=?rt;}@Overridepublic?int?compareTo(Delayed?o)?{if?(this.getDelay(TimeUnit.MILLISECONDS)?<?o.getDelay(TimeUnit.MICROSECONDS))?{return?-1;}else?if?(this.getDelay(TimeUnit.MILLISECONDS)?>?o.getDelay(TimeUnit.MILLISECONDS))?{return?1;}else?{return?0;}}@Overridepublic?long?getDelay(TimeUnit?unit)?{return?unit.convert(runningTime?-?System.currentTimeMillis(),?TimeUnit.MILLISECONDS);}@Overridepublic?String?toString()?{return?""?+?runningTime;}public?static?void?main(String[]?args)?throws?InterruptedException?{long?now?=?System.currentTimeMillis();MyTask?t1?=?new?MyTask(now?+?1000);MyTask?t2?=?new?MyTask(now?+?2000);MyTask?t3?=?new?MyTask(now?+?1500);MyTask?t4?=?new?MyTask(now?+?2500);MyTask?t5?=?new?MyTask(now?+?500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for?(int?i?=?0;?i?<?5;?i++)?{System.out.println(tasks.take());}}} }八、LinkedTransferQueue
TransferQueue是一個(gè)繼承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的實(shí)現(xiàn)類(lèi),其定義為一個(gè)無(wú)界的隊(duì)列,具有先進(jìn)先出(FIFO)的特性。
TransferQueue接口含有下面幾個(gè)重要方法:
transfer(E e)
若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,即立刻移交之;否則,會(huì)插入當(dāng)前元素e到隊(duì)列尾部,并且等待進(jìn)入阻塞狀態(tài),到有消費(fèi)者線程取走該元素。
tryTransfer(E e)
若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程(使用take()或者poll()函數(shù)),使用該方法會(huì)即刻轉(zhuǎn)移/傳輸對(duì)象元素e;若不存在,則返回false,并且不進(jìn)入隊(duì)列。這是一個(gè)不阻塞的操作。
tryTransfer(E e,long timeout,TimeUnit unit)
若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,會(huì)立即傳輸給它;否則將插入元素e到隊(duì)列尾部,并且等待被消費(fèi)者線程獲取消費(fèi)掉;若在指定的時(shí)間內(nèi)元素e無(wú)法被消費(fèi)者線程獲取,則返回false,同時(shí)該元素被移除。
hasWaitingConsumer()
判斷是否存在消費(fèi)者線程。
getWaitingConsumerCount()
獲取所有等待獲取元素的消費(fèi)線程數(shù)量。
size()
因?yàn)殛?duì)列的異步特性,檢測(cè)當(dāng)前隊(duì)列的元素個(gè)數(shù)需要逐一迭代,無(wú)法保證原子性,可能會(huì)得到一個(gè)不太準(zhǔn)確的結(jié)果,尤其是在遍歷時(shí)有可能隊(duì)列發(fā)生更改。
使用方法:
LinkedTransferQueue<String>?strs?=?new?LinkedTransferQueue<>();//實(shí)例化如果當(dāng)前沒(méi)有消費(fèi)者線程(存在take方法的線程):
strs.transfer("aaa");該方法會(huì)一直阻塞在這里,知道有消費(fèi)者線程存在。
而如果使用傳統(tǒng)的put()方法來(lái)加入元素的話,則不會(huì)發(fā)生阻塞現(xiàn)象。
同樣,獲取隊(duì)列中元素的方法take()也是阻塞在這里等待獲取新的元素的。
九、SynchronousQueue
SynchronousQueue也是一種BlockingQueue,是一種無(wú)緩沖的等待隊(duì)列。所以,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加;可以認(rèn)為SynchronousQueue是一個(gè)緩存值為0的阻塞隊(duì)列(也可以認(rèn)為是1),它的isEmpty()方法永遠(yuǎn)返回是true,remainingCapacity()方法永遠(yuǎn)返回是0.
remove()和removeAll()方法永遠(yuǎn)返回是false,iterator()方法永遠(yuǎn)返回空,peek()方法永遠(yuǎn)返回null.
在使用put()方法時(shí),會(huì)一直阻塞在這里,等待被消費(fèi):
實(shí)例:
public?class?T09_Synchronized?{public?static?void?main(String[]?args)?throws?InterruptedException?{BlockingQueue<String>?strings?=?new?SynchronousQueue<String>();new?Thread(()->{try?{System.out.println(strings.take());}?catch?(Exception?e)?{e.printStackTrace();}}).start();strings.put("aaa");?//阻塞等待消費(fèi)者消費(fèi)//strings.add("aaa");System.out.println(strings.size());} }參考資料
https://blog.csdn.net/qq_34707744/article/details/79746622 https://blog.csdn.net/wang7807564/article/details/80048576推薦閱讀
1.?解密微信小程序登錄全過(guò)程(ssm實(shí)現(xiàn))
2.?springmvc入門(mén)
3.?servlet就是這么簡(jiǎn)單
4.?重溫javaweb過(guò)濾器filter
附上熱門(mén)QQ群,存放資源和歷史資料,2000容量(低門(mén)檻付費(fèi)群),長(zhǎng)按二維碼入群
總結(jié)
以上是生活随笔為你收集整理的高并发之并发容器,了解多少(从入门到超神)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 解密微信小程序Java登录流程(ssm实
- 下一篇: c++调用python的代码、函数、类