史上最详细的ConcurrentHashMap详解--源码分析
ps.本文所有源碼都是基于jdk1.6
首先說明一點,ConcurrentHashMap并不是可以完全替換Hashtable的,因為ConcurrentHashMap的get、clear函數是弱一致的(后面會說到),而Hashtable是強一致的。有作者是這么解釋的:我們將“一致性強度”和“擴展性”之間的對比交給用戶來權衡,所以大多數集合都提供了synchronized和concurrent兩個版本。不過真正需要“強一致性”的場景可能非常少,我們大多應用中ConcurrentHashMap是滿足的。
ConcurrentHashMap的數據結構
不得不說,ConcurrentHashMap設計的相當巧妙,它有多個段,每個段下面都是一個Hashtable(相似),所以每個段上都有一把鎖(分布式鎖),各個段之間的鎖互不影響,可以實現并發操作。話不多說,上代碼。
final Segment<K,V>[] segments;- 1
可以看到ConcurrentHashMap實際上就是一個Segment數組,那么Segment是什么呢?
static final class Segment<K,V> extends ReentrantLock implements Serializable {...transient volatile HashEntry<K,V>[] table; //發現Segment實際上是HashEntry數組... }- 1
- 2
- 3
- 4
- 5
那HashEntry又是什么呢?
static final class HashEntry<K,V> {final K key;final int hash;volatile V value;final HashEntry<K,V> next;... }- 1
- 2
- 3
- 4
- 5
- 6
- 7
HashEntry是一個單鏈表
所以ConcurrentHashMap的數據結構如下圖:
這里每一個segment所指向的數據結構,其實就是一個Hashtable,所以說每一個segment都有一把獨立的鎖,來保證當訪問不同的segment時互不影響。
ConcurrentHashMap的基礎方法
基礎方法分為這么幾種:
1、段內加鎖的:put,putIfAbsent,remove,replace等
2、不加鎖的:get,containsKey等
3、整個數據結構加鎖的:size,containsValue等
構造函數
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException(); //參數合法性校驗if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS; //比如我輸入的concurrencyLevel=12,那么sshift = 4,ssize =16,所以sshift是意思就是1左移了幾次比concurrencyLevel大,ssize就是那個大于等于concurrencyLevel的最小2的冪次方的數int sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1; //ssize = ssize << 1 , ssize = ssize * 2}segmentShift = 32 - sshift;segmentMask = ssize - 1; //segmentMask的二進制是一個全是1的數 this.segments = Segment.newArray(ssize); //segment個數是ssize,也就是上圖黃色方塊數,默認16個if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = 1;while (cap < c)cap <<= 1; for (int i = 0; i < this.segments.length; ++i)this.segments[i] = new Segment<K,V>(cap, loadFactor);//上圖淺綠色塊的個數是cap,是一個2的冪次方的數,默認是1,也就是每個segment下都構造了cap大小的table數組 } Segment(int initialCapacity, float lf) {loadFactor = lf;setTable(HashEntry.<K,V>newArray(initialCapacity));//構造了一個initialCapacity大小的table }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
put函數
public V put(K key, V value) {if (value == null)throw new NullPointerException(); //明確指定value不能為nullint hash = hash(key.hashCode());return segmentFor(hash).put(key, hash, value, false); //segmentFor如下,定位segment下標,然后再put }final Segment<K,V> segmentFor(int hash) { //尋找segment的下標return segments[(hash >>> segmentShift) & segmentMask]; //前面說了segmentMask是一個2進制全是1的數,那么&segmentMask就保證了下標小于等于segmentMask,與HashMap的尋下標相似。 }//真正的put操作 V put(K key, int hash, V value, boolean onlyIfAbsent) {lock(); //先加鎖,可以看到,put操作是在segment里面加鎖的,也就是每個segment都可以加一把鎖try {int c = count;if (c++ > threshold) // ensure capacityrehash(); //判斷容量,如果不夠了就擴容HashEntry<K,V>[] tab = table; //將table賦給一個局部變量tab,這是因為table是volatile變量,讀寫volatile變量的開銷很大,編譯器也不能對volatile變量的讀寫做任何優化,直接多次訪問非volatile實例變量沒有多大影響,編譯器會做相應優化。int index = hash & (tab.length - 1); //尋找table的下標HashEntry<K,V> first = tab[index];HashEntry<K,V> e = first;while (e != null && (e.hash != hash || !key.equals(e.key)))e = e.next; //遍歷單鏈表,找到key相同的為止,如果沒找到,e指向鏈表尾V oldValue;if (e != null) { //如果有相同的key,那么直接替換oldValue = e.value;if (!onlyIfAbsent)e.value = value;}else { //否則在鏈表表頭插入新的結點oldValue = null;++modCount;tab[index] = new HashEntry<K,V>(key, hash, first, value);count = c; // write-volatile}return oldValue;} finally {unlock();} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
該方法也是在持有段鎖的情況下執行的,首先判斷是否需要rehash,需要就先rehash,擴容都是針對單個段的,也就是單個段的數據數量大于設定的量的時候會觸發擴容。接著是找是否存在同樣一個key的結點,如果存在就直接替換這個結點的值。否則創建一個新的結點并添加到hash鏈的頭部,這時一定要修改modCount和count的值,同樣修改count的值一定要放在最后一步。put方法調用了rehash方法,reash方法實現得也很精巧,主要利用了table的大小為2^n,和HashMap的擴容基本一樣,這里就不介紹了。
還有一個叫putIfAbsent(K key, V value)的函數,這個函數的實現和put幾乎一模一樣,作用是,如果map中不存在這個key,那么插入這個數據,如果存在這個key,那么不覆蓋原來的value,也就是不插入了。
remove函數
>remove操作也交給了段內的remove,如下: V remove(Object key, int hash, Object value) {lock(); //段內先獲得一把鎖try {int c = count - 1;HashEntry<K,V>[] tab = table;int index = hash & (tab.length - 1);HashEntry<K,V> first = tab[index];HashEntry<K,V> e = first;while (e != null && (e.hash != hash || !key.equals(e.key)))e = e.next;V oldValue = null;if (e != null) { //如果找到該keyV v = e.value;if (value == null || value.equals(v)) {oldValue = v;// All entries following removed node can stay// in list, but all preceding ones need to be// cloned.++modCount;HashEntry<K,V> newFirst = e.next; //newFirst此時為要刪除結點的nextfor (HashEntry<K,V> p = first; p != e; p = p.next)newFirst = new HashEntry<K,V>(p.key,p.hash,newFirst, p.value);//從頭遍歷鏈表將要刪除結點的前面所有結點復制一份插入到newFirst之前,如下圖tab[index] = newFirst;count = c; // write-volatile}}return oldValue;} finally {unlock();} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
為什么用這么方式刪除呢,細心的同學會發現上面定義的HashEntry的key和next都是final類型的,所以不能改變next的指向,所以又復制了一份指向刪除的結點的next。
get函數
public V get(Object key) {int hash = hash(key.hashCode()); //雙hash,和HashMap一樣,也是為了更好的離散化return segmentFor(hash).get(key, hash); //先尋找segment的下標,然后再get }final Segment<K,V> segmentFor(int hash) { //尋找segment的下標return segments[(hash >>> segmentShift) & segmentMask]; //前面說了segmentMask是一個2進制全是1的數,那么&segmentMask就保證了下標小于等于segmentMask,與HashMap的尋下標相似。 }V get(Object key, int hash) {// count是一個volatile變量,count非常巧妙,每次put和remove之后的最后一步都要更新count,就是為了get的時候不讓編譯器對代碼進行重排序,來保證if (count != 0) { HashEntry<K,V> e = getFirst(hash); //尋找table的下標,也就是鏈表的表頭while (e != null) {if (e.hash == hash && key.equals(e.key)) {V v = e.value;if (v != null)return v;return readValueUnderLock(e); // recheck 加鎖讀,這個加鎖讀不用重新計算位置,而是直接拿e的值}e = e.next;}}return null; }HashEntry<K,V> getFirst(int hash) {HashEntry<K,V>[] tab = table;return tab[hash & (tab.length - 1)]; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
讀完上面代碼我有一個疑問,就是如果找到的key對應的value是null的話,加鎖再讀一次,既然上面put操作不允許value是null,那讀到的value為什么會有null的情況呢?我們分析一下這種情況,就是put操作和get操作同時進行的時候,可以分為兩種情況:
1、put的key已經存在,由于value不是final的,可以直接更新,且value是volatile的,所以修改會立馬對get線程可見,而不用等到put方法結束。
2、put的key不存在,那么將在鏈表表頭插入一個數據,那么將new HashEntry賦值給tab[index]是否能立刻對執行get的線程可見呢,我們知道每次put完之后都要更新一個count變量(寫),而每次get數據的時候,再最一開始都要讀一個count變量(讀),而且發現這個count是volatile的,而對同一個volatile變量,有volatile寫 happens-before volatile讀,所以如果寫發生在讀之前,那么new HashEntry賦值給tab[index]是對get線程可見的,但是如果寫沒有發生在讀之前,就無法保證new HashEntry賦值給tab[index]要先于get函數的getFirst(hash),也就是說,如果某個Segment實例中的put將一個Entry加入到了table中,在未執行count賦值操作之前有另一個線程執行了同一個Segment實例中的get,來獲取這個剛加入的Entry中的value,那么是有可能取不到的,這也就是get的弱一致性。但是什么時候會找到key但是讀到的value是null呢,仔細看下put操作的語句:tab[index] = new HashEntry(key, hash, first, value),在這條語句中,HashEntry構造函數中對value的賦值以及對tab[index]的賦值可能被重新排序,舉個例子就是這條語句有可能先執行對key賦值,再執行對tab[index]的賦值,最后對value賦值,如果在對tab和key都賦值但是對value還沒賦值的情況下的get就是一個空值。
詳細可以看看這篇文章:http://ifeve.com/concurrenthashmap-weakly-consistent/
這也就是說無鎖的get操作是一個弱一致性的操作。
clear函數
public void clear() {for (int i = 0; i < segments.length; ++i) //循環clear掉每個段中的內容segments[i].clear(); } void clear() {if (count != 0) {lock(); //段內加鎖try {HashEntry<K,V>[] tab = table;for (int i = 0; i < tab.length ; i++)tab[i] = null;++modCount;count = 0; // write-volatile} finally {unlock();}} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
因為沒有全局的鎖,在清除完一個segments之后,正在清理下一個segments的時候,已經清理segments可能又被加入了數據,因此clear返回的時候,ConcurrentHashMap中是可能存在數據的。因此,clear方法是弱一致的。
size函數
public int size() {final Segment<K,V>[] segments = this.segments;long sum = 0;long check = 0;int[] mc = new int[segments.length];// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {check = 0;sum = 0;int mcsum = 0;for (int i = 0; i < segments.length; ++i) {sum += segments[i].count; //循環相加每個段內數據的個數mcsum += mc[i] = segments[i].modCount; //循環相加每個段內的modCount}if (mcsum != 0) { //如果是0,代表根本沒有過數據更改,也就是size是0for (int i = 0; i < segments.length; ++i) {check += segments[i].count; //再次循環相加每個段內數據的個數,這里為什么會再算一次呢,后面會說if (mc[i] != segments[i].modCount) { check = -1; // force retry //如果modCount和之前統計的不一致了,check直接賦值-1,重新再來break;}}}if (check == sum)break;}if (check != sum) { // Resort to locking all segmentssum = 0;for (int i = 0; i < segments.length; ++i) //循環獲取所有segment的鎖segments[i].lock();for (int i = 0; i < segments.length; ++i) //在持有所有段的鎖的時候進行count的相加sum += segments[i].count;for (int i = 0; i < segments.length; ++i) //循環釋放所有段的鎖segments[i].unlock();}if (sum > Integer.MAX_VALUE) //returnreturn Integer.MAX_VALUE;elsereturn (int)sum; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
如果我們要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不準了。所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效,因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。那么ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clear方法里操作元素前都會將變量modCount進行加1,那么在統計size前后比較modCount是否發生變化,從而得知容器的大小是否發生變化。size()的實現還有一點需要注意,必須要先segments[i].count,才能segments[i].modCount,這是因為segment[i].count是對volatile變量的訪問,接下來segments[i].modCount才能得到幾乎最新的值,這里和get方法的方式是一樣的,也是一個volatile寫 happens-before volatile讀的問題。
上面18行代碼,拋出了一個問題,就是為什么會再算一遍,上面說只需要比較modCount不變不就可以了么?但是仔細分析,就會發現,在13行14行代碼那里,計算完count之后,計算modCount之前有可能count的值又變了,那么18行的代碼主要是解決這個問題。
containsValue函數
containsKey函數比較簡單,也是不加鎖的簡易get,下面說一下containsValue有一個有意思的地方
public boolean containsValue(Object value) {if (value == null)throw new NullPointerException();// See explanation of modCount use abovefinal Segment<K,V>[] segments = this.segments;int[] mc = new int[segments.length];// Try a few times without lockingfor (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {int sum = 0;int mcsum = 0;for (int i = 0; i < segments.length; ++i) {int c = segments[i].count; //注意這行,聲明了一個c變量,并且賦值了,但是下面卻完全沒有用到。mcsum += mc[i] = segments[i].modCount;if (segments[i].containsValue(value))return true;}boolean cleanSweep = true; //默認結構沒變if (mcsum != 0) {for (int i = 0; i < segments.length; ++i) {int c = segments[i].count;if (mc[i] != segments[i].modCount) { //如果結構變了,cleanSweep = falsecleanSweep = false;break;}}}if (cleanSweep) //如果沒變,直接返回falsereturn false;}// Resort to locking all segmentsfor (int i = 0; i < segments.length; ++i)segments[i].lock();boolean found = false;try {for (int i = 0; i < segments.length; ++i) {if (segments[i].containsValue(value)) {found = true;break;}}} finally {for (int i = 0; i < segments.length; ++i)segments[i].unlock();}return found; }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
注意上面代碼15行處,里面有語句int c = segments[i].count; 但是c卻從來沒有被使用過,即使如此,編譯器也不能做優化將這條語句去掉,因為存在對volatile變量count的讀取,這條語句存在的唯一目的就是保證segments[i].modCount讀取到幾乎最新的值,上面有說道這個問題。
本文章參考文章:
1、http://ifeve.com/concurrenthashmap-weakly-consistent/
2、http://www.iteye.com/topic/344876
總結
以上是生活随笔為你收集整理的史上最详细的ConcurrentHashMap详解--源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Three.js中添加指南针
- 下一篇: cad指北针lisp_用CAD里的LIS