ConcurrentHashMap源码分析(转载)
我們知道哈希表是一種非常高效的數據結構,設計優良的哈希函數可以使其上的增刪改查操作達到O(1)級別。Java為我們提供了一個現成的哈希結構,那就是HashMap類,在前面的文章中我曾經介紹過HashMap類,知道它的所有方法都未進行同步,因此在多線程環境中是不安全的。為此,Java為我們提供了另外一個HashTable類,它對于多線程同步的處理非常簡單粗暴,那就是在HashMap的基礎上對其所有方法都使用synchronized關鍵字進行加鎖。
這種方法雖然簡單,但導致了一個問題,那就是在同一時間內只能由一個線程去操作哈希表。即使這些線程都只是進行讀操作也必須要排隊,這在競爭激烈的多線程環境中極為影響性能。本篇介紹的ConcurrentHashMap就是為了解決這個問題的,它的內部使用分段鎖將鎖進行細粒度化,從而使得多個線程能夠同時操作哈希表,這樣極大的提高了性能。
下圖是其內部結構的示意圖。
1.?ConcurrentHashMap有哪些成員變量?
//默認初始化容量 static?final?int?DEFAULT_INITIAL_CAPACITY?=?16;//默認加載因子 static?final?float?DEFAULT_LOAD_FACTOR?=?0.75f;//默認并發級別 static?final?int?DEFAULT_CONCURRENCY_LEVEL?=?16;//集合最大容量 static?final?int?MAXIMUM_CAPACITY?=?1?<<?30;//分段鎖的最小數量 static?final?int?MIN_SEGMENT_TABLE_CAPACITY?=?2;//分段鎖的最大數量 static?final?int?MAX_SEGMENTS?=?1?<<?16;//加鎖前的重試次數 static?final?int?RETRIES_BEFORE_LOCK?=?2;//分段鎖的掩碼值 final?int?segmentMask;//分段鎖的移位值 final?int?segmentShift;//分段鎖數組 final?Segment<K,V>[]?segments;在閱讀完本篇文章之前,相信讀者不能理解這些成員變量的具體含義和作用,不過請讀者們耐心看下去,后面將會在具體場景中一一介紹到這些成員變量的作用。在這里讀者只需對這些成員變量留個眼熟即可。
但是仍有個別變量是我們現在需要了解的,例如Segment數組代表分段鎖集合,并發級別則代表分段鎖的數量(也意味有多少線程可以同時操作),初始化容量代表整個容器的容量,加載因子代表容器元素可以達到多滿的一種程度。
2.?分段鎖的內部結構是怎樣的?
//分段鎖 static?final?class?Segment<K,V>?extends?ReentrantLock?implements?Serializable?{//自旋最大次數static?final?int?MAX_SCAN_RETRIES?=?Runtime.getRuntime().availableProcessors()?>?1???64?:?1;//哈希表transient?volatile?HashEntry<K,V>[]?table;//元素總數transient?int?count;//修改次數transient?int?modCount;//元素閥值transient?int?threshold;//加載因子final?float?loadFactor;//省略以下內容... }Segment是ConcurrentHashMap的靜態內部類,可以看到它繼承自ReentrantLock,因此它在本質上是一個鎖。它在內部持有一個HashEntry數組(哈希表),并且保證所有對該數組的增刪改查方法都是線程安全的,具體是怎樣實現的后面會講到。
所有對ConcurrentHashMap的增刪改查操作都可以委托Segment來進行,因此ConcurrentHashMap能夠保證在多線程環境下是安全的。又因為不同的Segment是不同的鎖,所以多線程可以同時操作不同的Segment,也就意味著多線程可以同時操作ConcurrentHashMap,這樣就能避免HashTable的缺陷,從而極大的提高性能。
3.?ConcurrentHashMap初始化時做了些什么?
//核心構造器 @SuppressWarnings("unchecked") public?ConcurrentHashMap(int?initialCapacity,?float?loadFactor,?int?concurrencyLevel)?{if?(!(loadFactor?>?0)?||?initialCapacity?<?0?||?concurrencyLevel?<=?0)?{throw?new?IllegalArgumentException();}//確保并發級別不大于限定值if?(concurrencyLevel?>?MAX_SEGMENTS)?{concurrencyLevel?=?MAX_SEGMENTS;}int?sshift?=?0;int?ssize?=?1;//保證ssize為2的冪,?且是最接近的大于等于并發級別的數while?(ssize?<?concurrencyLevel)?{++sshift;ssize?<<=?1;}//計算分段鎖的移位值this.segmentShift?=?32?-?sshift;//計算分段鎖的掩碼值this.segmentMask?=?ssize?-?1;//總的初始容量不能大于限定值if?(initialCapacity?>?MAXIMUM_CAPACITY)?{initialCapacity?=?MAXIMUM_CAPACITY;}//獲取每個分段鎖的初始容量int?c?=?initialCapacity?/?ssize;//分段鎖容量總和不小于初始總容量if?(c?*?ssize?<?initialCapacity)?{++c;}int?cap?=?MIN_SEGMENT_TABLE_CAPACITY;//保證cap為2的冪,?且是最接近的大于等于c的數while?(cap?<?c)?{cap?<<=?1;}//新建一個Segment對象模版Segment<K,V>?s0?=?new?Segment<K,V>(loadFactor,?(int)(cap?*?loadFactor),?(HashEntry<K,V>[])new?HashEntry[cap]);//新建指定大小的分段鎖數組Segment<K,V>[]?ss?=?(Segment<K,V>[])new?Segment[ssize];//使用UnSafe給數組第0個元素賦值UNSAFE.putOrderedObject(ss,?SBASE,?s0);this.segments?=?ss; }ConcurrentHashMap有多個構造器,但是上面貼出的是它的核心構造器,其他構造器都通過調用它來完成初始化。核心構造器需要傳入三個參數,分別是初始容量,加載因子和并發級別。在前面介紹成員變量時我們可以知道默認的初始容量為16,加載因子為0.75f,并發級別為16。
現在我們看到核心構造器的代碼,首先是通過傳入的concurrencyLevel來計算出ssize,ssize是Segment數組的長度,它必須保證是2的冪,這樣就可以通過hash&ssize-1來計算分段鎖在數組中的下標。
由于傳入的concurrencyLevel不能保證是2的冪,所以不能直接用它來當作Segment數組的長度,因此我們要找到一個最接近concurrencyLevel的2的冪,用它來作為數組的長度。假如現在傳入的concurrencyLevel=15,通過上面代碼可以計算出ssize=16,sshift=4。接下來立馬可以算出segmentShift=16,segmentMask=15。注意這里的segmentShift是分段鎖的移位值,segmentMask是分段鎖的掩碼值,這兩個值是用來計算分段鎖在數組中的下標,在下面我們會講到。
在算出分段鎖的個數ssize之后,就可以根據傳入的總容量來計算每個分段鎖的容量,它的值c?=?initialCapacity?/?ssize。分段鎖的容量也就是HashEntry數組的長度,同樣也必須保證是2的冪,而上面算出的c的值不能保證這一點,所以不能直接用c作為HashEntry數組的長度,需要另外找到一個最接近c的2的冪,將這個值賦給cap,然后用cap來作為HashEntry數組的長度。現在我們有了ssize和cap,就可以新建分段鎖數組Segment[]和元素數組HashEntry[]了。注意,與JDK1.6不同是的,在JDK1.7中只新建了Segment數組,并沒有對它初始化,初始化Segment的操作留到了插入操作時進行。
4.?通過怎樣的方式來定位鎖和定位元素?
//根據哈希碼獲取分段鎖 @SuppressWarnings("unchecked") private?Segment<K,V>?segmentForHash(int?h)?{long?u?=?(((h?>>>?segmentShift)?&?segmentMask)?<<?SSHIFT)?+?SBASE;return?(Segment<K,V>)?UNSAFE.getObjectVolatile(segments,?u); }//根據哈希碼獲取元素 @SuppressWarnings("unchecked") static?final?<K,V>?HashEntry<K,V>?entryForHash(Segment<K,V>?seg,?int?h)?{HashEntry<K,V>[]?tab;return?(seg?==?null?||?(tab?=?seg.table)?==?null)???null?:(HashEntry<K,V>)?UNSAFE.getObjectVolatile(tab,?((long)(((tab.length?-?1)?&?h))?<<?TSHIFT)?+?TBASE); }在JDK1.7中是通過UnSafe來獲取數組元素的,因此這里比JDK1.6多了些計算數組元素偏移量的代碼,這些代碼我們暫時不關注,現在我們只需知道下面這兩點:
-
通過哈希碼計算分段鎖在數組中的下標:(h?>>>?segmentShift)?&?segmentMask。
-
通過哈希碼計算元素在數組中的下標:(tab.length?-?1)?&?h。
現在我們假設傳給構造器的兩個參數為initialCapacity=128,?concurrencyLevel=16。根據計算可以得到ssize=16,?sshift=4,segmentShift=28,segmentMask=15。同樣,算得每個分段鎖內的HashEntry數組的長度為8,所以tab.length-1=7。根據這些值,我們通過下圖來解釋如何根據同一個哈希碼來定位分段鎖和元素。
?
可以看到分段鎖和元素的定位都是通過元素的哈希碼來決定的。定位分段鎖是取哈希碼的高位值(從32位處取起),定位元素是取的哈希碼的低位值。現在有個問題,它們一個從32位的左端取起,一個從32位的右端取起,那么會在某個時刻產生沖突嗎?我們在成員變量里可以找到MAXIMUM_CAPACITY?=?1?<<?30,MAX_SEGMENTS?=?1?<<?16,這說明定位分段鎖和定位元素使用的總的位數不超過30,并且定位分段鎖使用的位數不超過16,所以至少還隔著2位的空余,因此是不會產生沖突的。
5.?查找元素具體是怎樣實現的?
//根據key獲取value public?V?get(Object?key)?{Segment<K,V>?s;HashEntry<K,V>[]?tab;//使用哈希函數計算哈希碼int?h?=?hash(key);//根據哈希碼計算分段鎖的索引long?u?=?(((h?>>>?segmentShift)?&?segmentMask)?<<?SSHIFT)?+?SBASE;//獲取分段鎖和對應的哈希表if?((s?=?(Segment<K,V>)UNSAFE.getObjectVolatile(segments,?u))?!=?null?&&?(tab?=?s.table)?!=?null)?{//根據哈希碼獲取鏈表頭結點,?再對鏈表進行遍歷for?(HashEntry<K,V>?e?=?(HashEntry<K,V>)?UNSAFE.getObjectVolatile(tab,?((long)(((tab.length?-?1)?&?h))?<<?TSHIFT)?+?TBASE);e?!=?null;?e?=?e.next)?{K?k;//根據key和hash找到對應元素后返回value值if?((k?=?e.key)?==?key?||?(e.hash?==?h?&&?key.equals(k)))?{return?e.value;}}}return?null; }在JDK1.6中分段鎖的get方法是通過下標來訪問數組元素的,而在JDK1.7中是通過UnSafe的getObjectVolatile方法來讀取數組中的元素。為啥要這樣做?
我們知道雖然Segment對象持有的HashEntry數組引用是volatile類型的,但是數組內的元素引用不是volatile類型的,因此多線程對數組元素的修改是不安全的,可能會在數組中讀取到尚未構造完成的對象。
在JDK1.6中是通過第二次加鎖讀取來保證安全的,而JDK1.7中通過UnSafe的getObjectVolatile方法來讀取同樣也是為了保證這一點。使用getObjectVolatile方法讀取數組元素需要先獲得元素在數組中的偏移量,在這里根據哈希碼計算得到分段鎖在數組中的偏移量為u,然后通過偏移量u來嘗試讀取分段鎖。由于分段鎖數組在構造時沒進行初始化,因此可能讀出來一個空值,所以需要先進行判斷。
在確定分段鎖和它內部的哈希表都不為空之后,再通過哈希碼讀取HashEntry數組的元素,根據上面的結構圖可以看到,這時獲得的是鏈表的頭結點。之后再從頭到尾的對鏈表進行遍歷查找,如果找到對應的值就將其返回,否則就返回null。以上就是整個查找元素的過程。
6.?插入元素具體是怎樣實現的?
//向集合添加鍵值對(若存在則替換) @SuppressWarnings("unchecked") public?V?put(K?key,?V?value)?{Segment<K,V>?s;//傳入的value不能為空if?(value?==?null)?throw?new?NullPointerException();//使用哈希函數計算哈希碼int?hash?=?hash(key);//根據哈希碼計算分段鎖的下標int?j?=?(hash?>>>?segmentShift)?&?segmentMask;//根據下標去嘗試獲取分段鎖if?((s?=?(Segment<K,V>)UNSAFE.getObject(segments,?(j?<<?SSHIFT)?+?SBASE))?==?null)?{//獲得的分段鎖為空就去構造一個s?=?ensureSegment(j);}//調用分段鎖的put方法return?s.put(key,?hash,?value,?false); }//向集合添加鍵值對(不存在才添加) @SuppressWarnings("unchecked") public?V?putIfAbsent(K?key,?V?value)?{Segment<K,V>?s;//傳入的value不能為空if?(value?==?null)?throw?new?NullPointerException();//使用哈希函數計算哈希碼int?hash?=?hash(key);//根據哈希碼計算分段鎖的下標int?j?=?(hash?>>>?segmentShift)?&?segmentMask;//根據下標去嘗試獲取分段鎖if?((s?=?(Segment<K,V>)UNSAFE.getObject(segments,?(j?<<?SSHIFT)?+?SBASE))?==?null)?{//獲得的分段鎖為空就去構造一個s?=?ensureSegment(j);}//調用分段鎖的put方法return?s.put(key,?hash,?value,?true); }ConcurrentHashMap中有兩個添加鍵值對的方法,通過put方法添加時如果存在則會進行覆蓋,通過putIfAbsent方法添加時如果存在則不進行覆蓋,這兩個方法都是調用分段鎖的put方法來完成操作,只是傳入的最后一個參數不同而已。
在上面代碼中我們可以看到首先是根據key的哈希碼來計算出分段鎖在數組中的下標,然后根據下標使用UnSafe類getObject方法來讀取分段鎖。由于在構造ConcurrentHashMap時沒有對Segment數組中的元素初始化,所以可能讀到一個空值,這時會先通過ensureSegment方法新建一個分段鎖。獲取到分段鎖之后再調用它的put方法完成添加操作,下面我們來看看具體是怎樣操作的。
//添加鍵值對 final?V?put(K?key,?int?hash,?V?value,?boolean?onlyIfAbsent)?{//嘗試獲取鎖,?若失敗則進行自旋HashEntry<K,V>?node?=?tryLock()???null?:?scanAndLockForPut(key,?hash,?value);V?oldValue;try?{HashEntry<K,V>[]?tab?=?table;//計算元素在數組中的下標int?index?=?(tab.length?-?1)?&?hash;//根據下標獲取鏈表頭結點HashEntry<K,V>?first?=?entryAt(tab,?index);for?(HashEntry<K,V>?e?=?first;;)?{//遍歷鏈表尋找該元素,?找到則進行替換if?(e?!=?null)?{K?k;if?((k?=?e.key)?==?key?||?(e.hash?==?hash?&&?key.equals(k)))?{oldValue?=?e.value;//根據參數決定是否替換舊值if?(!onlyIfAbsent)?{e.value?=?value;++modCount;}break;}e?=?e.next;//沒找到則在鏈表添加一個結點}?else?{//將node結點插入鏈表頭部if?(node?!=?null)?{node.setNext(first);}?else?{node?=?new?HashEntry<K,V>(hash,?key,?value,?first);}//插入結點后將元素總是加1int?c?=?count?+?1;//元素超過閥值則進行擴容if?(c?>?threshold?&&?tab.length?<?MAXIMUM_CAPACITY)?{rehash(node);//否則就將哈希表指定下標替換為node結點}?else?{setEntryAt(tab,?index,?node);}++modCount;count?=?c;oldValue?=?null;break;}}}?finally?{unlock();}return?oldValue; }為保證線程安全,分段鎖中的put操作是需要進行加鎖的,所以線程一開始就會去獲取鎖,如果獲取成功就繼續執行,若獲取失敗則調用scanAndLockForPut方法進行自旋,在自旋過程中會先去掃描哈希表去查找指定的key,如果key不存在就會新建一個HashEntry返回,這樣在獲取到鎖之后就不必再去新建了,為的是在等待鎖的過程中順便做些事情,不至于白白浪費時間,可見作者的良苦用心。
具體自旋方法我們后面再細講,現在先把關注點拉回來,線程在成功獲取到鎖之后會根據計算到的下標,獲取指定下標的元素。此時獲取到的是鏈表的頭結點,如果頭結點不為空就對鏈表進行遍歷查找,找到之后再根據onlyIfAbsent參數的值決定是否進行替換。
如果遍歷沒找到就會新建一個HashEntry指向頭結點,此時如果自旋時創建了HashEntry,則直接將它的next指向當前頭結點,如果自旋時沒有創建就在這里新建一個HashEntry并指向頭結點。在向鏈表添加元素之后檢查元素總數是否超過閥值,如果超過就調用rehash進行擴容,沒超過的話就直接將數組對應下標的元素引用指向新添加的node。setEntryAt方法內部是通過調用UnSafe的putOrderedObject方法來更改數組元素引用的,這樣就保證了其他線程在讀取時可以讀到最新的值。
7.?刪除元素具體是怎樣實現的?
//刪除指定元素(找到對應元素后直接刪除) public?V?remove(Object?key)?{//使用哈希函數計算哈希碼int?hash?=?hash(key);//根據哈希碼獲取分段鎖的索引Segment<K,V>?s?=?segmentForHash(hash);//調用分段鎖的remove方法return?s?==?null???null?:?s.remove(key,?hash,?null); }//刪除指定元素(查找值等于給定值才刪除) public?boolean?remove(Object?key,?Object?value)?{//使用哈希函數計算哈希碼int?hash?=?hash(key);Segment<K,V>?s;//確保分段鎖不為空才調用remove方法return?value?!=?null?&&?(s?=?segmentForHash(hash))?!=?null?&&?s.remove(key,?hash,?value)?!=?null; }ConcurrentHashMap提供了兩種刪除操作,一種是找到后直接刪除,一種是找到后先比較再刪除。這兩種刪除方法都是先根據key的哈希碼找到對應的分段鎖后,再通過調用分段鎖的remove方法完成刪除操作。下面我們來看看分段鎖的remove方法。
//刪除指定元素 final?V?remove(Object?key,?int?hash,?Object?value)?{//嘗試獲取鎖,?若失敗則進行自旋if?(!tryLock())?{scanAndLock(key,?hash);}V?oldValue?=?null;try?{HashEntry<K,V>[]?tab?=?table;//計算元素在數組中的下標int?index?=?(tab.length?-?1)?&?hash;//根據下標取得數組元素(鏈表頭結點)HashEntry<K,V>?e?=?entryAt(tab,?index);HashEntry<K,V>?pred?=?null;//遍歷鏈表尋找要刪除的元素while?(e?!=?null)?{K?k;//next指向當前結點的后繼結點HashEntry<K,V>?next?=?e.next;//根據key和hash尋找對應結點if?((k?=?e.key)?==?key?||?(e.hash?==?hash?&&?key.equals(k)))?{V?v?=?e.value;//傳入的value不等于v就跳過,?其他情況就進行刪除操作if?(value?==?null?||?value?==?v?||?value.equals(v))?{//如果pred為空則代表要刪除的結點為頭結點if?(pred?==?null)?{//重新設置鏈表頭結點setEntryAt(tab,?index,?next);}?else?{//設置pred結點的后繼為next結點pred.setNext(next);}++modCount;--count;//記錄元素刪除之前的值oldValue?=?v;}break;}//若e不是要找的結點就將pred引用指向它pred?=?e;//檢查下一個結點e?=?next;}}?finally?{unlock();}return?oldValue; }在刪除分段鎖中的元素時需要先獲取鎖,如果獲取失敗就調用scanAndLock方法進行自旋,如果獲取成功就執行下一步,首先計算數組下標然后通過下標獲取HashEntry數組的元素,這里獲得了鏈表的頭結點,接下來就是對鏈表進行遍歷查找,在此之前先用next指針記錄當前結點的后繼結點,然后對比key和hash看看是否是要找的結點,如果是的話就執行下一個if判斷。
滿足value為空或者value的值等于結點當前值這兩個條件就會進入到if語句中進行刪除操作,否則直接跳過。在if語句中執行刪除操作時會有兩種情況,如果當前結點為頭結點則直接將next結點設置為頭結點,如果當前結點不是頭結點則將pred結點的后繼設置為next結點。這里的pred結點表示當前結點的前繼結點,每次在要檢查下一個結點之前就將pred指向當前結點,這就保證了pred結點總是當前結點的前繼結點。
注意,與JDK1.6不同,在JDK1.7中HashEntry對象的next變量不是final的,因此這里可以通過直接修改next引用的值來刪除元素,由于next變量是volatile類型的,所以讀線程可以馬上讀到最新的值。
8.?替換元素具體是怎樣實現的?
//替換指定元素(CAS操作) public?boolean?replace(K?key,?V?oldValue,?V?newValue)?{//使用哈希函數計算哈希碼int?hash?=?hash(key);//保證oldValue和newValue不為空if?(oldValue?==?null?||?newValue?==?null)?throw?new?NullPointerException();//根據哈希碼獲取分段鎖的索引Segment<K,V>?s?=?segmentForHash(hash);//調用分段鎖的replace方法return?s?!=?null?&&?s.replace(key,?hash,?oldValue,?newValue); }//替換元素操作(CAS操作) final?boolean?replace(K?key,?int?hash,?V?oldValue,?V?newValue)?{//嘗試獲取鎖,?若失敗則進行自旋if?(!tryLock())?{scanAndLock(key,?hash);}boolean?replaced?=?false;try?{HashEntry<K,V>?e;//通過hash直接找到頭結點然后對鏈表遍歷for?(e?=?entryForHash(this,?hash);?e?!=?null;?e?=?e.next)?{K?k;//根據key和hash找到要替換的結點if?((k?=?e.key)?==?key?||?(e.hash?==?hash?&&?key.equals(k)))?{//如果指定的當前值正確則進行替換if?(oldValue.equals(e.value))?{e.value?=?newValue;++modCount;replaced?=?true;}//否則不進行任何操作直接返回break;}}}?finally?{unlock();}return?replaced; }ConcurrentHashMap同樣提供了兩種替換操作,一種是找到后直接替換,另一種是找到后先比較再替換(CAS操作)。這兩種操作的實現大致是相同的,只是CAS操作在替換前多了一層比較操作,因此我們只需簡單了解其中一種操作即可。
這里拿CAS操作進行分析,還是老套路,首先根據key的哈希碼找到對應的分段鎖,然后調用它的replace方法。進入分段鎖中的replace方法后需要先去獲取鎖,如果獲取失敗則進行自旋,如果獲取成功則進行下一步。首先根據hash碼獲取鏈表頭結點,然后根據key和hash進行遍歷查找,找到了對應的元素之后,比較給定的oldValue是否是當前值,如果不是則放棄修改,如果是則用新值進行替換。由于HashEntry對象的value域是volatile類型的,因此可以直接替換。
9.?自旋時具體做了些什么?
//自旋等待獲取鎖(put操作) private?HashEntry<K,V>?scanAndLockForPut(K?key,?int?hash,?V?value)?{//根據哈希碼獲取頭結點HashEntry<K,V>?first?=?entryForHash(this,?hash);HashEntry<K,V>?e?=?first;HashEntry<K,V>?node?=?null;int?retries?=?-1;//在while循環內自旋while?(!tryLock())?{HashEntry<K,V>?f;if?(retries?<?0)?{//如果頭結點為空就新建一個nodeif?(e?==?null)?{if?(node?==?null)?{node?=?new?HashEntry<K,V>(hash,?key,?value,?null);}retries?=?0;//否則就遍歷鏈表定位該結點}?else?if?(key.equals(e.key))?{retries?=?0;}?else?{e?=?e.next;}//retries每次在這加1,?并判斷是否超過最大值}?else?if?(++retries?>?MAX_SCAN_RETRIES)?{lock();break;//retries為偶數時去判斷first有沒有改變}?else?if?((retries?&?1)?==?0?&&?(f?=?entryForHash(this,?hash))?!=?first)?{e?=?first?=?f;retries?=?-1;}}return?node; }//自旋等待獲取鎖(remove和replace操作) private?void?scanAndLock(Object?key,?int?hash)?{//根據哈希碼獲取鏈表頭結點HashEntry<K,V>?first?=?entryForHash(this,?hash);HashEntry<K,V>?e?=?first;int?retries?=?-1;//在while循環里自旋while?(!tryLock())?{HashEntry<K,V>?f;if?(retries?<?0)?{//遍歷鏈表定位到該結點if?(e?==?null?||?key.equals(e.key))?{retries?=?0;}?else?{e?=?e.next;}//retries每次在這加1,?并判斷是否超過最大值}?else?if?(++retries?>?MAX_SCAN_RETRIES)?{lock();break;//retries為偶數時去判斷first有沒有改變}?else?if?((retries?&?1)?==?0?&&?(f?=?entryForHash(this,?hash))?!=?first)?{e?=?first?=?f;retries?=?-1;}} }在前面我們講到過,分段鎖中的put,remove,replace這些操作都會要求先去獲取鎖,只有成功獲得鎖之后才能進行下一步操作,如果獲取失敗就會進行自旋。
自旋操作也是在JDK1.7中添加的,為了避免線程頻繁的掛起和喚醒,以此提高并發操作時的性能。在put方法中調用的是scanAndLockForPut,在remove和replace方法中調用的是scanAndLock。這兩種自旋方法大致是相同的,這里我們只分析scanAndLockForPut方法。首先還是先根據hash碼獲得鏈表頭結點,之后線程會進入while循環中執行,退出該循環的唯一方式是成功獲取鎖,而在這期間線程不會被掛起。
剛進入循環時retries的值為-1,這時線程不會馬上再去嘗試獲取鎖,而是先去尋找到key對應的結點(沒找到會新建一個),然后再將retries設為0,接下來就會一次次的嘗試獲取鎖,對應retries的值也會每次加1,直到超過最大嘗試次數如果還沒獲取到鎖,就會調用lock方法進行阻塞獲取。在嘗試獲取鎖的期間,還會每隔一次(retries為偶數)去檢查頭結點是否被改變,如果被改變則將retries重置回-1,然后再重走一遍剛才的流程。這就是線程自旋時所做的操作,需注意的是如果在自旋時檢測到頭結點已被改變,則會延長線程的自旋時間。
10.?哈希表擴容時都做了哪些操作?
//再哈希 @SuppressWarnings("unchecked") private?void?rehash(HashEntry<K,V>?node)?{//獲取舊哈希表的引用HashEntry<K,V>[]?oldTable?=?table;//獲取舊哈希表的容量int?oldCapacity?=?oldTable.length;//計算新哈希表的容量(為舊哈希表的2倍)int?newCapacity?=?oldCapacity?<<?1;//計算新的元素閥值threshold?=?(int)(newCapacity?*?loadFactor);//新建一個HashEntry數組HashEntry<K,V>[]?newTable?=?(HashEntry<K,V>[])?new?HashEntry[newCapacity];//生成新的掩碼值int?sizeMask?=?newCapacity?-?1;//遍歷舊表的所有元素for?(int?i?=?0;?i?<?oldCapacity?;?i++)?{//取得鏈表頭結點HashEntry<K,V>?e?=?oldTable[i];if?(e?!=?null)?{HashEntry<K,V>?next?=?e.next;//計算元素在新表中的索引int?idx?=?e.hash?&?sizeMask;//next為空表明鏈表只有一個結點if?(next?==?null)?{//直接把該結點放到新表中newTable[idx]?=?e;}else?{HashEntry<K,V>?lastRun?=?e;int?lastIdx?=?idx;//定位lastRun結點,?將lastRun之后的結點直接放到新表中for?(HashEntry<K,V>?last?=?next;?last?!=?null;?last?=?last.next)?{int?k?=?last.hash?&?sizeMask;if?(k?!=?lastIdx)?{lastIdx?=?k;lastRun?=?last;}}newTable[lastIdx]?=?lastRun;//遍歷在鏈表lastRun結點之前的元素,?將它們依次復制到新表中for?(HashEntry<K,V>?p?=?e;?p?!=?lastRun;?p?=?p.next)?{V?v?=?p.value;int?h?=?p.hash;int?k?=?h?&?sizeMask;HashEntry<K,V>?n?=?newTable[k];newTable[k]?=?new?HashEntry<K,V>(h,?p.key,?v,?n);}}}}//計算傳入結點在新表中的下標int?nodeIndex?=?node.hash?&?sizeMask;//將傳入結點添加到鏈表頭結點node.setNext(newTable[nodeIndex]);//將新表指定下標元素換成傳入結點newTable[nodeIndex]?=?node;//將哈希表引用指向新表table?=?newTable; }rehash方法在put方法中被調用,我們知道在put方法時會新建元素并添加到哈希數組中,隨著元素的增多發生哈希沖突的可能性越大,哈希表的性能也會隨之下降。因此每次put操作時都會檢查元素總數是否超過閥值,如果超過則調用rehash方法進行擴容。
因為數組長度一旦確定則不能再被改變,因此需要新建一個數組來替換原先的數組。從代碼中可以知道新創建的數組長度為原數組的2倍(oldCapacity?<<?1)。創建好新數組后需要將舊數組中的所有元素移到新數組中,因此需要計算每個元素在新數組中的下標。計算新下標的過程如下圖所示。
我們知道下標直接取的是哈希碼的后幾位,由于新數組的容量是直接用舊數組容量右移1位得來的,因此掩碼位數向右增加1位,取到的哈希碼位數也向右增加1位。如上圖,若舊的掩碼值為111,則元素下標為101,擴容后新的掩碼值為1111,則計算出元素的新下標為0101。
由于同一條鏈表上的元素下標是相同的,現在假設鏈表所有元素的下標為101,在擴容后該鏈表元素的新下標只有0101或1101這兩種情況,因此數組擴容會打亂原先的鏈表并將鏈表元素分成兩批。在計算出新下標后需要將元素移動到新數組中,在HashMap中通過直接修改next引用導致了多線程的死鎖。
雖然在ConcurrentHashMap中通過加鎖避免了這種情況,但是我們知道next域是volatile類型的,它的改動能立馬被讀線程讀取到,因此為保證線程安全采用復制元素來遷移數組。但是對鏈表中每個元素都進行復制有點影響性能,作者發現鏈表尾部有許多元素的next是不變的,它們在新數組中的下標是相同的,因此可以考慮整體移動這部分元素。具統計實際操作中只有1/6的元素是必須復制的,所以整體移動鏈表尾部元素(lastRun后面的元素)是可以提升一定性能的。
注:本篇文章基于JDK1.7版本。
?
?
總結
以上是生活随笔為你收集整理的ConcurrentHashMap源码分析(转载)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringMvc执行过程
- 下一篇: mysql语法中的LIMIT的用法