ConcurrentHashMap源码学习
ConcurrentHashMap源碼學(xué)習
自從學(xué)習了AQS之后,想著重新讀一下ConcurrentHashMap的源碼來加深下理解,所以有了這篇文章,針對ConcurrentHashMap常用的方法進行分析。
0,基礎(chǔ)知識以及一些字段的含義
1.8中的ConcurrentHashMap使用了比較多的CAS操作,例如設(shè)置每個tab。設(shè)置一些變量值,用UnSafe類來操作。
這三個字段值表示了特殊的Hash值, -1時說明這個節(jié)點正在transfer(resize), -2則說明該節(jié)點已經(jīng)是一個樹結(jié)構(gòu)的根。 -3
static final int MOVED = -1; // hash for forwarding nodes static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservationssizeCtl,顧名思義就是size Control,控制了大小轉(zhuǎn)變的參數(shù),當初始化時,會默認賦值為0.75cap,每次擴容完成后也是0.75Cap,在擴容中會變成負數(shù),如,初始化為-1,或者為-(1+擴容線程的數(shù)量)。
/*** Table initialization and resizing control. When negative, the* table is being initialized or resized: -1 for initialization,* else -(1 + the number of active resizing threads). Otherwise,* when table is null, holds the initial table size to use upon* creation, or 0 for default. After initialization, holds the* next element count value upon which to resize the table.*/ private transient volatile int sizeCtl;transferIndex table里下一個去轉(zhuǎn)換的節(jié)點。
/*** The next table index (plus one) to split while resizing.*/ private transient volatile int transferIndex;1,put方法的流程
put方法與hashMap的流程很類似。
為空則初始化
不為空,則計算該key的hash出來的位置,如果該位置Node為null,則CAS給該節(jié)點賦值。
然后比較特殊的一點是,防止正在擴容的操作,如果發(fā)現(xiàn)正在擴容,則helpTransfer。
如果沒有在擴容,則鎖住該節(jié)點,正常遍歷鏈表查詢,沒有則插入到最后,如果鏈表太長,則轉(zhuǎn)化為紅黑樹。
最后增加新節(jié)點后,增加size,然后判斷是否擴容。
看一下Node節(jié)點以及Node節(jié)點的子類:
上圖是ConcurrentHashMap的uml圖中的一點關(guān)于Node節(jié)點的,一共有四個實現(xiàn):TreeNode和ForwardingNode以及 TreeBin,ReservationNode。
Node為正常存儲時的節(jié)點類型。
TreeBin為鏈表轉(zhuǎn)化為樹結(jié)構(gòu)時的樹的根節(jié)點。
TreeNode為轉(zhuǎn)化為樹結(jié)構(gòu)時的樹的子節(jié)點。
ReservationNode為調(diào)用compute方法和computeIfAbsent方法時的占位節(jié)點,占位是為了防止多線程下的錯誤,當然也可以自旋+cas解決,1.8貌似是對 sychronized 進行了優(yōu)化,很多可以用自旋+cas的都用了sychronized來解決。
下面看正常put過程中,如下圖:
ConcurrentHashMap 的數(shù)據(jù)結(jié)構(gòu)是Node數(shù)組,但是部分的Node會變成他的實現(xiàn)類,在鏈表長度超過8時,形成樹形結(jié)構(gòu),樹的根為TreeBin,TreeBin上不存key value,指向后續(xù)的TreeNode節(jié)點。快速查找。
接著看看擴容時的變化,如下圖:
首先會先創(chuàng)建一個NewTab,但是這時候ConcurrentHashMap 的table還是只想oldTab的,接著依次從最后一個節(jié)點開始轉(zhuǎn)變,如果節(jié)點無值,直接把該節(jié)點轉(zhuǎn)化為ForwardingNode,如果有值,則將Node鏈表上的值依次轉(zhuǎn)移到NewTab中。
逐漸的,oldTab中的Node數(shù)組會變成ForwardingNode數(shù)組。而newTab數(shù)組則轉(zhuǎn)移完成。
此時,ConcurrentHashMap的table會指向newTab。廢棄之前的oldTab。
需要注意的是,在轉(zhuǎn)移的過程中,如果put,則會先helpTransfer后put
轉(zhuǎn)移的過程中,如果get,則會用ForwardingNode的find方法來查找所有已有的節(jié)點進行返回。
2,get方法的流程
get方法比較簡單,除了**“轉(zhuǎn)移的過程中,如果get,則會用ForwardingNode的find方法來查找所有已有的節(jié)點進行返回。”**這一點以外,就是Node節(jié)點的next和value都設(shè)置為volatile,保證數(shù)據(jù)的可見性。
3,計算size方法
代碼如下:
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n); }final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum; }可以看出來,ConcurrentHashMap的size是靠baseCount和CounterCell求和得到。那么這兩個值是怎么賦值的呢?
回顧之前put時的最后一步,addCount方法中,如下列代碼所示,counterCells == null時,如果CAS無誤,則baseCount計算中所有節(jié)點的個數(shù)。
但是在并發(fā)環(huán)境下,CAS失敗,則會進入if條件體:調(diào)用fullAddCount方法來設(shè)置CounterCell里的值。
@sun.misc.Contended static final class CounterCell {volatile long value;CounterCell(long x) { value = x; } }。private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}} }簡單看下fullAddCount方法的內(nèi)容:
自旋判斷,如果 counterCells 已經(jīng)不為空,則給counterCell的值進行修改,成功則break;
如果 cellsBusy 為0,則用CAS修改這個值為1,然后對counterCells開始修改,修改完畢cellsBusy重新變成0,成功則break;
如果上一步判斷仍然不成立,則試著修改baseCount的值,成功則break;
private final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;if ((as = counterCells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCounterCell r = new CounterCell(x); // Optimistic createif (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehashelse if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;else if (counterCells != as || n >= NCPU)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {if (counterCells == as) {// Expand table unless staleCounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}h = ThreadLocalRandom.advanceProbe(h);}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { // Initialize tableif (counterCells == as) {CounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x);counterCells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base} }總結(jié)
以上是生活随笔為你收集整理的ConcurrentHashMap源码学习的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AQS理解之七——AQS中的条件队列
- 下一篇: LinkedHashMap 的理解以及借