Java7 ConcurrentHashMap源码浅析
Java中的哈希表主要包括:HashMap,HashTable,ConcurrentHashMap,LinkedHashMap和TreeMap等。HashMap是無序的,并且不是線程安全的,在多線程環境下會出現數據安全性問題,典型的問題是多線程同時rehash過程中產生的死循環問題。LinkedHashMap和TreeMap都是有序的,但是兩者的有序機制不同:LinkedHashMap是通過鏈表的結構保證元素有序,而TreeMap是一種紅黑樹結構,通過堆排序保證元素有序。在Java6中HashMap,HashTable和ConcurrentHashMap都是采用數組+鏈表的數據結構,在Java8之后則采用數組+鏈表+紅黑樹的數據結構。HashTable和ConcurrentHashMap都是線程安全的,保證線程安全無外乎加鎖,但是二者加鎖的粒度不通,HashTable整個表就一把鎖,它的get和put都是通過synchronized保證安全,在多線程競爭鎖激烈的情況下,會出現性能問題。本文講解的ConcurrentHashMap是Java7版本。
數據結構
Java7中ConcurrentHashMap采用數組+鏈表的數據結構,哈希表整體上是一個Segment的數組,而每個分段Segment又是一個HashEntry的數組,每個HashEntry是一個鏈表。
ConcurrentHashMap鎖分段
鎖粗化是鎖優化的一種重要措施,而鎖粗化又包含"lock-splitting"(鎖定拆分)和"lock-stripping"(鎖條帶化)。讀寫鎖分離是一種典型的鎖定拆分方式,JUC中的ReentrantReadWriteLock就是一種讀寫分離鎖,鎖定條帶化是指將一把“大鎖”拆分成若干個“小鎖”來降低鎖的競爭。ConcurrentHashMap就是通過鎖條帶化來做鎖的優化。我們都知道ConcurrentHashMap是分段的,它的表是一個Segment數組:
/*** The segments, each of which is a specialized hash table*/ final Segment<K,V>[] segments; 復制代碼而每個Segment都是繼承了一個ReentrantLock:
static final class Segment<K,V> extends ReentrantLock implements Serializable {...} 復制代碼所以ConcurrentHashMap的每個Segment都是一把鎖,不同的Segment之間的讀寫不構成競爭,大大降低了鎖的競爭。既然每個Segment都是一把鎖,那么這個segment數組的長度是多少呢?也就是說整個表我們需要多少把鎖呢?
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (concurrencyLevel > MAX_SEGMENTS) // MAX_SEGMENTS是指整個表最多能分成多少個segment,也即是多少把鎖concurrencyLevel = MAX_SEGMENTS;// Find power-of-two sizes best matching argumentsint sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) { // 找到不小于我們指定的concurrencyLevel的2的冪次方的一個數作為segment數組的長度++sshift;ssize <<= 1;}segmentShift = 32 - sshift;segmentMask = ssize - 1;this.segments = Segment.newArray(ssize);if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;if (c * ssize < initialCapacity)++c;int cap = 1;while (cap < c)cap <<= 1;for (int i = 0; i < this.segments.length; ++i)this.segments[i] = new Segment<K,V>(cap, loadFactor); } 復制代碼在ConcurrentHashMap的構造函數中我們指定了concurrencyLevel,也即是多少把鎖。這個數量不能超過上限:MAX_SEGMENTS(1 << 16),鎖的個數必須是2的冪次方,如果我們指定的concurrencyLevel不是2的冪次方,構造函數會找到最接近的一個不小于我們指定的值的一個2的冪次方數作為segment數組長度。例如:我們指定concurrencyLevel為15,則最終segment數組長度為16,也即是表一共有16把鎖。設想兩個線程同時向表中插入元素,線程1插入的第0個segment,線程2插入的是第1個segment,線程1和線程2互不影響,能夠同時并行。但是HashTable就做不到這一點。
put操作
public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);int j = (hash >>> segmentShift) & segmentMask; // 通過位運算得到segment的索引位置if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegments = ensureSegment(j);return s.put(key, hash, value, false); } 復制代碼ConcurrentHashMap不支持插入null的值,因此首先校驗value是否為null。如果value是null則拋出異常。 注意這里計算segment索引方式是: (hash >>> segmentShift) & segmentMask; 而不是hash % segment數組長度。這兒是一個優化:因為取模"%"操作相對位運算來說是很慢的,因此這里是用位運算來得到segment索引。而當segment數組長度是2的冪次方記為segmentSize時:hash % segmentSize == hash & (segmentSize - 1)。這里不做證明。因此segmentSize必須是2的冪次方。來看看Segment中的put()方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {HashEntry<K,V> node = tryLock() ? null : // 獲取segment的鎖,這里會有一個優化:獲取鎖的時候首先會通過 `tryLock()` 嘗試若干次scanAndLockForPut(key, hash, value); // 如果若干次之后還沒有獲取鎖,則用 `lock()` 方法阻塞等待,直到獲取鎖V oldValue;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash; // 得到segment的table的索引,也是通過位運算HashEntry<K,V> first = entryAt(tab, index); // table中index位置的first節點for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) { // 對應的key已經有了valueoldValue = e.value;if (!onlyIfAbsent) { // 是否覆蓋原來的valuee.value = value; // 覆蓋原來的value++modCount;}break;}e = e.next; // 遍歷}else {if (node != null)node.setNext(first); // 如果node已經在scanAndLockForPut()方法中初始化過elsenode = new HashEntry<K,V>(hash, key, value, first); // 如果node為null,則初始化int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node); // 如果超過閾值,則擴容elsesetEntryAt(tab, index, node); // 通過UNSAFE設置table數組的index位置的元素為node++modCount;count = c;oldValue = null;break;}}} finally {unlock();}return oldValue; } 復制代碼首先,會獲取segment的鎖,然后判斷添加元素后是否需要擴容。注意這里的擴容是指Segment中的HashEntry[] table表數組擴容,而不是最外層的segment[]數組擴容。segment[]數組是不可擴展的,在構造函數中已經確定了segment[]數組的長度。 接著同樣通過位運算得到待添加元素在HashEntry[] table數組中的位置。接著再判斷這個鏈表中是否已經存在這個key,如果存在并且onlyIfAbsent為false,就覆蓋原value;如果鏈表不存在key,則將新的node通過UNSAFE放到table數組指定的位置。
get操作
get操作比較簡單,不需要加鎖。可見性由volatile來保證:HashEntry的value是volatile的,Segment中的HashEntry[] table數組也是volatile。這保證了其他線程對哈希表的修改能夠及時地被讀線程發現。
public V get(Object key) {Segment<K,V> s; // manually integrate access methods to reduce overheadHashEntry<K,V>[] tab;int h = hash(key);long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 計算key應該落在segments數組的哪個segment中if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); // 計算key應該落在table的哪個位置e != null; e = e.next) {K k;if ((k = e.key) == key || (e.hash == h && key.equals(k))) // 如果key和當前節點的key指向同一塊內存地址或者當前節點的hashreturn e.value; // 等于key的hash并且key"equals"當前節點的key則說明當前節點就是目標節點}}return null; // key不在當前哈希表中,返回null } 復制代碼rehash
private void rehash(HashEntry<K,V> node) {/** Reclassify nodes in each list to new table. Because we* are using power-of-two expansion, the elements from* each bin must either stay at same index, or move with a* power of two offset. We eliminate unnecessary node* creation by catching cases where old nodes can be* reused because their next fields won't change.* Statistically, at the default threshold, only about* one-sixth of them need cloning when a table* doubles. The nodes they replace will be garbage* collectable as soon as they are no longer referenced by* any reader thread that may be in the midst of* concurrently traversing table. Entry accesses use plain* array indexing because they are followed by volatile* table write.*/HashEntry<K,V>[] oldTable = table;int oldCapacity = oldTable.length;int newCapacity = oldCapacity << 1; // 每次擴容成原來capacity的2倍,這樣元素在新的table中的索引要么不變要么是原來的索引加上2的一個倍數threshold = (int)(newCapacity * loadFactor); // 新的擴容閾值HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的segment table數組int sizeMask = newCapacity - 1;for (int i = 0; i < oldCapacity ; i++) {HashEntry<K,V> e = oldTable[i];if (e != null) {HashEntry<K,V> next = e.next;int idx = e.hash & sizeMask;if (next == null) // Single node on listnewTable[idx] = e;else { // Reuse consecutive sequence at same slotHashEntry<K,V> lastRun = e;int lastIdx = idx;for (HashEntry<K,V> last = next; // 在拷貝原來鏈表的元素到新的table中時有個優化:通過遍歷找到原先鏈表中的lastRun節點,這個節點以及它的后續節點都不需要重新拷貝,直接放到新的table中就行last != null;last = last.next) {int k = last.hash & sizeMask;if (k != lastIdx) {lastIdx = k;lastRun = last;}}newTable[lastIdx] = lastRun; // lastRun節點以及lastRun后續節點都不需要重新拷貝,直接賦值引用// Clone remaining nodesfor (HashEntry<K,V> p = e; p != lastRun; p = p.next) { // 循環拷貝原先鏈表lastRun之前的節點到新的table鏈表中V v = p.value;int h = p.hash;int k = h & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(h, p.key, v, n);}}}}int nodeIndex = node.hash & sizeMask; // add the new nodenode.setNext(newTable[nodeIndex]); // rehash之后,執行添加新的節點newTable[nodeIndex] = node;table = newTable; } 復制代碼由于rehash過程中是加排它鎖的,這樣其他的寫入請求將被阻塞等待。而對于讀請求,需要分情況討論:讀請求在rehash之前,此時segment中的table數組指針還是指向原先舊的數組,所以讀取是安全的;如果讀請求在rehash之后,因為table數組和HashEntry的value都是volatile,所以讀線程也能及時讀取到更新的值,因此也是線程安全的。所以rehash不會影響到讀。
remove操作
public V remove(Object key) {int hash = hash(key);Segment<K,V> s = segmentForHash(hash); // key落在哪個segment中return s == null ? null : s.remove(key, hash, null); // 如果segment為null,則說明哈希表中沒有key,直接返回null,否則調用Segment的remove }final V remove(Object key, int hash, Object value) { // Segment的remove方法if (!tryLock()) // 獲取Segment的鎖,套路還是一樣的首先進行若干次 `tryLock()`, 如果失敗了則通過 `lock()` 方法阻塞等待直到獲取鎖scanAndLock(key, hash);V oldValue = null;try {HashEntry<K,V>[] tab = table;int index = (tab.length - 1) & hash;HashEntry<K,V> e = entryAt(tab, index); // 找到key具體在table的哪個鏈表中,e代表鏈表當前節點HashEntry<K,V> pred = null; // pred代表e節點的前置節點while (e != null) {K k;HashEntry<K,V> next = e.next;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) { // 找到了這個key對應的HashEntryV v = e.value;if (value == null || value == v || value.equals(v)) {if (pred == null) // 如果當前節點的前置節點為空,說明要刪除的節點是當前鏈表的頭節點,直接將當前鏈表的頭節點指向當前節點的next就可以了setEntryAt(tab, index, next);elsepred.setNext(next); // 否則修改前置節點的next指針,指向當前節點的next節點,這樣當前節點將不再"可達",可以被GC回收++modCount;--count;oldValue = v;}break;}pred = e;e = next;}} finally {unlock(); // 解鎖}return oldValue; } 復制代碼remove時,首先會找到這個key落在哪個Segment中,如果key沒有落在任何一個Segment中,說明key不存在,直接返回null。找到具體的Segment后,調用Segment的remove方法來進行刪除:找到key落在Segment的table數組中的哪個鏈表中,遍歷鏈表,如果要刪除的節點是當前鏈表的頭節點,則直接修改鏈表的頭指針為當前節點的next節點;如果要刪除的節點不是頭節點,繼續遍歷找到目標節點,修改目標節點的前置節點的next指針指向目標節點的next節點完成操作。 安全性分析:remove時首先會加鎖,其他mutable請求都是會被阻塞的,對于讀請求也是安全的。如果讀取的key不是當前要刪除的key不會有任何問題。如果讀取的key恰巧是當前需要刪除key:讀請求在remove之前,這時可以讀取到;如果讀請求在remove操作之后,由于HashEntry的next指針都是volatile的,所以讀線程也是可以及時發現這個key已經被刪除了的。也是安全的。
size操作
ConcurrentHashMap的size操作在Java7實現還是比較有意思的。其首先會進行若干次嘗試,每次對各個Segment的count求和,如果任意前后兩次求和結果相同,則說明在這段時間之內各個Segment的元素個數沒有改變,直接返回當前的求和結果就行了。如果超過一定重試次數之后,會采取悲觀策略,直接鎖定各個Segment,然后依次求和。注意這里是鎖定所有Segment,因此在采取悲觀策略時整個哈希表都不能有寫入操作。這里先樂觀再悲觀的策略和前面的put操作中的scanAndLockForPut有異曲同工之妙。
public int size() {// Try a few times to get accurate count. On failure due to// continuous async changes in table, resort to locking.final Segment<K,V>[] segments = this.segments; // 首先不加鎖,每次對各個Segment的count累加求和,如果任意兩次的累加結果相同,則直接返回這個結果;超過一定的次數之后悲觀鎖定所有的Segment,再求和。鎖定之后整個哈希表不能有任何的寫入操作。int size;boolean overflow; // true if size overflows 32 bitslong sum; // sum of modCountslong last = 0L; // previous sumint retries = -1; // first iteration isn't retrytry {for (;;) {if (retries++ == RETRIES_BEFORE_LOCK) { // 最大樂觀重試次數for (int j = 0; j < segments.length; ++j)ensureSegment(j).lock(); // force creation}sum = 0L;size = 0;overflow = false;for (int j = 0; j < segments.length; ++j) { // 對各個Segment的count累加,不加鎖Segment<K,V> seg = segmentAt(segments, j);if (seg != null) {sum += seg.modCount;int c = seg.count;if (c < 0 || (size += c) < 0)overflow = true;}}if (sum == last) // 如果本次累加結果和上次相同,說明這中間沒有插入或者刪除操作,直接返回這個結果break;last = sum;}} finally {if (retries > RETRIES_BEFORE_LOCK) {for (int j = 0; j < segments.length; ++j)segmentAt(segments, j).unlock();}}return overflow ? Integer.MAX_VALUE : size; // 如果溢出,返回最大整型作為結果,否則返回累加結果 } 復制代碼總結
總結
以上是生活随笔為你收集整理的Java7 ConcurrentHashMap源码浅析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 云计算时代,数据中心架构三层到大二层的演
- 下一篇: 44. xargs命令