currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读
前言
HashMap是java編程中最常用的數據結構之一,由于HashMap非線程安全,因此不適用于并發訪問的場景。JDK1.5之前,通常使用HashTable作為HashMap的線程安全版本,HashTable對讀寫進行全局加鎖,在高并發情況下會造成嚴重的鎖競爭和等待,極大地降低了系統的吞吐量,ConcurrentHashMap應運而生。
相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的基礎上提供了更好的寫并發能力,并且讀操作(?get)通常不會阻塞,使得讀寫操作可并行執行,支持客戶端修改ConcurrentHashMap的并發訪問度,迭代期間也不會拋出?ConcurrentModificationException等等,ConcurrentHashMap有這么多優點,那么它有什么缺點嗎?有,一致性問題,這是當前所有分布式系統都面臨的問題。下面開始分析ConcurrentHashMap的實現原理。
ConcurrentHashMap實現原理
ConcurrentHashMap的基本策略是將table細分為多個Segment保存在數組segments中,每個Segment本身又是一個可并發的哈希表,同時每個Segment都是一把ReentrantLock鎖,只有在同一個Segment內才存在競態關系,不同的Segment之間沒有鎖競爭,這就是分段鎖機制。Segment內部擁有一個HashEntry數組,數組中的每個元素又是一個鏈表。下面看看ConcurrentHashMap整體結構圖:
為了減少占用空間,除了第一個Segment之外,剩余的Segment采用的是延遲初始化的機制,僅在第一次需要時才會創建(通過ensureSegment實現)。為了保證延遲初始化存在的可見性,訪問segments數組及table數組中的元素均通過volatile訪問,主要借助于Unsafe中原子操作getObjectVolatile來實現,此外,segments中segment的寫入,以及table中元素和next域的寫入均使用UNSAFE.putOrderedObject來完成。這些操作提供了AtomicReferenceArrays的功能。下面開始源碼之旅吧
成員變量
下面介紹ConcurrentHashMap中用到的成員域:
/**
* 默認初始容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 默認加載因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 默認并發度,該參數會影響segments數組的長度
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 最大容量,構造ConcurrentHashMap時指定的大小超過該值則會使用該值替換,
* ConcurrentHashMap的大小必須是2的冪,且小于等于1<<30,以確??墒褂胕nt索引條目
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 每個segment中table數組的最小長度,必須是2的冪,至少為2,以免延遲構造后立即調整大小
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* 允許的最大segment數量,用于限定構造函數參數concurrencyLevel的邊界,必須是2的冪
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/**
* 非鎖定情況下調用size和containsValue方法的重試次數,避免由于table連續被修改導致無限重試
*/
static final int RETRIES_BEFORE_LOCK = 2;
/**
* 與當前實例相關聯的,用于key哈希碼的隨機值,以減少哈希沖突
*/
private transient final int hashSeed = randomHashSeed(this);
private static int randomHashSeed(ConcurrentHashMap instance) {
if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
return sun.misc.Hashing.randomHashSeed(instance);
}
return 0;
}
/**
* 用于索引segment的掩碼值,key哈希碼的高位用于選擇segment
*/
final int segmentMask;
/**
* 用于索引segment偏移值
*/
final int segmentShift;
/**
* segments數組
*/
final Segment[] segments;
構造方法
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)//
concurrencyLevel = MAX_SEGMENTS;
// 尋找與給定參數concurrencyLevel匹配的最佳Segment數組ssize,必須是2的冪
// 如果concurrencyLevel是2的冪,那么最后選定的ssize就是concurrencyLevel
// 否則concurrencyLevel,ssize為大于concurrencyLevel最小2的冪值
// concurrencyLevel為7,則ssize為2的3次冪,為8
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//計算每個Segment中,table數組的初始大小
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 創建segments和第一個segment
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); //原子按順序寫入segments[0]
this.segments = ss;
}
/**
* map轉化為ConcurrentHashMap
*
* @param m the map
*/
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
構造器中各個參數的含義:
initialCapacity:創建ConccurentHashMap對象的初始容量,即ConccurentHashMap中HashEntity的總數量,創建時未指定initialCapacity則默認為16,最大容量為MAXIMUM_CAPACITY。
loadFactor:負載因子,用于計算Segment的threshold域,
concurrencyLevel:即ConccurentHashMap的并發度,支持同時更新ConccurentHashMap且不發生鎖競爭的最大線程數。concurrencyLevel不能代表ConccurentHashMap實際并發度,ConccurentHashMap會使用大于等于該值的2的冪指數的最小值作為實際并發度,實際并發度即為segments數組的長度。創建時未指定concurrencyLevel則默認為16。
并發度對ConccurentHashMap性能具有舉足輕重的作用,如果并發度設置的過小,會帶來嚴重的鎖競爭問題;如果并發度設置的過大,原本位于同一個Segment內的訪問會擴散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。
原子方法
先看源碼:
/**
* 獲取給定table的第i個元素,使用volatile讀語義
*/
@SuppressWarnings("unchecked")
static final HashEntry entryAt(HashEntry[] tab, int i) {
return (tab == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
/**
* 設置給定table的第i個元素,使用volatile寫入語義
*/
static final void setEntryAt(HashEntry[] tab, int i,
HashEntry e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
/**
* 通過Unsafe提供的具有volatile元素訪問語義的操作獲取給定Segment數組的第j個元素(如果ss非空)
* 注意:因為Segment數組的每個元素只能設置一次(使用完全有序的寫入),
* 所以一些性能敏感的方法只能依靠此方法作為對空讀取的重新檢查。
*/
@SuppressWarnings("unchecked")
static final Segment segmentAt(Segment[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment) UNSAFE.getObjectVolatile(ss, u);
}
/**
* 根據給定hash獲取segment
*/
@SuppressWarnings("unchecked")
private Segment segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment) UNSAFE.getObjectVolatile(segments, u);
}
/**
* 根據給定segment和hash獲取table entry
*/
@SuppressWarnings("unchecked")
static final HashEntry entryForHash(Segment seg, int h) {
HashEntry[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}
ConcurrentHashMap主要使用以上幾個方法對segments數組和table數組進行讀寫,并保證線程安全性。
其主要使用了UNSAFE.getObjectVolatile提供Volatile讀語義,UNSAFE.putOrderedObject提供了Volatile寫語義。下面分析這兩個方法帶來的好處:
UNSAFE.getObjectVolatile使得非volatile聲明的對象具有volatile讀的語義,那么要使非volatile聲明的對象具有volatile寫的語義則需要借助操作UNSAFE.putObjectvolatile。
那么UNSAFE.putOrderedObject操作的含義和作用又是什么呢?
為了控制特定條件下的指令重排序和內存可見性問題,Java編譯器使用一種叫內存屏障(Memory Barrier,或叫做內存柵欄,Memory Fence)的CPU指令來禁止指令重排序。java中volatile寫入使用了內存屏障中的LoadStore屏障規則,對于這樣的語句Load1; LoadStore; Store2,在Store2及后續寫入操作被刷出前,保證Load1要讀取的數據被讀取完畢。volatile的寫所插入的storeLoad是一個耗時的操作,因此出現了一個對volatile寫的升級版本,利用lazySet方法進行性能優化,在實現上對volatile的寫只會在之前插入StoreStore屏障,對于這樣的語句Store1; StoreStore; Store2,在Store2及后續寫入操作執行前,保證Store1的寫入操作對其它處理器可見,也就是按順序的寫入。UNSAFE.putOrderedObject正是提供了這樣的語義,避免了寫寫指令重排序,但不保證內存可見性,因此讀取時需借助volatile讀保證可見性。
ConcurrentHashMap正是利用了這些高性能的原子讀寫來避免加鎖帶來的開銷,從而大幅度提高了性能。
ensureSegment
ensureSegment用于確定指定的Segment是否存在,不存在則會創建。源碼如下:
@SuppressWarnings("unchecked")
private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment seg;
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment s = new Segment(lf, threshold, tab);
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
使用getObjectVolatile()方法提供的原子讀語義獲取指定Segment,如果為空,以構造ConcurrentHashMap對象時創建的Segment為模板,創建新的Segment。ensureSegment在創建Segment期間為不斷使用getObjectVolatile()檢查指定Segment是否為空,防止其他線程已經創建了Segment。
HashEntry
開始介紹Segment之前,我們先看看HashEntry的結構:
static final class HashEntry {
final int hash;
final K key;
volatile V value;
volatile HashEntry next;
HashEntry(int hash, K key, V value, HashEntry next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* 使用volatile寫入語義設置next域
*/
final void setNext(HashEntry n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
HashEntry將value和next聲明為volatile ,是為了保證內存可見性,也就是每次讀取都將從內存讀取最新的值,而不會從緩存讀取。同時,寫入next域也使用volatile寫入語義保證原子性。寫入使用原子性操作,讀取使用volatile,從而保證了多線程訪問的線程安全性。
Segment
Segment作為ConccurentHashMap的專用數據結構,同時擴展了ReentrantLock,使得Segment本身就是一把可重入鎖,方便執行鎖定。Segment內部持有一個始終處于一致狀態的entry列表,使得讀取操作無需加鎖(通過volatile讀table數組)。調整tables大小期間通過復制節點實現,使得舊版本的table仍然可以遍歷。
Segment僅定義需要加鎖的可變方法,針對ConcurrentHashMap中相應方法的調用都會被代理到Segment中的方法。這些可變方法使用scanAndLock和scanAndLockForPut在競爭中使用受控旋轉,也就是自旋次數受限制的自旋鎖。由于線程的阻塞與喚醒通常伴隨著上下文切換、CPU搶占等,都是開銷比較大的操作,使用自旋次數受限制的自旋鎖,可以提高獲取鎖的概率,降低線程阻塞的概率,這樣可極大地提升性能。之所以自旋次數受限制,是因為自旋會不斷的消耗CPU的時間,無限制的自旋會導致開銷增長。因此自旋鎖適用于多核CPU下,同時線程等待鎖的時間非常短;若等待某個鎖需要的時間較長,讓線程盡早進入阻塞才是正確的選擇。下面開始進入源碼:
Segment成員變量
/**
* 對segment加鎖時,在阻塞之前進行自旋的最大次數。
* 在多處理器上,使用有限數量的重試來維護在定位節點時獲取的高速緩存。
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每個segment的table數組,訪問數組中的元素通過entryAt/setEntryAt提供的volatile語義來完成
*/
transient volatile HashEntry[] table;
/**
* 元素的數量,只能在鎖中或其他volatile讀保證可見性之間進行訪問
*/
transient int count;
/**
* 當前segment中可變操作發生的次數,put,remove等,可能會溢出32位
* 它為CHM isEmpty()和size()方法中的穩定性檢查提供了足夠的準確性。
* 只能在鎖中或其他volatile讀保證可見性之間進行訪問
*/
transient int modCount;
/**
* 當table大小超過此閾值時,對table進行擴容,值為(int)(capacity *loadFactor)
*/
transient int threshold;
/**
* 負載因子
*/
final float loadFactor;
/**
* 構造方法
*/
Segment(float lf, int threshold, HashEntry[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
scanAndLockForPut
先看流程圖:
看源碼:
private HashEntry scanAndLockForPut(K key, int hash, V value) {
HashEntry first = entryForHash(this, hash);//根據key的hash值查找頭節點
HashEntry e = first;
HashEntry node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {//嘗試獲取鎖,成功則直接返回,失敗則開始自旋
HashEntry f; // 用于后續重新檢查頭節點
if (retries < 0) {
if (e == null) {//結束遍歷節點
if (node == null) // 創建節點
node = new HashEntry(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))//找到節點,結束遍歷
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {//達到最大嘗試次數
lock();//進入加鎖方法,失敗則會進入排隊,阻塞當前線程
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // 頭節點變化,需要重新遍歷,說明有新節點加入或被移除
retries = -1;
}
}
return node;
}
分析:while循環每執行一次,都會嘗試獲取鎖,成功則會返回。retries 初始值設為-1是為了遍歷當前hash對應桶的鏈表,找到則停止遍歷,未找到則會預創建一個節點;同時,如果頭節點發生變化,則會重新進行遍歷,直到自旋次數大于MAX_SCAN_RETRIES,使用lock加鎖,獲取鎖失敗則會進入等待隊列。
為什么scanAndLockForPut中要遍歷一次鏈表?
前面已經提過scanAndLockForPut使用自旋次數受限制的自旋鎖進行優化加鎖的方式,此外,遍歷一次鏈表也是一種優化方法,主要是盡可能使當前鏈表中的節點進入CPU高速緩存,提高緩存命中率,以便獲取鎖定后的遍歷速度更快。實際上加鎖后并沒有使用已經找到的節點,因為它們必須在鎖定下重新獲取,以確保更新的順序一致性,但是遍歷一次后通??梢愿斓刂匦露ㄎ?。這是一種預熱優化的方式,scanAndLock中也使用了該優化方式。
scanAndLock內部實現方式與scanAndLockForPut相似,但比scanAndLockForPut更簡單,scanAndLock不需要預創建節點。因此scanAndLock主要用于remove和replace操作,而scanAndLockForPut則用于put,這里就不再貼源碼。
Segment?put
先看流程圖:
put源碼:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
Segment中put的大體流程是先對Segment加鎖,然后根據(tab.length-1)&hash找到對應的slot,然后遍歷slot對應的鏈表,如果key對應的entry已經存在,根據onlyIfAbsent標志決定是否替換舊值,如果key對應的entry不存在,創建新節點插入鏈表頭部,期間若容量超過限制,判斷是否需要進行rehash。
put實現還是比較簡單的,下面談談其中主要的幾個優化點。
scanAndLockForPut的作用已經介紹過了,如果鎖能很快的獲取,有限次數的自旋可防止線程進入阻塞,有助于提升性能;此外,自旋期間會遍歷鏈表,希望遍歷的鏈表被CPU Cache所緩存,為后續實際put過程中的鏈表遍歷操作提升性能;最后scanAndLockForPut還會預創建節點。
HashEntry[] tab = table有什么好處?
從Segment源碼可知,table被聲明為volatile,為了保證內存可見性,table上的修改都必須立即更新到主存,volatile寫實際是具有一定開銷的。由于put中的代碼都在加鎖區執行,鎖既能保證可見性,也能保證原子性,因此,不需要針對table進行volatile寫,將table引用賦值給局部變量以實現編譯、運行時的優化。
node.setNext(first)也是同樣的道理,HashEntry的next同樣被聲明為volatile,因此這里使用優化的方式UNSAFE.putOrderedObject進行volatile寫入。
既然put已在加鎖區運行,為何訪問tab中的元素不直接通過數組索引,而是用entryAt(tab, index)?
加鎖保證了table引用的同步語義,但是對table數組中元素的寫入使用UNSAFE.putOrderedObject進行順序寫,該操作只是禁止寫寫指令重排序,不能保證寫入后的內存可見性。因此,必須使用entryAt(tab, index)提供的volatile讀來獲取最新的數據。
remove
remove源碼相對比較簡單,這里就直接分析源碼了。
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry e = entryAt(tab, index);
HashEntry pred = null;
while (e != null) {
K k;
HashEntry next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
remove首先會嘗試獲取鎖,失敗則會進入scanAndLock代碼塊,scanAndLock和scanAndLockForPut實現相似。獲取鎖成功后,然后根據hash找到對應的slot,然后遍歷slot對應的鏈表,找到要移除元素的key,若value為空或與鏈表中元素的value相等則移除元素,否則退出。
remove和put中使用的優化相同,這里就不再重復說明。
replace操作實現方式也非常簡單,這里不再說明。下面詳細分析rehash實現。
rehash
rehash主要的作用的是擴容,將擴容前table中的節點重新分配到新的table。由于table的capacity都是2的冪,按照2的冪次方擴容為原來的一倍,擴容前在slot i中的元素,擴容后要么還是在slot i里,或者i+擴容前table的capacity的slot中,這樣使得只需要移動原來桶中的部分元素即可將所有節點分配到新的table。
為了提高效率,rehash首先找到第一個后續所有節點在擴容后index都保持不變的節點,將這個節點加入擴容后的table的index對應的slot中,然后將這個節點之前的所有節點重排即可。
先看流程圖:
rehash源碼:
@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
HashEntry[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry[] newTable =
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry e = oldTable[i];
if (e != null) {
HashEntry next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // slot中只有一個元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry lastRun = e;
int lastIdx = idx;
for (HashEntry last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
//復制lastRun之前的所有節點
for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k];
newTable[k] = new HashEntry(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; //添加新節點
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
為了理解這段代碼的原理,現構造一個例子來說明,假設擴容前oldTable的oldCapacity為4,則擴容后newTable的newCapacity為8,假設oldTable[0]的元素如下:
由index=hash & (oldCapacity-1)可知,擴容前以上幾個節點都在slot?oldTable[0]中。下面開始分析
計算頭節點idx = e.hash & sizeMask=e.hash & (newCapacity-1)=16 & 7=0;
將idx賦值給lastIdx = idx=0,last = next(hash=4);
last不為空,第一輪循環:k = last.hash & sizeMask=4 & 7=4,滿足k != lastIdx,執行語句:
lastIdx = k;?lastRun = last(hash=4);
last = last.next,last不為空,第二輪循環:k =8 & 7=0,滿足k != lastIdx,執行語句:
lastIdx = k;?lastRun = last(hash=8);
last = last.next,last不為空,第三輪循環:k = 32 & 7=0,不滿足k != lastIdx;
last = last.next,last為空,結束循環。
到此找到第一個后續所有節點在擴容后index都保持不變的節點,lastRun為hash=8的節點,將lastRun加入lastIdx對應slot中,這樣lastRun的后續節點也會自動加入lastIdx對應slot。然后將節點lastRun之前的所有節點重排,通過新建節點,使用hash?& sizeMask計算index,則將節點插入index對應slot中鏈表頭部。
綜上,最好的情況,每個slot鏈表的所有節點在擴容后index都保持不變,那么只需移動頭節點,不用創建新節點即可完成擴容和節點重新分配;最差的情況,每個鏈表的倒數兩個節點在擴容后index不同,那么需要重建并復制所有節點。
到此,Segment實現原理和源碼分析完成,下面進入ConccurentHashMap主體代碼中。
主要操作
本節主要介紹ConccurentHashMap中的方法,get、containsKey、containsValue、size、put、putAll、putIfAbsent、remove等等。
get、containsKey
get與containsKey兩個方法的實現幾乎完全一致,都不需要加鎖讀數據,下面以get源碼說明:
public V get(Object key) {
Segment s;
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
首先計算key的hash碼,計算Segment的index,使用getObjectVolatile()方法提供的原子讀語義獲取Segment,再計算Segment中slot的索引,使用getObjectVolatile()方法提供的原子讀語義獲取slot頭節點,遍歷鏈表,判斷是否存在key相同的節點以及獲得該節點的value。由于遍歷過程中其他線程可能對鏈表結構做了調整,因此get和containsKey返回的可能是過時的數據,這就是ConcurrentHashMap的弱一致性。如果要求強一致性,那么必須使用Collections.synchronizedMap()。
size、containsValue、isEmpty
size用于返回ConcurrentHashMap中HashEntry的數量,containsValue用于判斷ConcurrentHashMap中是否存在給定的value,isEmpty用于判斷ConcurrentHashMap是否為空,size、containsValue、isEmpty都是基于整個ConcurrentHashMap來進行操作的,因此實現原理基本相似。這里以size方法來說明實現原理。先看流程圖:
看源碼:
public int size() {
final Segment[] segments = this.segments;
int size;
boolean overflow; // 為true表示size溢出32位
long sum; // 所有segment中modCount的總和
long last = 0L;
int retries = -1; // 第一次迭代不計入重試,因此總共會重試3次
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
首先不加鎖循環所有的Segment(通過Unsafe的getObjectVolatile()以保證原子讀語義),計算所有Segment的count之后,同時計算所有Segment的modcount之和sum,如果sum與last相等,說明迭代期間沒有發生其他線程修改ConcurrentHashMap的情況,返回size,;當重試次數超過預定義的值(RETRIES_BEFORE_LOCK為2)時,對所有的Segment依次進行加鎖,再計算size的值。需要注意的是,加鎖過程中會強制創建所有的不存在Segment,否則容易出現其他線程創建Segment并進行put,remove等操作。由于retries初始值為-1,因此會嘗試3次才會對所有的Segment加鎖。
注意:modcount在put, replace, remove以及clear等方法中都會被修改。
containsValue實現只是在重試問題上稍微不同,在第一次嘗試時,sum與last相等也不會返回,默認會嘗試試第二次,只有第二次嘗試時sum與last也相等才返回。此外,在循環所有的Segment期間,一旦找到匹配value的HashEntry,立即返回,只有value不存在時,才會多次嘗試確認。這里不貼源碼了。
isEmpty與containsValue、size實現稍有不同,isEmpty重試失敗不會進行加鎖,而是直接返回false,isEmpty在循環所有的Segment期間,一旦某個Segment中的entry數量不為0,立即返回true;第一次循環所有的Segment期間,計算每個Segment的modCount之和sum;當sum不為0,進行第二次循環,循環期間,使用sum減去每個Segment的modCount,如果sum不為0,返回false,否則返回true。這樣做的好處是,加入第一次循環期間發生了一個put,第二次循環發生了一個remove,那么isEmpty也能檢查出當前map為空。
put、putAll、putIfAbsent
put和putIfAbsent實現原理基本相似,區別在于當key存在時,put會替換舊值,更新modCount,putIfAbsent則不會。putAll通過循環調用put來實現。這里以put源碼來進行說明。
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
由源碼實現可知,首先計算key的hash碼,再計算segments的index,獲取Segment,如果Segment為空,則會進入ensureSegment創建Segment,最后將put操作代理給Segment中的put方法實現數據寫入。
注意:put中獲取指定Segment沒有使用原子讀語義,在ensureSegment中會使用原子讀語義重新檢查。
remove、replace、clear
remove和replace都是先通過key計算hash碼,定位到Segment,如果Segment為空,則不做任何操作,否則將操作代理給Segment的remove和replace方法。
clear會循環所有的Segment,如果Segment不空,將操作代理給Segment的clear,Segment的clear操作會直接對Segment加鎖,使用UNSAFE.putOrderedObject操作將table數組中的元素置為null。由于clear只是清除元素,Segment指向table的引用不會發生變化,使得clear期間仍然可以進行遍歷。
弱一致性
ConcurrentHashMap是弱一致的。 ConcurrentHashMap的get,containsKey,clear,iterator 都是弱一致性的。
get和containsKey都是無鎖的操作,均通過getObjectVolatile()提供的原子讀語義來獲得Segment以及對應的鏈表,然后遍歷鏈表。由于遍歷期間其他線程可能對鏈表結構做了調整,因此get和containsKey返回的可能是過時的數據。如在get執行UNSAFE.getObjectVolatile(segments, u)之后,其他線程若執行了clear操作,則get將讀到失效的數據。
由于clear沒有使用全局的鎖,當清除完一個segment之后,開始清理下一個segment的時候,已經清理segments可能又被加入了數據,因此clear返回的時候,ConcurrentHashMap中是可能存在數據的。因此,clear方法是弱一致的。
ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法,迭代器在遍歷期間如果已經遍歷的table上的內容變化了,迭代器不會拋出ConcurrentModificationException異常。如果未遍歷的數組上的內容發生了變化,則有可能反映到迭代過程中,這就是ConcurrentHashMap迭代器弱一致的表現。
到此,jdk1.7中ConcurrentHashMap的實現分析完成。
歡迎指出本文有誤的地方,若對您有幫助,點個贊支持一下唄!轉載請注明原文出處
總結
以上是生活随笔為你收集整理的currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 苹果向开发者推出iOS 17.2 Bet
- 下一篇: 发布H200芯片之际,英伟达股价已连涨十