面试官:ConcurrentHashMap 是如何保证线程安全的
點擊“終碼一生”,關注,置頂公眾號
每日技術干貨,第一時間送達!
1、前言
閱讀此篇文章,你需要有以下知識基礎
-
Java內(nèi)存模型,可見性問題
-
CAS
-
HashMap底層原理
我們知道,在日常開發(fā)中使用的HashMap是線程不安全的,而線程安全類HashTable只是簡單的在方法上加鎖實現(xiàn)線程安全,效率低下,所以在線程安全的環(huán)境下我們通常會使用ConcurrentHashMap,但是又為何需要學習ConcurrentHashMap?用不就完事了?我認為學習其源碼有兩個好處:
-
更靈活的運用ConcurrentHashMap
-
欣賞并發(fā)編程大師Doug Lea的作品,源碼中有很多值得我們學習的并發(fā)思想,要意識到,線程安全不僅僅只是加鎖
我拋出以下問題:
-
ConcurrentHashMap是怎么做到線程安全的?
-
get方法如何線程安全地獲取key、value?
-
put方法如何線程安全地設置key、value?
-
size方法如果線程安全地獲取容器容量?
-
底層數(shù)據(jù)結(jié)構(gòu)擴容時如果保證線程安全?
-
初始化數(shù)據(jù)結(jié)構(gòu)時如果保證線程安全?
-
-
ConcurrentHashMap并發(fā)效率是如何提高的?
-
和加鎖相比較,為什么它比HashTable效率高?
-
接下來,帶著問題來繼續(xù)看下去,欣賞并發(fā)大師精妙絕倫的并發(fā)藝術作品(以下討論基于JDK1.8)
2、相關概念
Amdahl定律
此節(jié)定律描述均來自《Java并發(fā)編程實戰(zhàn)》一書
假設F是必須被串行執(zhí)行的部分,N代表處理器數(shù)量,Speedup代表加速比,可以簡單理解為CPU使用率
此公式告訴我們,當N趨近無限大,加速比最大趨近于1/F,假設我們的程序有50%的部分需要串行執(zhí)行,就算處理器數(shù)量無限多,最高的加速比只能是2(20%的使用率),如果程序中僅有10%的部分需要串行執(zhí)行,最高的加速比可以達到9.2(92%的使用率),但我們的程序或多或少都一定會有串行執(zhí)行的部分,所以F不可能為0,所以,就算有無限多的CPU,加速比也不可能達到10(100%的使用率),下面給一張圖來表示串行執(zhí)行部分占比不同對利用率的影響:
由此我們可以看出,程序中的可伸縮性(提升外部資源即可提升并發(fā)性能的比率)是由程序中串行執(zhí)行部分所影響的,而常見的串行執(zhí)行有鎖競爭(上下文切換消耗、等待、串行)等等,這給了我們一個啟發(fā),可以通過減少鎖競爭來優(yōu)化并發(fā)性能,而ConcurrentHashMap則使用了鎖分段(減小鎖范圍)、CAS(樂觀鎖,減小上下文切換開銷,無阻塞)等等技術,下面來具體看看吧
3、初始化數(shù)據(jù)結(jié)構(gòu)時的線程安全
HashMap的底層數(shù)據(jù)結(jié)構(gòu)這里簡單帶過一下,不做過多贅述:
大致是以一個Node對象數(shù)組來存放數(shù)據(jù),Hash沖突時會形成Node鏈表,在鏈表長度超過8,Node數(shù)組超過64時會將鏈表結(jié)構(gòu)轉(zhuǎn)換為紅黑樹,Node對象:
static?class?Node<K,V>?implements?Map.Entry<K,V>?{final?int?hash;final?K?key;volatile?V?val;volatile?Node<K,V>?next;... }值得注意的是,value和next指針使用了volatile來保證其可見性。
在JDK1.8中,初始化ConcurrentHashMap的時候這個Node[]數(shù)組是還未初始化的,會等到第一次put方法調(diào)用時才初始化:
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();int?hash?=?spread(key.hashCode());int?binCount?=?0;for?(Node<K,V>[]?tab?=?table;;)?{Node<K,V>?f;?int?n,?i,?fh;//判斷Node數(shù)組為空if?(tab?==?null?||?(n?=?tab.length)?==?0)//初始化Node數(shù)組tab?=?initTable();... }此時是會有并發(fā)問題的,如果多個線程同時調(diào)用initTable初始化Node數(shù)組怎么辦?看看大師是如何處理的:
private?final?Node<K,V>[]?initTable()?{Node<K,V>[]?tab;?int?sc;//每次循環(huán)都獲取最新的Node數(shù)組引用while?((tab?=?table)?==?null?||?tab.length?==?0)?{//sizeCtl是一個標記位,若為-1也就是小于0,代表有線程在進行初始化工作了if?((sc?=?sizeCtl)?<?0)//讓出CPU時間片Thread.yield();?//?lost?initialization?race;?just?spin//CAS操作,將本實例的sizeCtl變量設置為-1else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?-1))?{//如果CAS操作成功了,代表本線程將負責初始化工作try?{//再檢查一遍數(shù)組是否為空if?((tab?=?table)?==?null?||?tab.length?==?0)?{//在初始化Map時,sizeCtl代表數(shù)組大小,默認16//所以此時n默認為16int?n?=?(sc?>?0)???sc?:?DEFAULT_CAPACITY;@SuppressWarnings("unchecked")//Node數(shù)組Node<K,V>[]?nt?=?(Node<K,V>[])new?Node<?,?>[n];//將其賦值給table變量table?=?tab?=?nt;//通過位運算,n減去n二進制右移2位,相當于乘以0.75//例如16經(jīng)過運算為12,與乘0.75一樣,只不過位運算更快sc?=?n?-?(n?>>>?2);}}?finally?{//將計算后的sc(12)直接賦值給sizeCtl,表示達到12長度就擴容//由于這里只會有一個線程在執(zhí)行,直接賦值即可,沒有線程安全問題//只需要保證可見性sizeCtl?=?sc;}break;}}return?tab; }table變量使用了volatile來保證每次獲取到的都是最新寫入的值:
transient?volatile?Node<K,V>[]?table;總結(jié)
就算有多個線程同時進行put操作,在初始化數(shù)組時使用了樂觀鎖CAS操作來決定到底是哪個線程有資格進行初始化,其他線程均只能等待。
用到的并發(fā)技巧:
-
volatile變量(sizeCtl):它是一個標記位,用來告訴其他線程這個坑位有沒有人在,其線程間的可見性由volatile保證。
-
CAS操作:CAS操作保證了設置sizeCtl標記位的原子性,保證了只有一個線程能設置成功
4、put操作的線程安全
直接看代碼:
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();//對key的hashCode進行散列int?hash?=?spread(key.hashCode());int?binCount?=?0;//一個無限循環(huán),直到put操作完成后退出循環(huán)for?(Node<K,V>[]?tab?=?table;;)?{Node<K,V>?f;?int?n,?i,?fh;//當Node數(shù)組為空時進行初始化if?(tab?==?null?||?(n?=?tab.length)?==?0)tab?=?initTable();//Unsafe類volatile的方式取出hashCode散列后通過與運算得出的Node數(shù)組下標值對應的Node對象//此時的Node對象若為空,則代表還未有線程對此Node進行插入操作else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{//直接CAS方式插入數(shù)據(jù)if?(casTabAt(tab,?i,?null,new?Node<K,V>(hash,?key,?value,?null)))//插入成功,退出循環(huán)break;???????????????????//?no?lock?when?adding?to?empty?bin}//查看是否在擴容,先不看,擴容再介紹else?if?((fh?=?f.hash)?==?MOVED)//幫助擴容tab?=?helpTransfer(tab,?f);else?{V?oldVal?=?null;//對Node對象進行加鎖synchronized?(f)?{//二次確認此Node對象還是原來的那一個if?(tabAt(tab,?i)?==?f)?{if?(fh?>=?0)?{binCount?=?1;//無限循環(huán),直到完成putfor?(Node<K,V>?e?=?f;;?++binCount)?{K?ek;//和HashMap一樣,先比較hash,再比較equalsif?(e.hash?==?hash?&&((ek?=?e.key)?==?key?||(ek?!=?null?&&?key.equals(ek))))?{oldVal?=?e.val;if?(!onlyIfAbsent)e.val?=?value;break;}Node<K,V>?pred?=?e;if?((e?=?e.next)?==?null)?{//和鏈表頭Node節(jié)點不沖突,就將其初始化為新Node作為上一個Node節(jié)點的next//形成鏈表結(jié)構(gòu)pred.next?=?new?Node<K,V>(hash,?key,value,?null);break;}}}... }值得關注的是tabAt(tab, i)方法,其使用Unsafe類volatile的操作volatile式地查看值,保證每次獲取到的值都是最新的:
static?final?<K,V>?Node<K,V>?tabAt(Node<K,V>[]?tab,?int?i)?{return?(Node<K,V>)U.getObjectVolatile(tab,?((long)i?<<?ASHIFT)?+?ABASE); }雖然上面的table變量加了volatile,但也只能保證其引用的可見性,并不能確保其數(shù)組中的對象是否是最新的,所以需要Unsafe類volatile式地拿到最新的Node。
總結(jié)
由于其減小了鎖的粒度,若Hash完美不沖突的情況下,可同時支持n個線程同時put操作,n為Node數(shù)組大小,在默認大小16下,可以支持最大同時16個線程無競爭同時操作且線程安全。當hash沖突嚴重時,Node鏈表越來越長,將導致嚴重的鎖競爭,此時會進行擴容,將Node進行再散列,下面會介紹擴容的線程安全性??偨Y(jié)一下用到的并發(fā)技巧:
-
減小鎖粒度:將Node鏈表的頭節(jié)點作為鎖,若在默認大小16情況下,將有16把鎖,大大減小了鎖競爭(上下文切換),就像開頭所說,將串行的部分最大化縮小,在理想情況下線程的put操作都為并行操作。同時直接鎖住頭節(jié)點,保證了線程安全
-
Unsafe的getObjectVolatile方法:此方法確保獲取到的值為最新。
5、擴容操作的線程安全
在擴容時,ConcurrentHashMap支持多線程并發(fā)擴容,在擴容過程中同時支持get查數(shù)據(jù),若有線程put數(shù)據(jù),還會幫助一起擴容,這種無阻塞算法,將并行最大化的設計,堪稱一絕。
先來看看擴容代碼實現(xiàn):
private?final?void?transfer(Node<K,V>[]?tab,?Node<K,V>[]?nextTab)?{int?n?=?tab.length,?stride;//根據(jù)機器CPU核心數(shù)來計算,一條線程負責Node數(shù)組中多長的遷移量if?((stride?=?(NCPU?>?1)???(n?>>>?3)?/?NCPU?:?n)?<?MIN_TRANSFER_STRIDE)//本線程分到的遷移量//假設為16(默認也為16)stride?=?MIN_TRANSFER_STRIDE;?//?subdivide?range//nextTab若為空代表線程是第一個進行遷移的//初始化遷移后的新Node數(shù)組if?(nextTab?==?null)?{????????????//?initiatingtry?{@SuppressWarnings("unchecked")//這里n為舊數(shù)組長度,左移一位相當于乘以2//例如原數(shù)組長度16,新數(shù)組長度則為32Node<K,V>[]?nt?=?(Node<K,V>[])new?Node<?,?>[n?<<?1];nextTab?=?nt;}?catch?(Throwable?ex)?{??????//?try?to?cope?with?OOMEsizeCtl?=?Integer.MAX_VALUE;return;}//設置nextTable變量為新數(shù)組nextTable?=?nextTab;//假設為16transferIndex?=?n;}//假設為32int?nextn?=?nextTab.length;//標示Node對象,此對象的hash變量為-1//在get或者put時若遇到此Node,則可以知道當前Node正在遷移//傳入nextTab對象ForwardingNode<K,V>?fwd?=?new?ForwardingNode<K,V>(nextTab);boolean?advance?=?true;boolean?finishing?=?false;?//?to?ensure?sweep?before?committing?nextTabfor?(int?i?=?0,?bound?=?0;;)?{Node<K,V>?f;?int?fh;while?(advance)?{int?nextIndex,?nextBound;//i為當前正在處理的Node數(shù)組下標,每次處理一個Node節(jié)點就會自減1if?(--i?>=?bound?||?finishing)advance?=?false;//假設nextIndex=16else?if?((nextIndex?=?transferIndex)?<=?0)?{i?=?-1;advance?=?false;}//由以上假設,nextBound就為0//且將nextIndex設置為0else?if?(U.compareAndSwapInt(this,?TRANSFERINDEX,?nextIndex,nextBound?=?(nextIndex?>?stride??nextIndex?-?stride?:?0)))?{//bound=0bound?=?nextBound;//i=16-1=15i?=?nextIndex?-?1;advance?=?false;}}if?(i?<?0?||?i?>=?n?||?i?+?n?>=?nextn)?{int?sc;if?(finishing)?{nextTable?=?null;table?=?nextTab;sizeCtl?=?(n?<<?1)?-?(n?>>>?1);return;}if?(U.compareAndSwapInt(this,?SIZECTL,?sc?=?sizeCtl,?sc?-?1))?{if?((sc?-?2)?!=?resizeStamp(n)?<<?RESIZE_STAMP_SHIFT)return;finishing?=?advance?=?true;i?=?n;?//?recheck?before?commit}}//此時i=15,取出Node數(shù)組下標為15的那個Node,若為空則不需要遷移//直接設置占位標示,代表此Node已處理完成else?if?((f?=?tabAt(tab,?i))?==?null)advance?=?casTabAt(tab,?i,?null,?fwd);//檢測此Node的hash是否為MOVED,MOVED是一個常量-1,也就是上面說的占位Node的hash//如果是占位Node,證明此節(jié)點已經(jīng)處理過了,跳過i=15的處理,繼續(xù)循環(huán)else?if?((fh?=?f.hash)?==?MOVED)advance?=?true;?//?already?processedelse?{//鎖住這個Nodesynchronized?(f)?{//確認Node是原先的Nodeif?(tabAt(tab,?i)?==?f)?{//ln為lowNode,低位Node,hn為highNode,高位Node//這兩個概念下面以圖來說明Node<K,V>?ln,?hn;if?(fh?>=?0)?{//此時fh與原來Node數(shù)組長度進行與運算//如果高X位為0,此時runBit=0//如果高X位為1,此時runBit=1int?runBit?=?fh?&?n;Node<K,V>?lastRun?=?f;for?(Node<K,V>?p?=?f.next;?p?!=?null;?p?=?p.next)?{//這里的Node,都是同一Node鏈表中的Node對象int?b?=?p.hash?&?n;if?(b?!=?runBit)?{runBit?=?b;lastRun?=?p;}}//正如上面所說,runBit=0,表示此Node為低位Nodeif?(runBit?==?0)?{ln?=?lastRun;hn?=?null;}else?{//Node為高位Nodehn?=?lastRun;ln?=?null;}for?(Node<K,V>?p?=?f;?p?!=?lastRun;?p?=?p.next)?{int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;//若hash和n與運算為0,證明為低位Node,原理同上if?((ph?&?n)?==?0)ln?=?new?Node<K,V>(ph,?pk,?pv,?ln);//這里將高位Node與地位Node都各自組成了兩個鏈表elsehn?=?new?Node<K,V>(ph,?pk,?pv,?hn);}//將低位Node設置到新Node數(shù)組中,下標為原來的位置setTabAt(nextTab,?i,?ln);//將高位Node設置到新Node數(shù)組中,下標為原來的位置加上原Node數(shù)組長度setTabAt(nextTab,?i?+?n,?hn);//將此Node設置為占位Node,代表處理完成setTabAt(tab,?i,?fwd);//繼續(xù)循環(huán)advance?=?true;}....}}}} }這里說一下遷移時為什么要分一個ln(低位Node)、hn(高位Node),首先說一個現(xiàn)象:
我們知道,在put值的時候,首先會計算hash值,再散列到指定的Node數(shù)組下標中:
//根據(jù)key的hashCode再散列 int?hash?=?spread(key.hashCode()); //使用(n?-?1)?&?hash?運算,定位Node數(shù)組中下標值 (f?=?tabAt(tab,?i?=?(n?-?1)?&?hash);其中n為Node數(shù)組長度,這里假設為16。
假設有一個key進來,它的散列之后的hash=9,那么它的下標值是多少呢?
-
(16 - 1)和 9 進行與運算 -> 0000 1111 和 0000 1001 結(jié)果還是 0000 1001 = 9
假設Node數(shù)組需要擴容,我們知道,擴容是將數(shù)組長度增加兩倍,也就是32,那么下標值會是多少呢?
-
(32 - 1)和 9 進行與運算 -> 0001 1111 和 0000 1001 結(jié)果還是9
此時,我們把散列之后的hash換成20,那么會有怎樣的變化呢?
-
(16 - 1)和 20 進行與運算 -> 0000 1111 和 0001 0100 結(jié)果是 0000 0100 = 4
-
(32 - 1)和 20 進行與運算 -> 0001 1111 和 0001 0100 結(jié)果是 0001 0100 = 20
此時細心的讀者應該可以發(fā)現(xiàn),如果hash在高X位為1,(X為數(shù)組長度的二進制-1的最高位),則擴容時是需要變換在Node數(shù)組中的索引值的,不然就hash不到,丟失數(shù)據(jù),所以這里在遷移的時候?qū)⒏遆位為1的Node分類為hn,將高X位為0的Node分類為ln。
回到代碼中:
for?(Node<K,V>?p?=?f;?p?!=?lastRun;?p?=?p.next)?{int?ph?=?p.hash;?K?pk?=?p.key;?V?pv?=?p.val;if?((ph?&?n)?==?0)ln?=?new?Node<K,V>(ph,?pk,?pv,?ln);elsehn?=?new?Node<K,V>(ph,?pk,?pv,?hn); }這個操作將高低位組成了兩條鏈表結(jié)構(gòu),由下圖所示:
然后將其CAS操作放入新的Node數(shù)組中:
setTabAt(nextTab,?i,?ln); setTabAt(nextTab,?i?+?n,?hn);其中,低位鏈表放入原下標處,而高位鏈表則需要加上原Node數(shù)組長度,其中為什么不多贅述,上面已經(jīng)舉例說明了,這樣就可以保證高位Node在遷移到新Node數(shù)組中依然可以使用hash算法散列到對應下標的數(shù)組中去了。
最后將原Node數(shù)組中對應下標Node對象設置為fwd標記Node,表示該節(jié)點遷移完成,到這里,一個節(jié)點的遷移就完成了,將進行下一個節(jié)點的遷移,也就是i-1=14下標的Node節(jié)點。
擴容時的get操作:
假設Node下標為16的Node節(jié)點正在遷移,突然有一個線程進來調(diào)用get方法,正好key又散列到下標為16的節(jié)點,此時怎么辦?
public?V?get(Object?key)?{Node<K,V>[]?tab;?Node<K,V>?e,?p;?int?n,?eh;?K?ek;int?h?=?spread(key.hashCode());if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{if?((eh?=?e.hash)?==?h)?{if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))return?e.val;}//假如Node節(jié)點的hash值小于0//則有可能是fwd節(jié)點else?if?(eh?<?0)//調(diào)用節(jié)點對象的find方法查找值return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;while?((e?=?e.next)?!=?null)?{if?(e.hash?==?h?&&((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))return?e.val;}}return?null; }重點看有注釋的那兩行,在get操作的源碼中,會判斷Node中的hash是否小于0,是否還記得我們的占位Node,其hash為MOVED,為常量值-1,所以此時判斷線程正在遷移,委托給fwd占位Node去查找值:
//內(nèi)部類?ForwardingNode中 Node<K,V>?find(int?h,?Object?k)?{//?loop?to?avoid?arbitrarily?deep?recursion?on?forwarding?nodes//?這里的查找,是去新Node數(shù)組中查找的//?下面的查找過程與HashMap查找無異,不多贅述outer:?for?(Node<K,V>[]?tab?=?nextTable;;)?{Node<K,V>?e;?int?n;if?(k?==?null?||?tab?==?null?||?(n?=?tab.length)?==?0?||(e?=?tabAt(tab,?(n?-?1)?&?h))?==?null)return?null;for?(;;)?{int?eh;?K?ek;if?((eh?=?e.hash)?==?h?&&((ek?=?e.key)?==?k?||?(ek?!=?null?&&?k.equals(ek))))return?e;if?(eh?<?0)?{if?(e?instanceof?ForwardingNode)?{tab?=?((ForwardingNode<K,V>)e).nextTable;continue?outer;}elsereturn?e.find(h,?k);}if?((e?=?e.next)?==?null)return?null;}} }到這里應該可以恍然大悟了,之所以占位Node需要保存新Node數(shù)組的引用也是因為這個,它可以支持在遷移的過程中照樣不阻塞地查找值,可謂是精妙絕倫的設計。
多線程協(xié)助擴容
在put操作時,假設正在遷移,正好有一個線程進來,想要put值到遷移的Node上,怎么辦?
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{if?(key?==?null?||?value?==?null)?throw?new?NullPointerException();int?hash?=?spread(key.hashCode());int?binCount?=?0;for?(Node<K,V>[]?tab?=?table;;)?{Node<K,V>?f;?int?n,?i,?fh;if?(tab?==?null?||?(n?=?tab.length)?==?0)tab?=?initTable();else?if?((f?=?tabAt(tab,?i?=?(n?-?1)?&?hash))?==?null)?{if?(casTabAt(tab,?i,?null,new?Node<K,V>(hash,?key,?value,?null)))break;???????????????????//?no?lock?when?adding?to?empty?bin}//若此時發(fā)現(xiàn)了占位Node,證明此時HashMap正在遷移else?if?((fh?=?f.hash)?==?MOVED)//進行協(xié)助遷移tab?=?helpTransfer(tab,?f);... } final?Node<K,V>[]?helpTransfer(Node<K,V>[]?tab,?Node<K,V>?f)?{Node<K,V>[]?nextTab;?int?sc;if?(tab?!=?null?&&?(f?instanceof?ForwardingNode)?&&(nextTab?=?((ForwardingNode<K,V>)f).nextTable)?!=?null)?{int?rs?=?resizeStamp(tab.length);while?(nextTab?==?nextTable?&&?table?==?tab?&&(sc?=?sizeCtl)?<?0)?{if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||sc?==?rs?+?MAX_RESIZERS?||?transferIndex?<=?0)break;//sizeCtl加一,標示多一個線程進來協(xié)助擴容if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))?{//擴容transfer(tab,?nextTab);break;}}return?nextTab;}return?table; }此方法涉及大量復雜的位運算,這里不多贅述,只是簡單的說幾句,此時sizeCtl變量用來標示HashMap正在擴容,當其準備擴容時,會將sizeCtl設置為一個負數(shù),(例如數(shù)組長度為16時)其二進制表示為:
1000?0000?0001?1011?0000?0000?0000?0010無符號位為1,表示負數(shù)。其中高16位代表數(shù)組長度的一個位算法標示(有點像epoch的作用,表示當前遷移朝代為數(shù)組長度X),低16位表示有幾個線程正在做遷移,剛開始為2,接下來自增1,線程遷移完會進行減1操作,也就是如果低十六位為2,代表有一個線程正在遷移,如果為3,代表2個線程正在遷移以此類推…
只要數(shù)組長度足夠長,就可以同時容納足夠多的線程來一起擴容,最大化并行任務,提高性能。
在什么情況下會進行擴容操作?
-
在put值時,發(fā)現(xiàn)Node為占位Node(fwd)時,會協(xié)助擴容。
-
在新增節(jié)點后,檢測到鏈表長度大于8時。
treeifyBin方法會將鏈表轉(zhuǎn)換為紅黑樹,增加查找效率,但在這之前,會檢查數(shù)組長度,若小于64,則會優(yōu)先做擴容操作:
private?final?void?treeifyBin(Node<K,V>[]?tab,?int?index)?{Node<K,V>?b;?int?n,?sc;if?(tab?!=?null)?{//MIN_TREEIFY_CAPACITY=64//若數(shù)組長度小于64,則先擴容if?((n?=?tab.length)?<?MIN_TREEIFY_CAPACITY)//擴容tryPresize(n?<<?1);else?if?((b?=?tabAt(tab,?index))?!=?null?&&?b.hash?>=?0)?{synchronized?(b)?{//...轉(zhuǎn)換為紅黑樹的操作}}} }在每次新增節(jié)點之后,都會調(diào)用addCount方法,檢測Node數(shù)組大小是否達到閾值:
final?V?putVal(K?key,?V?value,?boolean?onlyIfAbsent)?{...//在下面一節(jié)會講到,此方法統(tǒng)計容器元素數(shù)量addCount(1L,?binCount);return?null; } private?final?void?addCount(long?x,?int?check)?{CounterCell[]?as;?long?b,?s;if?((as?=?counterCells)?!=?null?||!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{//統(tǒng)計元素個數(shù)的操作...}if?(check?>=?0)?{Node<K,V>[]?tab,?nt;?int?n,?sc;//元素個數(shù)達到閾值,進行擴容while?(s?>=?(long)(sc?=?sizeCtl)?&&?(tab?=?table)?!=?null?&&(n?=?tab.length)?<?MAXIMUM_CAPACITY)?{int?rs?=?resizeStamp(n);//發(fā)現(xiàn)sizeCtl為負數(shù),證明有線程正在遷移if?(sc?<?0)?{if?((sc?>>>?RESIZE_STAMP_SHIFT)?!=?rs?||?sc?==?rs?+?1?||sc?==?rs?+?MAX_RESIZERS?||?(nt?=?nextTable)?==?null?||transferIndex?<=?0)break;if?(U.compareAndSwapInt(this,?SIZECTL,?sc,?sc?+?1))transfer(tab,?nt);}//不為負數(shù),則為第一個遷移的線程else?if?(U.compareAndSwapInt(this,?SIZECTL,?sc,(rs?<<?RESIZE_STAMP_SHIFT)?+?2))transfer(tab,?null);s?=?sumCount();}} }總結(jié)
ConcurrentHashMap運用各類CAS操作,將擴容操作的并發(fā)性能實現(xiàn)最大化,在擴容過程中,就算有線程調(diào)用get查詢方法,也可以安全的查詢數(shù)據(jù),若有線程進行put操作,還會協(xié)助擴容,利用sizeCtl標記位和各種volatile變量進行CAS操作達到多線程之間的通信、協(xié)助,在遷移過程中只鎖一個Node節(jié)點,即保證了線程安全,又提高了并發(fā)性能。
6、統(tǒng)計容器大小的線程安全
ConcurrentHashMap在每次put操作之后都會調(diào)用addCount方法,此方法用于統(tǒng)計容器大小且檢測容器大小是否達到閾值,若達到閾值需要進行擴容操作,這在上面也是有提到的。這一節(jié)重點討論容器大小的統(tǒng)計是如何做到線程安全且并發(fā)性能不低的。
大部分的單機數(shù)據(jù)查詢優(yōu)化方案都會降低并發(fā)性能,就像緩存的存儲,在多線程環(huán)境下將有并發(fā)問題,所以會產(chǎn)生并行或者一系列并發(fā)沖突鎖競爭的問題,降低了并發(fā)性能。類似的,熱點數(shù)據(jù)也有這樣的問題,在多線程并發(fā)的過程中,熱點數(shù)據(jù)(頻繁被訪問的變量)是在每一個線程中幾乎或多或少都會訪問到的數(shù)據(jù),這將增加程序中的串行部分,回憶一下開頭所描述的,程序中的串行部分將影響并發(fā)的可伸縮性,使并發(fā)性能下降,這通常會成為并發(fā)程序性能的瓶頸。
而在ConcurrentHashMap中,如何快速的統(tǒng)計容器大小更是一個很重要的議題,因為容器內(nèi)部需要依靠容器大小來考慮是否需要擴容,而在客戶端而言需要調(diào)用此方法來知道容器有多少個元素,如果處理不好這種熱點數(shù)據(jù),并發(fā)性能將因為這個短板整體性能下降。
試想一下,如果是你,你會如何設計這種熱點數(shù)據(jù)?是加鎖,還是進行CAS操作?進入ConcurrentHashMap中,看看大師是如何巧妙的運用了并發(fā)技巧,提高熱點數(shù)據(jù)的并發(fā)性能。
先用圖的方式來看看大致的實現(xiàn)思路:
@sun.misc.Contended?static?final?class?CounterCell?{volatile?long?value;CounterCell(long?x)?{?value?=?x;?} }這是一個粗略的實現(xiàn),在設計中,使用了分而治之的思想,將每一個計數(shù)都分散到各個countCell對象里面(下面稱之為桶),使競爭最小化,又使用了CAS操作,就算有競爭,也可以對失敗了的線程進行其他的處理。樂觀鎖的實現(xiàn)方式與悲觀鎖不同之處就在于樂觀鎖可以對競爭失敗了的線程進行其他策略的處理,而悲觀鎖只能等待鎖釋放,所以這里使用CAS操作對競爭失敗的線程做了其他處理,很巧妙的運用了CAS樂觀鎖。
下面看看具體的代碼實現(xiàn)吧:
//計數(shù),并檢查長度是否達到閾值 private?final?void?addCount(long?x,?int?check)?{//計數(shù)桶CounterCell[]?as;?long?b,?s;//如果counterCells不為null,則代表已經(jīng)初始化了,直接進入if語句塊//若競爭不嚴重,counterCells有可能還未初始化,為null,先嘗試CAS操作遞增baseCount值if?((as?=?counterCells)?!=?null?||!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{//進入此語句塊有兩種可能//1.counterCells被初始化完成了,不為null//2.CAS操作遞增baseCount值失敗了,說明有競爭CounterCell?a;?long?v;?int?m;//標志是否存在競爭boolean?uncontended?=?true;//1.先判斷計數(shù)桶是否還沒初始化,則as=null,進入語句塊//2.判斷計數(shù)桶長度是否為空或,若是進入語句塊//3.這里做了一個線程變量隨機數(shù),與上桶大小-1,若桶的這個位置為空,進入語句塊//4.到這里說明桶已經(jīng)初始化了,且隨機的這個位置不為空,嘗試CAS操作使桶加1,失敗進入語句塊if?(as?==?null?||?(m?=?as.length?-?1)?<?0?||(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||!(uncontended?=U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{fullAddCount(x,?uncontended);return;}if?(check?<=?1)return;//統(tǒng)計容器大小s?=?sumCount();}... }假設當前線程為第一個put的線程
先假設當前Map還未被put數(shù)據(jù),則addCount一定沒有被調(diào)用過,當前線程第一個調(diào)用addCount方法,則此時countCell一定沒有被初始化,為null,則進行如下判斷:
if?((as?=?counterCells)?!=?null?||!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?這里的if判斷一定會走第二個判斷,先CAS增加變量baseCount的值:
private?transient?volatile?long?baseCount;這個值有什么用呢?我們看看統(tǒng)計容器大小的方法sumCount:
final?long?sumCount()?{//獲取計數(shù)桶CounterCell[]?as?=?counterCells;?CounterCell?a;//獲取baseCount,賦值給sum總數(shù)long?sum?=?baseCount;//若計數(shù)桶不為空,統(tǒng)計計數(shù)桶內(nèi)的值if?(as?!=?null)?{for?(int?i?=?0;?i?<?as.length;?++i)?{//遍歷計數(shù)桶,將value值相加if?((a?=?as[i])?!=?null)sum?+=?a.value;}}return?sum; }這個方法的大體思路與我們開頭那張圖差不多,容器的大小其實是分為兩部分,開頭只說了計數(shù)桶的那部分,其實還有一個baseCount,在線程沒有競爭的情況下的統(tǒng)計值,換句話說,在增加容量的時候其實是先去做CAS遞增baseCount的。
由此可見,統(tǒng)計容器大小其實是用了兩種思路:
-
CAS方式直接遞增:在線程競爭不大的時候,直接使用CAS操作遞增baseCount值即可,這里說的競爭不大指的是CAS操作不會失敗的情況
-
分而治之桶計數(shù):若出現(xiàn)了CAS操作失敗的情況,則證明此時有線程競爭了,計數(shù)方式從CAS方式轉(zhuǎn)變?yōu)榉侄沃耐坝嫈?shù)方式
出現(xiàn)了線程競爭導致CAS失敗
此時出現(xiàn)了競爭,則不會再用CAS方式來計數(shù)了,直接使用桶方式,從上面的addCount方法可以看出來,此時的countCell是為空的,最終一定會進入fullAddCount方法來進行初始化桶:
???private?final?void?fullAddCount(long?x,?boolean?wasUncontended)?{int?h;if?((h?=?ThreadLocalRandom.getProbe())?==?0)?{ThreadLocalRandom.localInit();??????//?force?initializationh?=?ThreadLocalRandom.getProbe();wasUncontended?=?true;}boolean?collide?=?false;????????????????//?True?if?last?slot?nonemptyfor?(;;)?{CounterCell[]?as;?CounterCell?a;?int?n;?long?v;...//如果計數(shù)桶!=null,證明已經(jīng)初始化,此時不走此語句塊if?((as?=?counterCells)?!=?null?&&?(n?=?as.length)?>?0)?{...}//進入此語句塊進行計數(shù)桶的初始化//CAS設置cellsBusy=1,表示現(xiàn)在計數(shù)桶Busy中...else?if?(cellsBusy?==?0?&&?counterCells?==?as?&&U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{//若有線程同時初始化計數(shù)桶,由于CAS操作只有一個線程進入這里boolean?init?=?false;try?{???????????????????????????//?Initialize?table//再次確認計數(shù)桶為空if?(counterCells?==?as)?{//初始化一個長度為2的計數(shù)桶CounterCell[]?rs?=?new?CounterCell[2];//h為一個隨機數(shù),與上1則代表結(jié)果為0、1中隨機的一個//也就是在0、1下標中隨便選一個計數(shù)桶,x=1,放入1的值代表增加1個容量rs[h?&?1]?=?new?CounterCell(x);//將初始化好的計數(shù)桶賦值給ConcurrentHashMapcounterCells?=?rs;init?=?true;}}?finally?{//最后將busy標識設置為0,表示不busy了cellsBusy?=?0;}if?(init)break;}//若有線程同時來初始化計數(shù)桶,則沒有搶到busy資格的線程就先來CAS遞增baseCountelse?if?(U.compareAndSwapLong(this,?BASECOUNT,?v?=?baseCount,?v?+?x))break;??????????????????????????//?Fall?back?on?using?base}}到這里就完成了計數(shù)桶的初始化工作,在之后的計數(shù)都將會使用計數(shù)桶方式來統(tǒng)計總數(shù)。
計數(shù)桶擴容
從上面的分析中我們知道,計數(shù)桶初始化之后長度為2,在競爭大的時候肯定是不夠用的,所以一定有計數(shù)桶的擴容操作,所以現(xiàn)在就有兩個問題了:
-
什么條件下會進行計數(shù)桶的擴容?
-
擴容操作是怎么樣的?
假設此時是用計數(shù)桶方式進行計數(shù):
private?final?void?addCount(long?x,?int?check)?{CounterCell[]?as;?long?b,?s;if?((as?=?counterCells)?!=?null?||!U.compareAndSwapLong(this,?BASECOUNT,?b?=?baseCount,?s?=?b?+?x))?{CounterCell?a;?long?v;?int?m;boolean?uncontended?=?true;//此時顯然會在計數(shù)桶數(shù)組中隨機選一個計數(shù)桶//然后使用CAS操作將此計數(shù)桶中的value+1if?(as?==?null?||?(m?=?as.length?-?1)?<?0?||(a?=?as[ThreadLocalRandom.getProbe()?&?m])?==?null?||!(uncontended?=U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x)))?{//若CAS操作失敗,證明有競爭,進入fullAddCount方法fullAddCount(x,?uncontended);return;}if?(check?<=?1)return;s?=?sumCount();}... }進入fullAddCount方法:
private?final?void?fullAddCount(long?x,?boolean?wasUncontended)?{int?h;if?((h?=?ThreadLocalRandom.getProbe())?==?0)?{ThreadLocalRandom.localInit();??????//?force?initializationh?=?ThreadLocalRandom.getProbe();wasUncontended?=?true;}boolean?collide?=?false;????????????????//?True?if?last?slot?nonemptyfor?(;;)?{CounterCell[]?as;?CounterCell?a;?int?n;?long?v;//計數(shù)桶初始化好了,一定是走這個if語句塊if?((as?=?counterCells)?!=?null?&&?(n?=?as.length)?>?0)?{//從計數(shù)桶數(shù)組隨機選一個計數(shù)桶,若為null表示該桶位還沒線程遞增過if?((a?=?as[(n?-?1)?&?h])?==?null)?{//查看計數(shù)桶busy狀態(tài)是否被標識if?(cellsBusy?==?0)?{????????????//?Try?to?attach?new?Cell//若不busy,直接new一個計數(shù)桶CounterCell?r?=?new?CounterCell(x);?//?Optimistic?create//CAS操作,標示計數(shù)桶busy中if?(cellsBusy?==?0?&&U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{boolean?created?=?false;try?{???????????????//?Recheck?under?lockCounterCell[]?rs;?int?m,?j;//在鎖下再檢查一次計數(shù)桶為nullif?((rs?=?counterCells)?!=?null?&&(m?=?rs.length)?>?0?&&rs[j?=?(m?-?1)?&?h]?==?null)?{//將剛剛創(chuàng)建的計數(shù)桶賦值給對應位置rs[j]?=?r;created?=?true;}}?finally?{//標示不busy了cellsBusy?=?0;}if?(created)break;continue;???????????//?Slot?is?now?non-empty}}collide?=?false;}else?if?(!wasUncontended)???????//?CAS?already?known?to?failwasUncontended?=?true;??????//?Continue?after?rehash//走到這里代表計數(shù)桶不為null,嘗試遞增計數(shù)桶else?if?(U.compareAndSwapLong(a,?CELLVALUE,?v?=?a.value,?v?+?x))break;else?if?(counterCells?!=?as?||?n?>=?NCPU)collide?=?false;????????????//?At?max?size?or?stale//若CAS操作失敗了,到了這里,會先進入一次,然后再走一次剛剛的for循環(huán)//若是第二次for循環(huán),collide=true,則不會走進去else?if?(!collide)collide?=?true;//計數(shù)桶擴容,一個線程若走了兩次for循環(huán),也就是進行了多次CAS操作遞增計數(shù)桶失敗了//則進行計數(shù)桶擴容,CAS標示計數(shù)桶busy中else?if?(cellsBusy?==?0?&&U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{try?{//確認計數(shù)桶還是同一個if?(counterCells?==?as)?{//?Expand?table?unless?stale//將長度擴大到2倍CounterCell[]?rs?=?new?CounterCell[n?<<?1];//遍歷舊計數(shù)桶,將引用直接搬過來for?(int?i?=?0;?i?<?n;?++i)rs[i]?=?as[i];//賦值counterCells?=?rs;}}?finally?{//取消busy狀態(tài)cellsBusy?=?0;}collide?=?false;continue;???????????????????//?Retry?with?expanded?table}h?=?ThreadLocalRandom.advanceProbe(h);}else?if?(cellsBusy?==?0?&&?counterCells?==?as?&&U.compareAndSwapInt(this,?CELLSBUSY,?0,?1))?{//初始化計數(shù)桶...}else?if?(U.compareAndSwapLong(this,?BASECOUNT,?v?=?baseCount,?v?+?x))break;??????????????????????????//?Fall?back?on?using?base} }看到這里,想必已經(jīng)可以解決上面兩個問題了:
-
什么條件下會進行計數(shù)桶的擴容?
答:在CAS操作遞增計數(shù)桶失敗了3次之后,會進行擴容計數(shù)桶操作,注意此時同時進行了兩次隨機定位計數(shù)桶來進行CAS遞增的,所以此時可以保證大概率是因為計數(shù)桶不夠用了,才會進行計數(shù)桶擴容
-
擴容操作是怎么樣的?
答:計數(shù)桶長度增加到兩倍長度,數(shù)據(jù)直接遍歷遷移過來,由于計數(shù)桶不像HashMap數(shù)據(jù)結(jié)構(gòu)那么復雜,有hash算法的影響,加上計數(shù)桶只是存放一個long類型的計數(shù)值而已,所以直接賦值引用即可。
總結(jié)
個人感覺,統(tǒng)計容器大小的線程安全與擴容ConcurrentHashMap這兩個算得上ConcurrentHashMap中最聰明的兩個并發(fā)設計了,閱讀此源碼的我看到這兩個操作的設計,都忍不住拍手叫絕,我想,這或許也是一個看源碼的樂趣吧,站在巨人的肩膀看巨人的思想。
總結(jié)一下計數(shù)中用到的并發(fā)技巧:
-
利用CAS遞增baseCount值來感知是否存在線程競爭,若競爭不大直接CAS遞增baseCount值即可,性能與直接baseCount++差別不大。
-
若存在線程競爭,則初始化計數(shù)桶,若此時初始化計數(shù)桶的過程中也存在競爭,多個線程同時初始化計數(shù)桶,則沒有搶到初始化資格的線程直接嘗試CAS遞增baseCount值的方式完成計數(shù),最大化利用了線程的并行。此時使用計數(shù)桶計數(shù),分而治之的方式來計數(shù),此時兩個計數(shù)桶最大可提供兩個線程同時計數(shù),同時使用CAS操作來感知線程競爭,若兩個桶情況下CAS操作還是頻繁失敗(失敗3次),則直接擴容計數(shù)桶,變?yōu)?個計數(shù)桶,支持最大同時4個線程并發(fā)計數(shù),以此類推…同時使用位運算和隨機數(shù)的方式"負載均衡"一樣的將線程計數(shù)請求接近均勻的落在各個計數(shù)桶中。
7、get操作的線程安全
對于get操作,其實沒有線程安全的問題,只有可見性的問題,只需要確保get的數(shù)據(jù)是線程之間可見的即可:
public?V?get(Object?key)?{Node<K,V>[]?tab;?Node<K,V>?e,?p;?int?n,?eh;?K?ek;int?h?=?spread(key.hashCode());//此過程與HashMap的get操作無異,不多贅述if?((tab?=?table)?!=?null?&&?(n?=?tab.length)?>?0?&&(e?=?tabAt(tab,?(n?-?1)?&?h))?!=?null)?{if?((eh?=?e.hash)?==?h)?{if?((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek)))return?e.val;}//當hash<0,有可能是在遷移,使用fwd占位Node去查找新table中的數(shù)據(jù)else?if?(eh?<?0)return?(p?=?e.find(h,?key))?!=?null???p.val?:?null;while?((e?=?e.next)?!=?null)?{if?(e.hash?==?h?&&((ek?=?e.key)?==?key?||?(ek?!=?null?&&?key.equals(ek))))return?e.val;}}return?null; }在get操作中除了增加了遷移的判斷以外,基本與HashMap的get操作無異,這里不多贅述,值得一提的是這里使用了tabAt方法Unsafe類volatile的方式去獲取Node數(shù)組中的Node,保證獲得到的Node是最新的。
static?final?<K,V>?Node<K,V>?tabAt(Node<K,V>[]?tab,?int?i)?{return?(Node<K,V>)U.getObjectVolatile(tab,?((long)i?<<?ASHIFT)?+?ABASE); }8
JDK1.7與1.8的不同實現(xiàn)
JDK1.7的ConcurrentHashMap底層數(shù)據(jù)結(jié)構(gòu):
其中1.7的實現(xiàn)也同樣采用了分段鎖的技術,只不過多個一個segment,一個segment里對應一個小HashMap,其中segment繼承了ReentrantLock,充當了鎖的角色,一把鎖鎖一個小HashMap(相當于多個Node),從1.8的實現(xiàn)來看, 鎖的粒度從多個Node級別又減小到一個Node級別,再度減小鎖競爭,減小程序同步的部分。
9、總結(jié)
不得不說,大師將CAS操作運用的淋漓盡致,相信理解了以上源碼的讀者也可以學習到大師所運用的并發(fā)技巧,不僅僅是在ConcurrentHashMap中,其實在大部分JUC的源碼中很多并發(fā)思想很值得我們?nèi)ラ喿x、學習。
PS:防止找不到本篇文章,可以收藏點贊,方便翻閱查找哦
總結(jié)
以上是生活随笔為你收集整理的面试官:ConcurrentHashMap 是如何保证线程安全的的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vue使用外部字体自定义LCD字体(晶管
- 下一篇: 西门子S-1200PLC开发笔记(一、选