Java集合:ConcurrentHashMap详解
前言
近期深入學習了ConcurrentHashMap,便整理成一篇博文記錄一下,請注意:此博文針對的是JDK1.6,因此如果你看到的源碼跟我文中的不同,則可能是由于版本不一樣。
ConcurrentHashMap的鎖分段技術
HashTable容器在競爭激烈的并發環境下表現出效率低下的原因,是因為所有訪問HashTable的線程必須競爭同一把鎖。如果容器里有多把鎖,每一把鎖用于鎖容器的其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高并發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。首先將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
ConcurrentHashMap的結構
我們通過ConcurrentHashMap的類圖來分析ConcurrentHashMap的結構。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組,Segment的結構和HashMap類似,是一種數組和鏈表結構,一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護著一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment鎖。
?
ConcurrentHashMap方法源碼解讀
請注意,如果一個方法中我貼了幾段代碼,那么一般是:第一段代碼為方法的入口,其他的為被入口方法調用過的方法。
1.初始化方法
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS)concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;}segmentShift = 32 - sshift;segmentMask = ssize - 1;this.segments = Segment.newArray(ssize);if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = 1;while (cap < c)cap <<= 1;for (int i = 0; i < this.segments.length; ++i)this.segments[i] = new Segment<K,V>(cap, loadFactor); }
代碼中的第一個while循環是用來計算segments數組的大小ssize(必須為2的N次方)。segmentShift和segmentMask是用來定位當前元素在哪個segment,前者用于移位,后者用于進行位與運算。第二個while循環是用來計算每個segment中HashEntry數組的大小cap(必須為2的N次方),最后對segments數組進行初始化。
對segments數組進行初始化的同時,也對segment類里面的HashEntry進行初始化,并給loadFactor和threshold賦值。
2.get方法
public V get(Object key) {int hash = hash(key.hashCode());return segmentFor(hash).get(key, hash); }
根據key的hashcode重新計算hash值(主要是為了減少hash沖突),通過segmentFor方法定位到具體的哪個segment,然后調用segment的get方法。
segmentFor方法是用來定位到具體的segment的,主要是通過使用hash值的高位與掩碼進行位運算,segmentShift和segmentMask是通過上文初始化方法計算而來。
根據hash值跟數組長度-1進行位與運算,定位到具體的HashEntry(getFirst方法),遍歷該HashEntry鏈表,找到鏈表中某個元素的hash值與傳入的hash值相同并且使用equals方法比較key相同的元素,如果該元素的value不為空,返回value值;如果為空,則嘗試在加鎖的情況下再讀一次。
get操作的高效之處在于整個get過程不需要加鎖,除非讀到的值是空的才會加鎖重讀,我們知道HashTable容器的get方法是需要加鎖的,那么ConcurrentHashMap的get操作是如何做到不加鎖的呢?原因是它的get方法里將要使用的共享變量都定義成volatile,如用于統計當前Segement大小的count字段和用于存儲值的HashEntry的value,定義成volatile的變量,能夠在線程之間保持可見性,能夠被多線程同時讀,并且保證不會讀到過期的值,但是只能被單線程寫(有一種情況可以被多線程寫,就是寫入的值不依賴于原值),在get操作里只需要讀不需要寫共享變量count和value,所以可以不用加鎖。之所以不會讀到過期的值,是根據Java內存模型的happen before原則,對volatile字段的寫入操作先于讀操作,即使兩個線程同時修改和獲取volatile變量,get操作也能拿到最新的值,這是用volatile替換鎖的經典應用場景。
3.put方法
public V put(K key, V value) {if (value == null)throw new NullPointerException();int hash = hash(key.hashCode());return segmentFor(hash).put(key, hash, value, false); }根據key的hashcode重新計算hash值(跟get方法一樣),通過segmentFor方法定位到具體的哪個segment,然后調用segment的put方法。
加鎖進行以下操作:判斷是否需要擴容,如果需要則調用rehash方法(下面有介紹)。根據hash值跟數組長度-1進行位與運算,定位到具體的HashEntry,遍歷該HashEntry,根據傳入的的key,使用equals方法找到需要的元素。如果能找到,則將該元素的value值覆蓋為傳入的value,否則將傳入的key、value、hash值作為一個新元素放在該HashEntry的頭部,最后進行解鎖。入參中的onlyIfAbsent為true時,表示如果該key已經存在value值,則不會覆蓋原value值。
lastRun:最后一個需要處理的元素的意思就是該元素之后的所有元素都跟該元素有相同的索引值(對于新表),所以只需要將該元素放到新表的對應位置,該元素之后的所有元素也就跟著到了新表的對應位置。相當于直接將該鏈表的最后一截(可能包含若干個元素)直接一次性移到了新表的某個位置。
如果整個循環結束,if?(k != lastIdx)?語句沒有成立過,就代表當前位置(oldTable[i])的整個HashEntry在新表中的索引位置是一致的,只需要移動一次即可將整個鏈表移到新表上。根據rehash方法中的那一大段注釋提到的“?Statistically, at the default threshold, only about one-sixth of them need cloning when a table doubles”(據統計,在默認閾值下,當表擴大為原來的兩倍時,只有約六分之一的元素需要克隆),可以想象,這個if語句沒有成立過的可能性應該是挺大的。
4.remove方法
public V remove(Object key) { int hash = hash(key.hashCode());return segmentFor(hash).remove(key, hash, null); }根據key的hashcode重新計算hash值(跟get方法一樣),通過segmentFor方法定位到具體的哪個segment,然后調用segment的remove方法。
加鎖進行以下操作:根據hash值跟數組長度-1進行位運算,定位到具體的HashEntry,遍歷該HashEntry,根據傳入的的key,使用equals方法找到需要的元素,進行以下操作。
該段代碼是remove方法中的片段,過程比較特殊,拿出來單獨討論。因為HashEntry使用final修飾,這意味著在第一次設置了next域之后便不能再改變它,因此,此處的remove操作是新建一個HashEntry并將它之前的節點全都克隆一次。至于HashEntry為什么要設置為不變性,這跟不變性的訪問不需要同步從而節省時間有關。
用實際例子看上面這段代碼更容易懂:
假設1:此時HashEntry為:1 2 3 4 5 6,其中1為鏈表頭,并且1.next = 2,2.next = 3以此類推。
假設2:此時e = 4,即根據key匹配到的元素4是即將remove掉的。
則上面這段代碼有以下流程:
HashEntry<K,V> ?newFirst = 4.next = 5
for( p = 1; p != 4; p++)
newFirst = new HashEntry(p.key, p.hash, newFirst, p.value);
此循環如下:
p = 1:newFirst = new HashEntry(1.key, 1.hash, 5, 1.value)
p = 2:newFirst = new HashEntry(2.key, 2.hash, 1, 2.value)
p = 3:newFirst = new HashEntry(3.key, 3.hash, 2, 3.value)
p = 4:結束循環
tab[index] = 3;
index為當前鏈表在HashEntry中的索引位置,所以此時HashEntry為:3 2 1 5 6,被remove的元素之前的元素順序顛倒了。
HashEntry<K,V>[] tab = table;
這句代碼是將table賦給一個局部變量tab,這是因為table是 volatile變量,讀寫volatile變量的開銷很大,編譯器也不能對volatile變量的讀寫做任何優化,直接多次訪問非volatile實例變量沒有多大影響,編譯器會做相應優化。
5.replace方法
public boolean replace(K key, V oldValue, V newValue) {if (oldValue == null || newValue == null)throw new NullPointerException();int hash = hash(key.hashCode());return segmentFor(hash).replace(key, hash, oldValue, newValue); }根據key的hashcode重新計算hash值(跟get方法一樣),通過segmentFor方法定位到具體的哪個segment,然后調用segment的replace方法。
加鎖進行以下操作:根據hash值跟數組長度-1進行位運算,定位到具體的HashEntry(getFirst方法),遍歷該HashEntry,使用equals方法比較傳入的key和鏈表中元素中的key,找到所需元素。如果能找到并且該元素的value跟傳入的oldValue相等,則將該元素的value替換成newValue。
6.clear方法
public void clear() {for (int i = 0; i < segments.length; ++i)segments[i].clear(); } void clear() {if (count != 0) {lock();try {HashEntry<K,V>[] tab = table;for (int i = 0; i < tab.length ; i++)tab[i] = null;++modCount;count = 0; // write-volatile} finally {unlock();}} }遍歷segments,對每一個segment進行清空操作:加鎖進行以下操作,遍歷HashEntry數組,將每個HashEntry設置為null,并將count設置為0。
7.size方法
public int size() {final Segment<K,V>[] segments = this.segments;long sum = 0;long check = 0;int[] mc = new int[segments.length];// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {check = 0;sum = 0;int mcsum = 0;for (int i = 0; i < segments.length; ++i) {//第一次統計sum += segments[i].count;mcsum += mc[i] = segments[i].modCount;}if (mcsum != 0) {for (int i = 0; i < segments.length; ++i) {//第二次統計check += segments[i].count;if (mc[i] != segments[i].modCount) {//modCount發生該變則結束當次嘗試check = -1; // force retrybreak;}}}if (check == sum)break;}if (check != sum) { // Resort to locking all segmentssum = 0;for (int i = 0; i < segments.length; ++i)segments[i].lock();for (int i = 0; i < segments.length; ++i)sum += segments[i].count;for (int i = 0; i < segments.length; ++i)segments[i].unlock();}if (sum > Integer.MAX_VALUE)return Integer.MAX_VALUE;elsereturn (int)sum; }先在不加鎖的情況下嘗試進行統計,如果兩次統計結果相同,并且兩次統計之間沒有任何對segment的修改操作(即每個segment的modCount沒有改變),則返回統計結果。否則,對每個segment進行加鎖,然后統計出結果,返回結果。
8.containsValue方法
public boolean containsValue(Object value) {if (value == null)throw new NullPointerException();// See explanation of modCount use abovefinal Segment<K,V>[] segments = this.segments;int[] mc = new int[segments.length];// Try a few times without lockingfor (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {int sum = 0;int mcsum = 0;for (int i = 0; i < segments.length; ++i) {int c = segments[i].count;mcsum += mc[i] = segments[i].modCount;if (segments[i].containsValue(value))//遍歷該segment里面的所有HashEntry的所有元素return true;}boolean cleanSweep = true;if (mcsum != 0) {for (int i = 0; i < segments.length; ++i) {int c = segments[i].count;if (mc[i] != segments[i].modCount) {//如果modCount發生改變則結束嘗試,進行加鎖操作cleanSweep = false;break;}}}if (cleanSweep) //cleanSweep為true表示所有segment的modCount沒有發生過改變return false;}// Resort to locking all segmentsfor (int i = 0; i < segments.length; ++i)segments[i].lock(); //對所有segment進行加鎖boolean found = false;try {for (int i = 0; i < segments.length; ++i) {if (segments[i].containsValue(value)) {//遍歷該segment里面的所有HashEntry的所有元素found = true;break;}}} finally {for (int i = 0; i < segments.length; ++i)segments[i].unlock();}return found; } boolean containsValue(Object value) {if (count != 0) { // read-volatileHashEntry<K,V>[] tab = table;int len = tab.length;for (int i = 0 ; i < len; i++) { //遍歷所有HashEntryfor (HashEntry<K,V> e = tab[i]; e != null; e = e.next) { //遍歷每個HashEntry的所有元素V v = e.value;if (v == null) // recheckv = readValueUnderLock(e);if (value.equals(v))return true;}}}return false; }先在不加鎖的情況下嘗試進行查找,遍歷所有segment的所有HashEntry的所有元素,如果找到則返回true,如果找不到且在遍歷期間沒有任何對segment的修改操作(即每個segment的modCount沒有改變)則返回false。如果在遍歷期間segment進行過修改操作,則結束不加鎖的嘗試。循環對每個segment進行加鎖,然后進行遍歷查找是否存在。
參考:
JDK1.6源碼
總結
以上是生活随笔為你收集整理的Java集合:ConcurrentHashMap详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VB.NET编写一个“个人简历表”程序
- 下一篇: 2007执业医师成绩查询