JDK 1.7 ConcurrentHashMap Source Code Analysis
Mechanism: segment-based lock striping
Segment
Segment is defined as follows:
```java
/**
 * Segments are specialized versions of hash tables.  This
 * subclasses from ReentrantLock opportunistically, just to
 * simplify some locking and avoid separate construction.
 */
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    /*
     * Segments maintain a table of entry lists that are always
     * kept in a consistent state, so can be read (via volatile
     * reads of segments and tables) without locking.  This
     * requires replicating nodes when necessary during table
     * resizing, so the old lists can be traversed by readers
     * still using old version of table.
     *
     * This class defines only mutative methods requiring locking.
     * Except as noted, the methods of this class perform the
     * per-segment versions of ConcurrentHashMap methods.  (Other
     * methods are integrated directly into ConcurrentHashMap
     * methods.)  These mutative methods use a form of controlled
     * spinning on contention via methods scanAndLock and
     * scanAndLockForPut.  These intersperse tryLocks with
     * traversals to locate nodes.  The main benefit is to absorb
     * cache misses (which are very common for hash tables) while
     * obtaining locks so that traversal is faster once
     * acquired.  We do not actually use the found nodes since they
     * must be re-acquired under lock anyway to ensure sequential
     * consistency of updates (and in any case may be undetectably
     * stale), but they will normally be much faster to re-locate.
     * Also, scanAndLockForPut speculatively creates a fresh node
     * to use in put if no node is found.
     */

    private static final long serialVersionUID = 2249069246763182397L;

    /**
     * The maximum number of times to tryLock in a prescan before
     * possibly blocking on acquire in preparation for a locked
     * segment operation.  On multiprocessors, using a bounded
     * number of retries maintains cache acquired while locating
     * nodes.
     */
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * The per-segment table.  Elements are accessed via
     * entryAt/setEntryAt providing volatile semantics.
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * The number of elements.  Accessed only either within locks
     * or among other volatile reads that maintain visibility.
     */
    transient int count;

    /**
     * The total number of mutative operations in this segment.
     * Even though this may overflow 32 bits, it provides
     * sufficient accuracy for stability checks in CHM isEmpty()
     * and size() methods.  Accessed only either within locks or
     * among other volatile reads that maintain visibility.
     */
    transient int modCount;

    /**
     * The table is rehashed when its size exceeds this threshold.
     * (The value of this field is always <tt>(int)(capacity *
     * loadFactor)</tt>.)
     */
    transient int threshold;

    /**
     * The load factor for the hash table.  Even though this value
     * is same for all segments, it is replicated to avoid needing
     * links to outer object.
     * @serial
     */
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

    /**
     * Doubles size of table and repacks entries, also adding the
     * given node to new table
     */
    @SuppressWarnings("unchecked")
    private void rehash(HashEntry<K,V> node) {
        /*
         * Reclassify nodes in each list to new table.  Because we
         * are using power-of-two expansion, the elements from
         * each bin must either stay at same index, or move with a
         * power of two offset.  We eliminate unnecessary node
         * creation by catching cases where old nodes can be
         * reused because their next fields won't change.
         * Statistically, at the default threshold, only about
         * one-sixth of them need cloning when a table
         * doubles.  The nodes they replace will be garbage
         * collectable as soon as they are no longer referenced by
         * any reader thread that may be in the midst of
         * concurrently traversing table.  Entry accesses use plain
         * array indexing because they are followed by volatile
         * table write.
         */
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        int newCapacity = oldCapacity << 1;
        threshold = (int)(newCapacity * loadFactor);
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity; i++) {
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                int idx = e.hash & sizeMask;
                if (next == null)   //  Single node on list
                    newTable[idx] = e;
                else {              // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;
                    int lastIdx = idx;
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }

    /**
     * Scans for a node containing given key while trying to
     * acquire lock, creating and returning one if not found. Upon
     * return, guarantees that lock is held.  Unlike in most
     * methods, calls to method equals are not screened: Since
     * traversal speed doesn't matter, we might as well help warm
     * up the associated code and accesses as well.
     *
     * @return a new node if key not found, else null
     */
    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }

    /**
     * Scans for a node containing the given key while trying to
     * acquire lock for a remove or replace operation. Upon
     * return, guarantees that lock is held.  Note that we must
     * lock even if the key is not found, to ensure sequential
     * consistency of updates.
     */
    private void scanAndLock(Object key, int hash) {
        // similar to but simpler than scanAndLockForPut
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        int retries = -1;
        while (!tryLock()) {
            HashEntry<K,V> f;
            if (retries < 0) {
                if (e == null || key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f;
                retries = -1;
            }
        }
    }

    /**
     * Remove; match on key only if value null, else match both.
     */
    final V remove(Object key, int hash, Object value) {
        if (!tryLock())
            scanAndLock(key, hash);
        V oldValue = null;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> e = entryAt(tab, index);
            HashEntry<K,V> pred = null;
            while (e != null) {
                K k;
                HashEntry<K,V> next = e.next;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    V v = e.value;
                    if (value == null || value == v || value.equals(v)) {
                        if (pred == null)
                            setEntryAt(tab, index, next);
                        else
                            pred.setNext(next);
                        ++modCount;
                        --count;
                        oldValue = v;
                    }
                    break;
                }
                pred = e;
                e = next;
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

    final boolean replace(K key, int hash, V oldValue, V newValue) {
        if (!tryLock())
            scanAndLock(key, hash);
        boolean replaced = false;
        try {
            HashEntry<K,V> e;
            for (e = entryForHash(this, hash); e != null; e = e.next) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    if (oldValue.equals(e.value)) {
                        e.value = newValue;
                        ++modCount;
                        replaced = true;
                    }
                    break;
                }
            }
        } finally {
            unlock();
        }
        return replaced;
    }

    final V replace(K key, int hash, V value) {
        if (!tryLock())
            scanAndLock(key, hash);
        V oldValue = null;
        try {
            HashEntry<K,V> e;
            for (e = entryForHash(this, hash); e != null; e = e.next) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    e.value = value;
                    ++modCount;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

    final void clear() {
        lock();
        try {
            HashEntry<K,V>[] tab = table;
            for (int i = 0; i < tab.length; i++)
                setEntryAt(tab, i, null);
            ++modCount;
            count = 0;
        } finally {
            unlock();
        }
    }
}
```

Segment holds the same fields a HashMap would — table, load factor, threshold, count — and its methods include put, remove, and replace, so we can infer that ConcurrentHashMap delegates its public operations to Segment; the details are analyzed below. Segment extends ReentrantLock, so each segment is itself a reentrant lock.
ConcurrentHashMap constructors
The ConcurrentHashMap constructor mainly assigns three fields: the initial capacity, the load factor, and the concurrency level. The first two are familiar from HashMap; the last is explained below.
```java
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // ssize ends up as a power of two; with a concurrency level
    // of 16, ssize is 16
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    // cap the initial capacity
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                  DEFAULT_INITIAL_CAPACITY),
         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}
```

As the code shows, a default ConcurrentHashMap has an initial capacity of 16, a load factor of 0.75, and a concurrency level of 16. The other constructors let you set each of these values, and all of them ultimately delegate to the first constructor.
With the default constructor, segments ends up as an array of size 16 whose first element is s0, and s0's backing HashEntry table has size 2 (MIN_SEGMENT_TABLE_CAPACITY).
This highlights a difference from the 1.8 implementation: in 1.7 there is no top-level table array, only the segments array.
put(K,V)
ConcurrentHashMap's put method looks like this:
```java
public V put(K key, V value) {
    Segment<K,V> s;
    // null values are not allowed
    if (value == null)
        throw new NullPointerException();
    // compute the hash
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    // if the Segment does not exist yet, call ensureSegment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // delegate to the Segment's put method
    return s.put(key, hash, value, false);
}
```

Two points stand out from this put method:
1. null values are not allowed;
2. Segment creation is made thread-safe with CAS via UNSAFE. The initial getObject is a plain (non-volatile) read, so a null result only means the Segment may not have been created yet; ensureSegment() then creates or fetches it in a spin loop.
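To make the routing concrete, here is a small sketch (a hypothetical SegmentRouting class, not JDK code) of how a hash selects a segment with the default concurrency level of 16, where segmentShift is 28 and segmentMask is 15:

```java
public class SegmentRouting {
    // With concurrencyLevel = 16: ssize = 16, sshift = 4,
    // so segmentShift = 32 - 4 = 28 and segmentMask = 16 - 1 = 15.
    static final int SEGMENT_SHIFT = 28;
    static final int SEGMENT_MASK = 15;

    // The top sshift bits of the hash select the segment,
    // mirroring (hash >>> segmentShift) & segmentMask in put().
    static int segmentIndex(int hash) {
        return (hash >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }

    public static void main(String[] args) {
        // 0xA0000000 has top four bits 1010 = 10 -> segment 10
        System.out.println(segmentIndex(0xA0000000)); // prints 10
        // top four bits are all zero -> segment 0
        System.out.println(segmentIndex(0x0000FFFF)); // prints 0
    }
}
```

Using the high bits for segment selection and the low bits for the in-segment bucket index keeps the two choices largely independent.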
First, let's see how ensureSegment() ends up returning a Segment.
ensureSegment() looks up the Segment at the given index in segments. If the Segment has not been created yet, it builds a new one and installs it with a CAS spin loop; if another thread installs one first, the loop picks up that Segment instead.
The newly created Segment copies its parameters (load factor, threshold, table capacity) from s0, whose initial state was set in the constructor.
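The pattern ensureSegment() relies on — read, recheck, CAS-install, and loop until some thread wins — can be sketched independently of the JDK internals. The SegmentTable class below is a hypothetical illustration using AtomicReferenceArray in place of UNSAFE:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

public class SegmentTable {
    // Stand-in for the segments array; slots are created lazily.
    private final AtomicReferenceArray<Object> slots;

    SegmentTable(int size) {
        slots = new AtomicReferenceArray<>(size);
    }

    // Return the object at index k, creating it on first access.
    // Exactly one thread's CAS succeeds; losers adopt the winner's value.
    Object ensureSlot(int k) {
        Object seg = slots.get(k); // volatile read
        if (seg == null) {
            Object created = new Object(); // the JDK clones s0's parameters here
            while ((seg = slots.get(k)) == null) {
                if (slots.compareAndSet(k, null, created)) {
                    seg = created;
                    break;
                }
            }
        }
        return seg;
    }

    public static void main(String[] args) {
        SegmentTable t = new SegmentTable(16);
        Object a = t.ensureSlot(3);
        Object b = t.ensureSlot(3);
        System.out.println(a == b); // prints true: the slot is created only once
    }
}
```

The point of the loop is that no lock is ever taken: threads race on the CAS, and every caller returns the same slot object regardless of who won.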
Once the Segment exists, the call proceeds to Segment's put method, whose implementation appears in the Segment source above.
The flow of Segment's put operation is:
1. Call tryLock() to acquire the lock; if it succeeds, node is null and the insertion below proceeds directly;
2. If tryLock() fails, i.e. another thread currently holds this Segment's lock, call scanAndLockForPut();
3. The insertion traverses the bucket's linked list; a genuinely new node is linked in as the new head of the list;
4. After inserting a node, if the count exceeds the threshold, rehash() is called; otherwise the updated list head is written back to the table;
5. Finally, the lock is released.
Next, let's see how scanAndLockForPut() scans while another thread holds the Segment and eventually acquires the lock:
```java
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // head node of the target bucket
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    // keep trying tryLock()
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        // after too many retries, call lock() to join the
        // ReentrantLock wait queue and exit the loop
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
```

scanAndLockForPut() thus combines scanning with lock acquisition; once it returns, the lock is guaranteed to be held.
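The acquisition strategy is worth isolating: spin on tryLock() a bounded number of times, then fall back to a blocking lock(). A minimal sketch (hypothetical class and method names, stripped of the bucket traversal the JDK interleaves with the spinning):

```java
import java.util.concurrent.locks.ReentrantLock;

// Subclassing ReentrantLock mirrors Segment's own design.
public class SpinThenBlockLock extends ReentrantLock {
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    // Try the lock a bounded number of times (the JDK uses the spins
    // to warm caches by walking the bucket), then block on lock().
    void acquire() {
        int retries = 0;
        while (!tryLock()) {
            if (++retries > MAX_SCAN_RETRIES) {
                lock(); // join the ReentrantLock wait queue
                break;
            }
        }
    }

    public static void main(String[] args) {
        SpinThenBlockLock l = new SpinThenBlockLock();
        l.acquire();
        System.out.println(l.isHeldByCurrentThread()); // prints true
        l.unlock();
    }
}
```

The bound matters: unbounded spinning wastes CPU under long contention, while going straight to lock() forfeits the chance that the holder releases within a few iterations.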
Now let's look at how a Segment performs its rehash; the implementation appears in the Segment source above.
Rehashing doubles the capacity. The reason for the two passes over each list is node reuse: because expansion is by a power of two, every node moves to either its old index or old index + oldCapacity. The first pass finds lastRun, the tail suffix of the list whose nodes all map to the same new index; that entire suffix can be moved in one assignment without cloning, since none of its next pointers change. Only the nodes before lastRun are cloned. JDK 1.8 later replaced this with the low/high list split, but the goal is the same: avoid allocating a new node for every entry.
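The reuse the first pass buys can be shown on a toy singly linked list. The sketch below (hypothetical Node and lastRun helpers, not JDK code) finds the tail run whose nodes all land in the same bucket after a doubling — exactly the suffix rehash() moves without cloning:

```java
public class LastRunDemo {
    static final class Node {
        final int hash;
        final Node next;
        Node(int hash, Node next) { this.hash = hash; this.next = next; }
    }

    // First pass of the JDK's rehash: find the node after which every
    // node maps to the same index under the new mask.
    static Node lastRun(Node head, int sizeMask) {
        Node lastRun = head;
        int lastIdx = head.hash & sizeMask;
        for (Node p = head.next; p != null; p = p.next) {
            int k = p.hash & sizeMask;
            if (k != lastIdx) {
                lastIdx = k;
                lastRun = p;
            }
        }
        return lastRun;
    }

    public static void main(String[] args) {
        // Old capacity 4, new capacity 8, new mask 7.
        // Hashes: 1 -> new idx 1; 5, 13, 21 -> new idx 5.
        Node list = new Node(1, new Node(5, new Node(13, new Node(21, null))));
        Node run = lastRun(list, 7);
        System.out.println(run.hash); // prints 5: the suffix 5 -> 13 -> 21 is reusable
    }
}
```

Here only the node with hash 1 needs cloning; the three-node suffix starting at hash 5 keeps its links and is dropped into the new table as-is.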
get(K)
Having covered put, let's look at how get is implemented. It takes no lock:
```java
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // locate the Segment from the high bits of the hash
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // traverse the bucket's list
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
```

get() is lock-free: as long as the Segment is non-null, it simply traverses the bucket's list using volatile reads.
size()
Since ConcurrentHashMap manages Segments and each Segment manages a HashEntry array, size() has to sum the element counts of all Segments:
```java
public int size() {
    // take a local copy of the segments array
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        // loop until two consecutive sums match
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            // traverse the segments
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // two identical sums mean the map was stable; stop
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // if we locked above, every Segment must be unlocked
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    // if the total exceeds Integer.MAX_VALUE, return Integer.MAX_VALUE
    return overflow ? Integer.MAX_VALUE : size;
}
```

As the code shows, size() sums the per-segment modCounts; if two consecutive passes produce the same sum, the map is considered stable and the size is returned. If the sums keep differing after RETRIES_BEFORE_LOCK attempts, it locks every Segment and only then sums the counts.
Note the return value: if the total exceeds Integer.MAX_VALUE, only Integer.MAX_VALUE is returned.
Why could the total exceed Integer.MAX_VALUE?
Because each Segment holds at most MAXIMUM_CAPACITY (2^30) elements and a ConcurrentHashMap has at most MAX_SEGMENTS (2^16) Segments, a map could in theory hold 2^46 elements — far beyond the maximum int value.
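A quick check of that arithmetic (plain constants standing in for the JDK fields):

```java
public class SizeBoundDemo {
    public static void main(String[] args) {
        long maxPerSegment = 1L << 30;  // MAXIMUM_CAPACITY
        long maxSegments   = 1L << 16;  // MAX_SEGMENTS
        long theoreticalMax = maxPerSegment * maxSegments; // 2^46
        System.out.println(theoreticalMax == (1L << 46));       // prints true
        System.out.println(theoreticalMax > Integer.MAX_VALUE); // prints true
    }
}
```

The math is done in long precision; doing it in int would itself overflow, which is why size() tracks overflow separately and clamps the result.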
Summary
In JDK 1.7, ConcurrentHashMap achieves thread safety through Segment-based lock striping combined with CAS. Each Segment manages its own table and behaves much like a small HashMap; operations on its entries are thread-safe because the enclosing Segment provides the locking.