Java集合:ConcurrentHashMap(JDK 1.7 JDK 1.8)
ConcurrentHashMap(JDK 1.7)
? ?HashTable容器在競爭激烈的并發環境下表現出效率低下的原因,是因為所有訪問HashTable的線程都必須競爭同一把鎖,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高并發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,并且其成員變量實際上也是final的,但是,僅僅是將數組聲明為final的并不保證數組成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組,Segment的結構和HashMap類似,是一種數組和鏈表結構,?一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,?每個Segment守護者一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment鎖。
應用場景
當有一個大數組時需要在多個線程共享時就可以考慮是否把它給分成多個節點了,避免大鎖。并可以考慮通過hash算法進行一些模塊定位。
其實不止用于線程,當設計數據表的事務時(事務某種意義上也是同步機制的體現),可以把一個表看成一個需要同步的數組,如果操作的表數據太多時就可以考慮事務分離了(這也是為什么要避免大表的出現),比如把數據進行字段拆分,水平分表等.
-
ConcurrentHashMap 是由 Segment 數組和 HashEntry 數組和鏈表組成
-
Segment 是基于重入鎖(ReentrantLock):一個數據段競爭鎖。每個 HashEntry 一個鏈表結構的元素,利用 Hash 算法得到索引確定歸屬的數據段,也就是對應到在修改時需要競爭獲取的鎖。ConcurrentHashMap 支持 CurrencyLevel(Segment 數組數量)的線程并發。每當一個線程占用鎖訪問一個 Segment 時,不會影響到其他的 Segment
-
核心數據如 value,以及鏈表都是 volatile 修飾的,保證了獲取時的可見性
-
首先是通過 key 定位到 Segment,之后在對應的 Segment 中進行具體的 put 操作如下:
-
將當前 Segment 中的 table 通過 key 的 hashcode 定位到 HashEntry。
-
遍歷該 HashEntry,如果不為空則判斷傳入的? key 和當前遍歷的 key 是否相等,相等則覆蓋舊的 value
-
不為空則需要新建一個 HashEntry 并加入到 Segment 中,同時會先判斷是否需要擴容
-
最后會解除在 1 中所獲取當前 Segment 的鎖
-
雖然 HashEntry 中的 value 是用 volatile 關鍵詞修飾的,但是并不能保證并發的原子性,所以 put 操作時仍然需要加鎖處理
首先第一步的時候會嘗試獲取鎖,如果獲取失敗肯定就有其他線程存在競爭,則利用 scanAndLockForPut() 自旋獲取鎖
-
嘗試自旋獲取鎖
-
如果重試的次數達到了 MAX_SCAN_RETRIES 則改為阻塞鎖獲取,保證能獲取成功。最后解除當前 Segment 的鎖
ConcurrentHashMap設計
ConcurrentHashMap是線程安全,通過分段鎖的方式提高了并發度。分段Segment是一開始就確定的了,后期不能再進行擴容的。
其中的段Segment繼承了重入鎖ReentrantLock,所以調用lock方法將當前segment進行加鎖,有了鎖的功能,同時含有類似HashMap中的數組加鏈表結構(這里沒有使用紅黑樹)
雖然Segment的個數是不能擴容的,但是單個Segment里面的數組是可以擴容的。
jdk1.7中采用Segment?+?HashEntry的方式進行實現,
結構如下:
2.1 整體概覽
ConcurrentHashMap有3個參數:
·????????initialCapacity:初始總容量,默認16
·????????loadFactor:加載因子,默認0.75
·????????concurrencyLevel:并發級別,默認16
?
segment的個數即ssize(段數)
取大于等于并發級別的最小的2的冪次。如concurrencyLevel=16,那么sszie=16,如concurrencyLevel=10,那么ssize=16
單個segment的初始容量cap(容量/段數,如果有余數則向上+1)
c=initialCapacity/ssize,并且可能需要+1。如15/7=2,那么c要取3,如16/8=2,那么c取2,c可能是一個任意值,那么同上述一樣,cap取的值就是大于等于c的最小2的冪次。最小值要求是2
單個segment的閾值threshold(cap*loadFactor)
如2*0.75=1.5。
·????????所以默認情況下,
·????????initialCapacity:初始總容量,默認16
·????????loadFactor:加載因子,默認0.75
·????????concurrencyLevel:并發級別,默認16
segment的個數sszie=16,每個segment的初始容量cap=2(最少為2),單個segment的閾值threshold=1
2.2 put過程
·????????首先根據key計算出一個hash值,找到對應的Segment
·????????調用Segment的lock方法,為后面的put操作加鎖
·????????根據key計算出hash值,找到Segment中數組中對應index的鏈表,并將該數據放置到該鏈表中
·????????判斷當前Segment包含元素的數量大于閾值,則Segment進行擴容
2.4 get過程
·????????根據key計算出對應的segment
·????????再根據key計算出對應segment中數組的index
·????????最終遍歷上述index位置的鏈表,查找出對應的key的value
2.5 size()操作
? ? ?因為ConcurrentHashMap是可以并發插入數據的,所以在準確計算元素數量時存在一定的難度,一般的思路是統計每個Segment對象中的元素個數,然后進行累加,但是這種方式計算出來的結果并不一樣的準確的,因為在計算后面幾個Segment的元素個數時,已經計算過的Segment同時可能有數據的插入或則刪除。
如果我們要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?
? 不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不準了。所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。
因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
那么ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put?,?remove和clean方法里操作元素前都會將變量modCount進行加1,那么在統計size前后比較modCount是否發生變化,從而得知容器的大小是否發生變化。
總結:
先采用不加鎖的方式,連續計算元素的個數,最多計算3次:
1、如果前后兩次計算結果相同,則說明計算出來的元素個數是準確的;
2、如果前后兩次計算結果都不同,則給每個Segment進行加鎖(限制無法插入或者刪除數據),再計算一次元素的個數;
?
缺點
這一種結構的帶來的副作用是Hash的過程要比普通的HashMap要長
優點
寫操作的時候可以只對元素所在的Segment進行加鎖即可,不會影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時支持Segment數量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的Segment上)。通過這一種結構,ConcurrentHashMap的并發能力可以大大的提高。
ConcurrentHashMap(JDK 1.8)
CocurrentHashMap?拋棄了原有的 Segment 分段鎖,采用了?CAS + synchronized?來保證并發安全性。
其中的?value和 next?都用了 volatile 修飾,保證了可見性。最大特點是引入了 CAS
CAS是compare and swap的縮寫,即我們所說的比較交換。cas是一種基于鎖的操作,而且是樂觀鎖。在java中鎖分為樂觀鎖和悲觀鎖。悲觀鎖是將資源鎖住,等一個之前獲得鎖的線程釋放鎖之后,下一個線程才可以訪問。而樂觀鎖采取了一種寬泛的態度,通過某種方式不加鎖來處理資源,比如通過給記錄加version來獲取數據,性能較悲觀鎖有很大的提高。
CAS有3個操作數,內存值 V、舊的預期值 A、要修改的新值 B。當且僅當預期值 A 和內存值 V 相同時,將內存值V修改為 B,否則什么都不做。借助 CPU 指令 cmpxchg 來實現。
CAS 使用實例
對 sizeCtl 的控制都是用 CAS 來實現的:
-
-1 代表 table 正在初始化
-
N 表示有 -N-1 個線程正在進行擴容操作
-
如果 table 未初始化,表示table需要初始化的大小
-
如果 table 初始化完成,表示table的容量,默認是table大小的0.75倍,用這個公式算 0.75(n – (n >>> 2))
CAS 會出現的問題:ABA
解決:對變量增加一個版本號,每次修改,版本號加 1,比較的時候比較版本號。
put 過程
-
根據 key 計算出 hashcode
-
判斷是否需要進行初始化
-
通過 key 定位出的 Node,如果為空表示當前位置可以寫入數據,利用 CAS 嘗試寫入,失敗則自旋保證成功
-
如果當前位置的 hashcode == MOVED == -1,則需要進行擴容
-
如果都不滿足,則利用 synchronized 鎖寫入數據
-
如果數量大于 TREEIFY_THRESHOLD 則要轉換為紅黑樹
get 過程
-
根據計算出來的 hashcode 尋址,如果就在桶上那么直接返回值
-
如果是紅黑樹那就按照樹的方式獲取值
-
就不滿足那就按照鏈表的方式遍歷獲取值
1.8的ConcurrentHashMap設計
1.8的ConcurrentHashMap摒棄了1.7的segment(鎖段)設計,而是啟用了一種全新的方式實現,利用CAS算法,在1.8HashMap的基礎上實現了線程安全的版本,即也是采用數組+鏈表+紅黑樹的形式。
數組可以擴容,鏈表可以轉化為紅黑樹
看完ConcurrentHashMap整個類的源碼,給自己的感覺就是和HashMap的實現基本一模一樣,當有修改操作時借助了synchronized來對table[i]進行鎖定保證了線程安全以及使用了CAS來保證原子性操作,其它的基本一致.
例如:ConcurrentHashMap的get(int key)方法的實現思路為:
根據key的hash值找到其在table所對應的位置i,然后在table[i]位置所存儲的鏈表(或者是樹)進行查找是否有鍵為key的節點,
如果有,則返回節點對應的value,否則返回null。思路是不是很熟悉,是不是和HashMap中該方法的思路一樣。
所以,如果你也在看ConcurrentHashMap的源碼,不要害怕,思路還是原來的思路,只是多了些許東西罷了。
3.1 整體概覽(JDK1.8)
sizeCtl最重要的屬性之一,看源碼之前,這個屬性表示什么意思,一定要記住。
private transient volatile int sizeCtl;//控制標識符
? transient是Java語言的關鍵字,用來表示一個域不是該對象序列化的一部分。當一個對象被序列化的時候,transient型變量的值不包括在序列化的表示中,然而非transient型的變量是被包括進去的。??
sizeCtl是控制標識符,不同的值表示不同的意義。
·????????負數代表正在進行初始化或擴容操作 ,其中-1代表正在初始化 ,-N 表示有N-1個線程正在進行擴容操作
·????????正數或0代表hash表還沒有被初始化,這個數值表示初始化或下一次進行擴容的大小,類似于擴容閾值。它的值始終是當前ConcurrentHashMap容量的0.75倍,表示閾值,實際容量>=sizeCtl,則擴容。
?
改進一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存數據,采用table數組元素作為鎖,從而實現了對每一行數據進行加鎖,進一步減少并發沖突的概率。
改進二:將原先table數組+單向鏈表的數據結構,變更為table數組+單向鏈表+紅黑樹的結構。對于hash表來說,最核心的能力在于將key hash之后能均勻的分布在數組中。如果hash之后散列的很均勻,那么table數組中的每個隊列長度主要為0或者1。但實際情況并非總是如此理想,雖然ConcurrentHashMap類默認的加載因子為0.75,但是在數據量過大或者運氣不佳的情況下,還是會存在一些隊列長度過長的情況,如果還是采用單向列表方式,那么查詢某個節點的時間復雜度為O(n);因此,對于個數超過8(默認值)的列表,jdk1.8中采用了紅黑樹的結構,那么查詢的時間復雜度可以降低到O(logN),可以改進性能。
為了說明以上2個改動,看一下put操作是如何實現的。
?
其實在我看來,就是兩處改進:
1.在未初始化的時候,采用CAS初始化,這樣可以避免多個key相同同時初始化導致某些先來的鍵值對丟失。
2.在檢測到容器在擴容的時候,不做任何的操作,等待擴容后的新表再進一步操作。這樣既可以解決多個線程同時擴容所帶來的鏈表死循環問題;也可以解決put的同時擴容,舊表在賦值給新表后,另外線程在get到null值。
3.最后在每個真正put的操作都加鎖,可以避免由于同時put導致某個位置重疊之類的問題。
3.2 Put 過程
finalV putVal(K key, V value, boolean onlyIfAbsent) {
????if(key == null || value == null) thrownew NullPointerException();
????inthash = spread(key.hashCode());
????intbinCount = 0;
????for(Node<K,V>[] tab = table;;) {
????????Node<K,V> f;int n, i, fh;
???????// 如果table為空或者長度為0,初始化;否則,根據hash值計算得到數組索引i,如果tab[i]為空,直接新建節點Node即可(此處用cas寫入新的數值,大概就是是null則新建Node)。注:tab[i]實質為鏈表或者紅黑樹的首節點。
???????if (tab == null || (n= tab.length) == 0)
???????????tab = initTable();
???????elseif ((f = tabAt(tab, i = (n - 1) & hash)) ==null) {
???????????if (casTabAt(tab, i, null,
?????????????????????????newNode<K,V>(hash, key, value, null)))
????????????????break;???????????????????// no lock when adding to empty bin
???????}
???????// 如果tab[i]不為空并且hash值為MOVED,說明該鏈表正在進行transfer操作,返回擴容完成后的table。(sizeCtl是控制標識符,此處MOVED=-1)//在hashmap有可能會因為取得舊表而get null,在這兒解決了 。因為這兒是發現擴容就等待新表再做下一步的操作。
???????elseif ((fh = f.hash) == MOVED)
???????????tab = helpTransfer(tab, f);
???????else {
???????????V oldVal = null;
???????????// 針對首個節點進行加鎖操作,而不是segment,進一步減少線程沖突。1.7是整個segment加鎖,一個segment又有很多個數組節點。
???????????synchronized(f) {
????????????????if(tabAt(tab, i) == f) {
????????????????????if (fh>= 0) {
????????????????????????binCount = 1;
????????????????????????for(Node<K,V> e = f;; ++binCount) {
????????????????????????????K ek;
????????????????????????????// 如果在鏈表中找到值為key的節點e,直接設置e.val =value即可。
????????????????????????????if(e.hash == hash &&
????????????????????????????????((ek = e.key)== key ||
?????????????????????????????????(ek != null&& key.equals(ek)))) {
????????????????????????????????oldVal = e.val;
????????????????????????????????if(!onlyIfAbsent)
????????????????????????????????????e.val = value;
????????????????????????????????break;
????????????????????????????}
????????????????????????????// 如果沒有找到值為key的節點,直接新建Node并加入鏈表即可。
????????????????????????????Node<K,V>pred = e;
????????????????????????????if((e = e.next) == null) {
????????????????????????????????pred.next = newNode<K,V>(hash, key,
?????????????????????????????????????????????????????????value, null);
????????????????????????????????break;
????????????????????????????}
????????????????????????}
????????????????????}
????????????????????// 如果首節點為TreeBin類型,說明為紅黑樹結構,執行putTreeVal操作。
????????????????????elseif(f instanceof TreeBin) {
????????????????????????Node<K,V> p;
????????????????????????binCount = 2;
????????????????????????if((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
??????????????????????????????????????????????????????value)) != null) {
????????????????????????????oldVal = p.val;
????????????????????????????if(!onlyIfAbsent)
????????????????????????????????p.val = value;
????????????????????????}
???????????????????}
????????????????}
???????????}
???????????if (binCount != 0) {
????????????????// 如果節點數>=8,那么轉換鏈表結構為紅黑樹結構。
????????????????if(binCount >= TREEIFY_THRESHOLD)
????????????????????treeifyBin(tab, i);
????????????????if(oldVal != null)
????????????????????returnoldVal;
????????????????break;
???????????}
???????}
????}
????// 計數增加1,有可能觸發transfer操作(擴容)。
???addCount(1L, binCount);
????returnnull;
}
當執行?put 方法插入數據時,根據key的hash值,在Node 數組中找到相應的位置,實現如下:
1. 如果相應位置的 Node 還未初始化,則通過CAS插入相應的數據;
2. 如果相應位置的 Node 不為空,且當前該節點不處于移動狀態,則對該節點加 synchronized 鎖,如果該節點的 hash 不小于0,則遍歷鏈表更新節點或插入新節點;
3. 如果該節點是 TreeBin 類型的節點,說明是紅黑樹結構,則通過 putTreeVal 方法往紅黑樹中插入節點;
4. 如果 binCount 不為0,說明put 操作對數據產生了影響,如果當前鏈表的個數達到8個,則通過treeifyBin 方法轉化為紅黑樹,如果 oldVal 不為空,說明是一次更新操作,沒有對元素個數產生影響,則直接返回舊值;
5. 如果插入的是一個新節點,則執行 addCount() 方法嘗試更新元素個數 baseCount ;
3.3 get過程
1.????根據k計算出hash值,首先定位到table[]中的i。
2.????若table[i]存在,則繼續查找。
3.????首先比較鏈表頭部,如果是則返回。
4.????然后如果為紅黑樹,查找樹。
5.????最后再循環鏈表查找。
·?????????
3.4擴容過程
一旦鏈表中的元素個數超過了8個,那么可以執行數組擴容或者鏈表轉為紅黑樹,這里依據的策略跟HashMap依據的策略是一致的。
當數組長度還未達到64個時,優先數組的擴容,否則選擇鏈表轉為紅黑樹。
3.5 size實現
JDK1.8的ConcurrentHashMap中保存元素的個數的記錄方法也有不同,首先在添加和刪除元素時,會通過CAS操作更新ConcurrentHashMap的baseCount屬性值來統計元素個數。但是CAS操作可能會失敗,因此,ConcurrentHashMap又定義了一個CounterCell數組來記錄CAS操作失敗時的元素個數。因此,ConcurrentHashMap中元素的個數則通過如下方式獲得:
元素總數 = baseCount + sum(CounterCell)
? ? final long sumCount() {
? ? ? ? CounterCell[] as = counterCells; CounterCell a;
? ? ? ? long sum = baseCount;
? ? ? ? if (as != null) {
? ? ? ? ? ? for (int i = 0; i < as.length; ++i) {
? ? ? ? ? ? ? ? if ((a = as[i]) != null)
? ? ? ? ? ? ? ? ? ? sum += a.value;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return sum;
? ? }
而JDK1.8中提供了兩種方法獲取ConcurrentHashMap中的元素個數。
? ? public int size() {
? ? ? ? long n = sumCount();
? ? ? ? return ((n < 0L) ? 0 :
? ? ? ? ? ? ? ? (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
? ? ? ? ? ? ? ? (int)n);
? ? }
?
? ? public long mappingCount() {
? ? ? ? long n = sumCount();
? ? ? ? return (n < 0L) ? 0L : n; // ignore transient negative values
? ? }
如代碼所示,size只能獲取int范圍內的ConcurrentHashMap元素個數;而如果hash表中的數據過多,超過了int類型的最大值,則推薦使用mappingCount()方法獲取其元素個數。
JDK 8 推薦使用mappingCount 方法,因為這個方法的返回值是 long 類型,不會因為 size 方法是 int 類型限制最大值(size 方法是接口定義的,不能修改)。
?
put完成后,會調用addCount函數會通過CAS更新baseCount,更新失敗,會創建一個CounterCell對象,保存失敗的記錄數到counterCells數組,如果失敗,會調用fullAddCount死循環創建直到成功。每次addCount都會去調用sumCount函數遍歷counterCells數組,與baseCount累加,更新baseCount。
?
在沒有并發的情況下,使用一個 baseCount volatile 變量就足夠了,當并發的時候,CAS 修改 baseCount 失敗后,就會使用 CounterCell 類了,會創建一個這個對象,通常對象的 volatile value 屬性是 1。在計算 size 的時候,會將 baseCount 和 CounterCell 數組中的元素的 value 累加,得到總的大小,但這個數字仍舊可能是不準確的。
?
所以在1.8中的 size 實現比1.7簡單多,因為元素個數保存baseCount 中,部分元素的變化個數保存在 CounterCell 數組中,實現如下:
1. 初始化時 counterCells 為空,在并發量很高時,如果存在兩個線程同時執行 CAS 修改baseCount值,則失敗的線程會繼續執行方法體中的邏輯,使用CounterCell 記錄元素個數的變化;
2. 如果 CounterCell 數組 counterCells 為空,調用 fullAddCount() 方法進行初始化,并插入對應的記錄數,通過 CAS 設置cellsBusy字段,只有設置成功的線程才能初始化 CounterCell 數組,實現如下:
3. 如果通過 CAS 設置cellsBusy字段失敗的話,則繼續嘗試通過 CAS 修改 baseCount 字段,如果修改 baseCount 字段成功的話,就退出循環,否則繼續循環插入 CounterCell 對象;
通過累加?baseCount 和 CounterCell 數組中的數量,即可得到元素的總個數;
?
ConcurrentHashMap1.7和ConcurrentHashMap1.8的比較:
1.數據結構:1.8取消了Segment分段鎖的數據結構,取而代之的是數組+鏈表+紅黑樹的結構。
2.保證線程安全機制:JDK1.7采用segment的分段鎖機制實現線程安全,其中segment繼承自ReentrantLock。JDK1.8采用CAS+Synchronized保證線程安全。
3.鎖的粒度:原來是對需要進行數據操作的Segment加鎖,現調整為對每個數組元素加鎖(Node)。
4.鏈表轉化為紅黑樹:定位結點的hash算法簡化會帶來弊端,Hash沖突加劇,因此1.8在鏈表節點數量大于8時,會將鏈表轉化為紅黑樹進行存儲。
5.查詢時間復雜度:從原來的遍歷鏈表O(n),變成遍歷紅黑樹O(logN)。
總結
以上是生活随笔為你收集整理的Java集合:ConcurrentHashMap(JDK 1.7 JDK 1.8)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java集合:HashMap线程不安全?
- 下一篇: 数据库:MySQL索引总结