Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2
https://blog.csdn.net/programmer_at/article/details/79715177
https://blog.csdn.net/qq_41737716/category_8894817.html
空船理論
這個故事是出自莊子的《山木》中的“方舟濟河”,這是一篇極富道家哲學的寓言,一個人在乘船渡河的時候,前面一只船正要撞過來。
這個人喊了好幾聲沒有人回應,于是破口大罵前面開船的人不長眼。
結果撞上來的竟是一只空船,于是剛才怒氣沖沖的人,一下子怒火就消失得無影無蹤了。
其實你會發現,生氣與不生氣,取決撞來的船上有沒有人!
一個人“看不慣”的東西、人和事越多,這個人的境界也就越低,格局也就越小。
一個人如果總是以自我為中心,太把自己當回事,那么就很容易與別人起沖突。
前言
我們知道,在日常開發中使用的HashMap是線程不安全的,而線程安全類HashTable只是簡單的在方法上加鎖實現線程安全,效率低下,所以在線程安全的環境下我們通常會使用ConcurrentHashMap,但是又為何需要學習ConcurrentHashMap?用不就完事了?我認為學習其源碼有兩個好處:
Amdahl定律
此節定律描述均來自《Java并發編程實戰》一書
假設F是必須被串行執行的部分,N代表處理器數量,Speedup代表加速比,可以簡單理解為CPU使用率
此公式告訴我們,當N趨近無限大,加速比最大趨近于1/F,假設我們的程序有50%的部分需要串行執行,就算處理器數量無限多,最高的加速比只能是2(20%的使用率),如果程序中僅有10%的部分需要串行執行,最高的加速比可以達到9.2(92%的使用率),但我們的程序或多或少都一定會有串行執行的部分,所以F不可能為0,所以,就算有無限多的CPU,加速比也不可能達到10(100%的使用率),下面給一張圖來表示串行執行部分占比不同對利用率的影響:
由此我們可以看出,程序中的可伸縮性(提升外部資源即可提升并發性能的比率)是由程序中串行執行部分所影響的,而常見的串行執行有鎖競爭(上下文切換消耗、等待、串行)等等,這給了我們一個啟發,可以通過減少鎖競爭來優化并發性能,而ConcurrentHashMap則使用了鎖分段(減小鎖范圍)、CAS(樂觀鎖,減小上下文切換開銷,無阻塞)等等技術,下面來具體看看吧。
初始化數據結構時的線程安全
HashMap的底層數據結構這里簡單帶過一下,不做過多贅述:
大致是以一個Node對象數組來存放數據,Hash沖突時會形成Node鏈表,在鏈表長度超過8,Node數組超過64時會將鏈表結構轉換為紅黑樹,Node對象:
值得注意的是,value和next指針使用了volatile來保證其可見性
在JDK1.8中,初始化ConcurrentHashMap的時候這個Node[]數組是還未初始化的,會等到第一次put方法調用時才初始化:
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//判斷Node數組為空if (tab == null || (n = tab.length) == 0)//初始化Node數組tab = initTable();... }此時是會有并發問題的,如果多個線程同時調用initTable初始化Node數組怎么辦?看看大師是如何處理的:
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;//每次循環都獲取最新的Node數組引用while ((tab = table) == null || tab.length == 0) {//sizeCtl是一個標記位,若為-1也就是小于0,代表有線程在進行初始化工作了if ((sc = sizeCtl) < 0)//讓出CPU時間片Thread.yield(); // lost initialization race; just spin//CAS操作,將本實例的sizeCtl變量設置為-1else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//如果CAS操作成功了,代表本線程將負責初始化工作try {//再檢查一遍數組是否為空if ((tab = table) == null || tab.length == 0) {//在初始化Map時,sizeCtl代表數組大小,默認16//所以此時n默認為16int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")//Node數組Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//將其賦值給table變量table = tab = nt;//通過位運算,n減去n二進制右移2位,相當于乘以0.75//例如16經過運算為12,與乘0.75一樣,只不過位運算更快sc = n - (n >>> 2);}} finally {//將計算后的sc(12)直接賦值給sizeCtl,表示達到12長度就擴容//由于這里只會有一個線程在執行,直接賦值即可,沒有線程安全問題//只需要保證可見性sizeCtl = sc;}break;}}return tab; }table變量使用了volatile來保證每次獲取到的都是最新寫入的值
transient volatile Node<K,V>[] table;
總結
就算有多個線程同時進行put操作,在初始化數組時使用了樂觀鎖CAS操作來決定到底是哪個線程有資格進行初始化,其他線程均只能等待。
用到的并發技巧:
- volatile變量(sizeCtl):它是一個標記位,用來告訴其他線程這個坑位有沒有人在,其線程間的可見性由volatile保證。
- CAS操作:CAS操作保證了設置sizeCtl標記位的原子性,保證了只有一個線程能設置成功。
put操作的線程安全
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//對key的hashCode進行散列int hash = spread(key.hashCode());int binCount = 0;//一個無限循環,直到put操作完成后退出循環for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//當Node數組為空時進行初始化if (tab == null || (n = tab.length) == 0)tab = initTable();//Unsafe類volatile的方式取出hashCode散列后通過與運算得出的Node數組下標值對應的Node對象//此時的Node對象若為空,則代表還未有線程對此Node進行插入操作else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//直接CAS方式插入數據if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))//插入成功,退出循環break; // no lock when adding to empty bin}//查看是否在擴容,先不看,擴容再介紹else if ((fh = f.hash) == MOVED)//幫助擴容tab = helpTransfer(tab, f);else {V oldVal = null;//對Node對象進行加鎖synchronized (f) {//二次確認此Node對象還是原來的那一個if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;//無限循環,直到完成putfor (Node<K,V> e = f;; ++binCount) {K ek;//和HashMap一樣,先比較hash,再比較equalsif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {//和鏈表頭Node節點不沖突,就將其初始化為新Node作為上一個Node節點的next//形成鏈表結構pred.next = new Node<K,V>(hash, key,value, null);break;}}}... }值得關注的是tabAt(tab, i)方法,其使用Unsafe類volatile的操作volatile式地查看值,保證每次獲取到的值都是最新的:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }雖然上面的table變量加了volatile,但也只能保證其引用的可見性,并不能確保其數組中的對象是否是最新的,所以需要Unsafe類volatile式地拿到最新的Node
總結
由于其減小了鎖的粒度,若Hash完美不沖突的情況下,可同時支持n個線程同時put操作,n為Node數組大小,在默認大小16下,可以支持最大同時16個線程無競爭同時操作且線程安全。當hash沖突嚴重時,Node鏈表越來越長,將導致嚴重的鎖競爭,此時會進行擴容,將Node進行再散列,下面會介紹擴容的線程安全性。總結一下用到的并發技巧:
擴容操作的線程安全
在擴容時,ConcurrentHashMap支持多線程并發擴容,在擴容過程中同時支持get查數據,若有線程put數據,還會幫助一起擴容,這種無阻塞算法,將并行最大化的設計,堪稱一絕。
先來看看擴容代碼實現:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;//根據機器CPU核心數來計算,一條線程負責Node數組中多長的遷移量if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//本線程分到的遷移量//假設為16(默認也為16)stride = MIN_TRANSFER_STRIDE; // subdivide range//nextTab若為空代表線程是第一個進行遷移的//初始化遷移后的新Node數組if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")//這里n為舊數組長度,左移一位相當于乘以2//例如原數組長度16,新數組長度則為32Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}//設置nextTable變量為新數組nextTable = nextTab;//假設為16transferIndex = n;}//假設為32int nextn = nextTab.length;//標示Node對象,此對象的hash變量為-1//在get或者put時若遇到此Node,則可以知道當前Node正在遷移//傳入nextTab對象ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;//i為當前正在處理的Node數組下標,每次處理一個Node節點就會自減1if (--i >= bound || finishing)advance = false;//假設nextIndex=16else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}//由以上假設,nextBound就為0//且將nextIndex設置為0else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {//bound=0bound = nextBound;//i=16-1=15i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}//此時i=15,取出Node數組下標為15的那個Node,若為空則不需要遷移//直接設置占位標示,代表此Node已處理完成else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);//檢測此Node的hash是否為MOVED,MOVED是一個常量-1,也就是上面說的占位Node的hash//如果是占位Node,證明此節點已經處理過了,跳過i=15的處理,繼續循環else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {//鎖住這個Nodesynchronized (f) {//確認Node是原先的Nodeif (tabAt(tab, i) == f) {//ln為lowNode,低位Node,hn為highNode,高位Node//這兩個概念下面以圖來說明Node<K,V> ln, hn;if (fh >= 0) {//此時fh與原來Node數組長度進行與運算//如果高X位為0,此時runBit=0//如果高X位為1,此時runBit=1int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {//這里的Node,都是同一Node鏈表中的Node對象int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}//正如上面所說,runBit=0,表示此Node為低位Nodeif (runBit == 0) {ln = lastRun;hn = null;}else {//Node為高位Nodehn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;//若hash和n與運算為0,證明為低位Node,原理同上if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);//這里將高位Node與地位Node都各自組成了兩個鏈表elsehn = new Node<K,V>(ph, pk, pv, hn);}//將低位Node設置到新Node數組中,下標為原來的位置setTabAt(nextTab, i, ln);//將高位Node設置到新Node數組中,下標為原來的位置加上原Node數組長度setTabAt(nextTab, i + n, hn);//將此Node設置為占位Node,代表處理完成setTabAt(tab, i, fwd);//繼續循環advance = true;}....}}}} }這里說一下遷移時為什么要分一個ln(低位Node)、hn(高位Node),首先說一個現象:
我們知道,在put值的時候,首先會計算hash值,再散列到指定的Node數組下標中:
//根據key的hashCode再散列 int hash = spread(key.hashCode()); //使用(n - 1) & hash 運算,定位Node數組中下標值 (f = tabAt(tab, i = (n - 1) & hash);其中n為Node數組長度,這里假設為16。
假設有一個key進來,它的散列之后的hash=9,那么它的下標值是多少呢?
(16 - 1)和 9 進行與運算 -> 0000 1111 和 0000 1001 結果還是 0000 1001 = 9假設Node數組需要擴容,我們知道,擴容是將數組長度增加兩倍,也就是32,那么下標值會是多少呢?
(32 - 1)和 9 進行與運算 -> 0001 1111 和 0000 1001 結果還是9此時,我們把散列之后的hash換成20,那么會有怎樣的變化呢?
(16 - 1)和 20 進行與運算 -> 0000 1111 和 0001 0100 結果是 0000 0100 = 4 (32 - 1)和 20 進行與運算 -> 0001 1111 和 0001 0100 結果是 0001 0100 = 20此時細心的讀者應該可以發現,如果hash在高X位為1,(X為數組長度的二進制-1的最高位),則擴容時是需要變換在Node數組中的索引值的,不然就hash不到,丟失數據,所以這里在遷移的時候將高X位為1的Node分類為hn,將高X位為0的Node分類為ln。
回到代碼中:
for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn); }這個操作將高低位組成了兩條鏈表結構,由下圖所示:
然后將其CAS操作放入新的Node數組中:
其中,低位鏈表放入原下標處,而高位鏈表則需要加上原Node數組長度,其中為什么不多贅述,上面已經舉例說明了,這樣就可以保證高位Node在遷移到新Node數組中依然可以使用hash算法散列到對應下標的數組中去了。
最后將原Node數組中對應下標Node對象設置為fwd標記Node,表示該節點遷移完成,到這里,一個節點的遷移就完成了,將進行下一個節點的遷移,也就是i-1=14下標的Node節點。
擴容時的get操作
假設Node下標為16的Node節點正在遷移,突然有一個線程進來調用get方法,正好key又散列到下標為16的節點,此時怎么辦?
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}//假如Node節點的hash值小于0//則有可能是fwd節點else if (eh < 0)//調用節點對象的find方法查找值return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null; }重點看有注釋的那兩行,在get操作的源碼中,會判斷Node中的hash是否小于0,是否還記得我們的占位Node,其hash為MOVED,為常量值-1,所以此時判斷線程正在遷移,委托給fwd占位Node去查找值:
//內部類 ForwardingNode中 Node<K,V> find(int h, Object k) {// loop to avoid arbitrarily deep recursion on forwarding nodes// 這里的查找,是去新Node數組中查找的// 下面的查找過程與HashMap查找無異,不多贅述outer: for (Node<K,V>[] tab = nextTable;;) {Node<K,V> e; int n;if (k == null || tab == null || (n = tab.length) == 0 ||(e = tabAt(tab, (n - 1) & h)) == null)return null;for (;;) {int eh; K ek;if ((eh = e.hash) == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;if (eh < 0) {if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}elsereturn e.find(h, k);}if ((e = e.next) == null)return null;}} }到這里應該可以恍然大悟了,之所以占位Node需要保存新Node數組的引用也是因為這個,它可以支持在遷移的過程中照樣不阻塞地查找值,可謂是精妙絕倫的設計。
多線程協助擴容
在put操作時,假設正在遷移,正好有一個線程進來,想要put值到遷移的Node上,怎么辦?
final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}//若此時發現了占位Node,證明此時HashMap正在遷移else if ((fh = f.hash) == MOVED)//進行協助遷移tab = helpTransfer(tab, f);... } final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {int rs = resizeStamp(tab.length);while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;//sizeCtl加一,標示多一個線程進來協助擴容if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//擴容transfer(tab, nextTab);break;}}return nextTab;}return table; }此方法涉及大量復雜的位運算,這里不多贅述,只是簡單的說幾句,此時sizeCtl變量用來標示HashMap正在擴容,當其準備擴容時,會將sizeCtl設置為一個負數,(例如數組長度為16時)其二進制表示為:
1000 0000 0001 1011 0000 0000 0000 0010
無符號位為1,表示負數。其中高16位代表數組長度的一個位算法標示(有點像epoch的作用,表示當前遷移朝代為數組長度X),低16位表示有幾個線程正在做遷移,剛開始為2,接下來自增1,線程遷移完會進行減1操作,也就是如果低十六位為2,代表有一個線程正在遷移,如果為3,代表2個線程正在遷移以此類推…
只要數組長度足夠長,就可以同時容納足夠多的線程來一起擴容,最大化并行任務,提高性能。
在什么情況下會進行擴容操作?
- 在put值時,發現Node為占位Node(fwd)時,會協助擴容
- 在新增節點后,檢測到鏈表長度大于8時
treeifyBin方法會將鏈表轉換為紅黑樹,增加查找效率,但在這之前,會檢查數組長度,若小于64,則會優先做擴容操作:
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {//MIN_TREEIFY_CAPACITY=64//若數組長度小于64,則先擴容if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//擴容tryPresize(n << 1);else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {//...轉換為紅黑樹的操作}}} }在每次新增節點之后,都會調用addCount方法,檢測Node數組大小是否達到閾值:
final V putVal(K key, V value, boolean onlyIfAbsent) {...//在下面一節會講到,此方法統計容器元素數量addCount(1L, binCount);return null; } private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//統計元素個數的操作...}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;//元素個數達到閾值,進行擴容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);//發現sizeCtl為負數,證明有線程正在遷移if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}//不為負數,則為第一個遷移的線程else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}} }總結
ConcurrentHashMap運用各類CAS操作,將擴容操作的并發性能實現最大化,在擴容過程中,就算有線程調用get查詢方法,也可以安全的查詢數據,若有線程進行put操作,還會協助擴容,利用sizeCtl標記位和各種volatile變量進行CAS操作達到多線程之間的通信、協助,在遷移過程中只鎖一個Node節點,即保證了線程安全,又提高了并發性能。
統計容器大小的線程安全
ConcurrentHashMap在每次put操作之后都會調用addCount方法,此方法用于統計容器大小且檢測容器大小是否達到閾值,若達到閾值需要進行擴容操作,這在上面也是有提到的。這一節重點討論容器大小的統計是如何做到線程安全且并發性能不低的。
大部分的單機數據查詢優化方案都會降低并發性能,就像緩存的存儲,在多線程環境下將有并發問題,所以會產生并行或者一系列并發沖突鎖競爭的問題,降低了并發性能。類似的,熱點數據也有這樣的問題,在多線程并發的過程中,熱點數據(頻繁被訪問的變量)是在每一個線程中幾乎或多或少都會訪問到的數據,這將增加程序中的串行部分,回憶一下開頭所描述的,程序中的串行部分將影響并發的可伸縮性,使并發性能下降,這通常會成為并發程序性能的瓶頸。而在ConcurrentHashMap中,如何快速的統計容器大小更是一個很重要的議題,因為容器內部需要依靠容器大小來考慮是否需要擴容,而在客戶端而言需要調用此方法來知道容器有多少個元素,如果處理不好這種熱點數據,并發性能將因為這個短板整體性能下降。
試想一下,如果是你,你會如何設計這種熱點數據?是加鎖,還是進行CAS操作?進入ConcurrentHashMap中,看看大師是如何巧妙的運用了并發技巧,提高熱點數據的并發性能。
先用圖的方式來看看大致的實現思路:
這是一個粗略的實現,在設計中,使用了分而治之的思想,將每一個計數都分散到各個countCell對象里面(下面稱之為桶),使競爭最小化,又使用了CAS操作,就算有競爭,也可以對失敗了的線程進行其他的處理。樂觀鎖的實現方式與悲觀鎖不同之處就在于樂觀鎖可以對競爭失敗了的線程進行其他策略的處理,而悲觀鎖只能等待鎖釋放,所以這里使用CAS操作對競爭失敗的線程做了其他處理,很巧妙的運用了CAS樂觀鎖。
下面看看具體的代碼實現吧:
//計數,并檢查長度是否達到閾值 private final void addCount(long x, int check) {//計數桶CounterCell[] as; long b, s;//如果counterCells不為null,則代表已經初始化了,直接進入if語句塊//若競爭不嚴重,counterCells有可能還未初始化,為null,先嘗試CAS操作遞增baseCount值if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//進入此語句塊有兩種可能//1.counterCells被初始化完成了,不為null//2.CAS操作遞增baseCount值失敗了,說明有競爭CounterCell a; long v; int m;//標志是否存在競爭boolean uncontended = true;//1.先判斷計數桶是否還沒初始化,則as=null,進入語句塊//2.判斷計數桶長度是否為空或,若是進入語句塊//3.這里做了一個線程變量隨機數,與上桶大小-1,若桶的這個位置為空,進入語句塊//4.到這里說明桶已經初始化了,且隨機的這個位置不為空,嘗試CAS操作使桶加1,失敗進入語句塊if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;//統計容器大小s = sumCount();}... }假設當前線程為第一個put的線程
先假設當前Map還未被put數據,則addCount一定沒有被調用過,當前線程第一個調用addCount方法,則此時countCell一定沒有被初始化,為null,則進行如下判斷:
if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))這里的if判斷一定會走第二個判斷,先CAS增加變量baseCount的值:
private transient volatile long baseCount;這個值有什么用呢?我們看看統計容器大小的方法sumCount:
final long sumCount() {//獲取計數桶CounterCell[] as = counterCells; CounterCell a;//獲取baseCount,賦值給sum總數long sum = baseCount;//若計數桶不為空,統計計數桶內的值if (as != null) {for (int i = 0; i < as.length; ++i) {//遍歷計數桶,將value值相加if ((a = as[i]) != null)sum += a.value;}}return sum; }這個方法的大體思路與我們開頭那張圖差不多,容器的大小其實是分為兩部分,開頭只說了計數桶的那部分,其實還有一個baseCount,在線程沒有競爭的情況下的統計值,換句話說,在增加容量的時候其實是先去做CAS遞增baseCount的。
由此可見,統計容器大小其實是用了兩種思路:
出現了線程競爭導致CAS失敗
此時出現了競爭,則不會再用CAS方式來計數了,直接使用桶方式,從上面的addCount方法可以看出來,此時的countCell是為空的,最終一定會進入fullAddCount方法來進行初始化桶:
private final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;...//如果計數桶!=null,證明已經初始化,此時不走此語句塊if ((as = counterCells) != null && (n = as.length) > 0) {...}//進入此語句塊進行計數桶的初始化//CAS設置cellsBusy=1,表示現在計數桶Busy中...else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//若有線程同時初始化計數桶,由于CAS操作只有一個線程進入這里boolean init = false;try { // Initialize table//再次確認計數桶為空if (counterCells == as) {//初始化一個長度為2的計數桶CounterCell[] rs = new CounterCell[2];//h為一個隨機數,與上1則代表結果為0、1中隨機的一個//也就是在0、1下標中隨便選一個計數桶,x=1,放入1的值代表增加1個容量rs[h & 1] = new CounterCell(x);//將初始化好的計數桶賦值給ConcurrentHashMapcounterCells = rs;init = true;}} finally {//最后將busy標識設置為0,表示不busy了cellsBusy = 0;}if (init)break;}//若有線程同時來初始化計數桶,則沒有搶到busy資格的線程就先來CAS遞增baseCountelse if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base}}到這里就完成了計數桶的初始化工作,在之后的計數都將會使用計數桶方式來統計總數
計數桶擴容
從上面的分析中我們知道,計數桶初始化之后長度為2,在競爭大的時候肯定是不夠用的,所以一定有計數桶的擴容操作,所以現在就有兩個問題了:
假設此時是用計數桶方式進行計數:
private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;//此時顯然會在計數桶數組中隨機選一個計數桶//然后使用CAS操作將此計數桶中的value+1if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {//若CAS操作失敗,證明有競爭,進入fullAddCount方法fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}... }進入fullAddCount方法:
private final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;//計數桶初始化好了,一定是走這個if語句塊if ((as = counterCells) != null && (n = as.length) > 0) {//從計數桶數組隨機選一個計數桶,若為null表示該桶位還沒線程遞增過if ((a = as[(n - 1) & h]) == null) {//查看計數桶busy狀態是否被標識if (cellsBusy == 0) { // Try to attach new Cell//若不busy,直接new一個計數桶CounterCell r = new CounterCell(x); // Optimistic create//CAS操作,標示計數桶busy中if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;//在鎖下再檢查一次計數桶為nullif ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {//將剛剛創建的計數桶賦值給對應位置rs[j] = r;created = true;}} finally {//標示不busy了cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash//走到這里代表計數桶不為null,嘗試遞增計數桶else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;else if (counterCells != as || n >= NCPU)collide = false; // At max size or stale//若CAS操作失敗了,到了這里,會先進入一次,然后再走一次剛剛的for循環//若是第二次for循環,collide=true,則不會走進去else if (!collide)collide = true;//計數桶擴容,一個線程若走了兩次for循環,也就是進行了多次CAS操作遞增計數桶失敗了//則進行計數桶擴容,CAS標示計數桶busy中else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {//確認計數桶還是同一個if (counterCells == as) {// Expand table unless stale//將長度擴大到2倍CounterCell[] rs = new CounterCell[n << 1];//遍歷舊計數桶,將引用直接搬過來for (int i = 0; i < n; ++i)rs[i] = as[i];//賦值counterCells = rs;}} finally {//取消busy狀態cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//初始化計數桶...}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base} }看到這里,想必已經可以解決上面兩個問題了:
答:在CAS操作遞增計數桶失敗了3次之后,會進行擴容計數桶操作,注意此時同時進行了兩次隨機定位計數桶來進行CAS遞增的,所以此時可以保證大概率是因為計數桶不夠用了,才會進行計數桶擴容
答:計數桶長度增加到兩倍長度,數據直接遍歷遷移過來,由于計數桶不像HashMap數據結構那么復雜,有hash算法的影響,加上計數桶只是存放一個long類型的計數值而已,所以直接賦值引用即可。
總結
個人感覺,統計容器大小的線程安全與擴容ConcurrentHashMap這兩個算得上ConcurrentHashMap中最聰明的兩個并發設計了,閱讀此源碼的我看到這兩個操作的設計,都忍不住拍手叫絕,我想,這或許也是一個看源碼的樂趣吧,站在巨人的肩膀看巨人的思想。
總結一下計數中用到的并發技巧:
get操作的線程安全
對于get操作,其實沒有線程安全的問題,只有可見性的問題,只需要確保get的數據是線程之間可見的即可:
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());//此過程與HashMap的get操作無異,不多贅述if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}//當hash<0,有可能是在遷移,使用fwd占位Node去查找新table中的數據else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null; }在get操作中除了增加了遷移的判斷以外,基本與HashMap的get操作無異,這里不多贅述,值得一提的是這里使用了tabAt方法Unsafe類volatile的方式去獲取Node數組中的Node,保證獲得到的Node是最新的
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }JDK1.7與1.8的不同實現
JDK1.7的ConcurrentHashMap底層數據結構:
其中1.7的實現也同樣采用了分段鎖的技術,只不過多個一個segment,一個segment里對應一個小HashMap,其中segment繼承了ReentrantLock,充當了鎖的角色,一把鎖鎖一個小HashMap(相當于多個Node),從1.8的實現來看, 鎖的粒度從多個Node級別又減小到一個Node級別,再度減小鎖競爭,減小程序同步的部分。
總結
不得不說,大師將CAS操作運用的淋漓盡致,相信理解了以上源碼的讀者也可以學習到大師所運用的并發技巧,不僅僅是在ConcurrentHashMap中,其實在大部分JUC的源碼中很多并發思想很值得我們去閱讀、學習。
源碼分析
構造函數
public ConcurrentHashMap() {}public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;}public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this.sizeCtl = DEFAULT_CAPACITY;putAll(m);}public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, 1);}public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel) // Use at least as many binsinitialCapacity = concurrencyLevel; // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;}靜態變量
private static final int MAXIMUM_CAPACITY = 1 << 30;private static final int DEFAULT_CAPACITY = 16;static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private static final int DEFAULT_CONCURRENCY_LEVEL = 16;private static final float LOAD_FACTOR = 0.75f;static final int TREEIFY_THRESHOLD = 8;static final int UNTREEIFY_THRESHOLD = 6;static final int MIN_TREEIFY_CAPACITY = 64;private static final int MIN_TRANSFER_STRIDE = 16;private static int RESIZE_STAMP_BITS = 16;private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;static final int MOVED = -1; // hash for forwarding nodesstatic final int TREEBIN = -2; // hash for roots of treesstatic final int RESERVED = -3; // hash for transient reservations成員變量
/*** The array of bins. Lazily initialized upon first insertion.* Size is always a power of two. Accessed directly by iterators.*/transient volatile Node<K,V>[] table;/*** The next table to use; non-null only while resizing.*/private transient volatile Node<K,V>[] nextTable;/*** Base counter value, used mainly when there is no contention,* but also as a fallback during table initialization* races. Updated via CAS.*/private transient volatile long baseCount;/*** Table initialization and resizing control. When negative, the* table is being initialized or resized: -1 for initialization,* else -(1 + the number of active resizing threads). Otherwise,* when table is null, holds the initial table size to use upon* creation, or 0 for default. After initialization, holds the* next element count value upon which to resize the table.*/private transient volatile int sizeCtl;/*** The next table index (plus one) to split while resizing.*/private transient volatile int transferIndex;/*** Spinlock (locked via CAS) used when resizing and/or creating CounterCells.*/private transient volatile int cellsBusy;/*** Table of counter cells. When non-null, size is a power of 2.*/private transient volatile CounterCell[] counterCells;// viewsprivate transient KeySetView<K,V> keySet;private transient ValuesView<K,V> values;private transient EntrySetView<K,V> entrySet;方法
public V put(K key, V value) {return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null)throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K, V>[] tab = table;;) {Node<K, V> f;int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))break; // no lock when adding to empty bin} else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K, V> e = f;; ++binCount) {K ek;if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K, V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K, V>(hash, key, value, null);break;}}} else if (f instanceof TreeBin) {Node<K, V> p;binCount = 2;if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;} public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;}https://www.cnblogs.com/banjinbaijiu/p/9147434.html
總結
以上是生活随笔為你收集整理的Java中ConcurrentHashMap底层实现原理(JDK1.8)源码分析2的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java的poll方法_Java中的队列
- 下一篇: 记忆力太差了,怎么回事情呢?