concurrenthashmap 1.7/1.8
一、ConcurrentHashMap1.7
1.7的ConcurrentHashMap采用分段鎖的機制,實現并發的更新操作,底層采用數組+鏈表的存儲結構。
其包含兩個核心靜態內部類 Segment和HashEntry。
-
Segment
該類繼承了ReentrantLock重入鎖來當作鎖的角色,每個Segment對象維護了每個散列映射表中的若干個桶(每個桶是由若干個 HashEntry 對象鏈接起來的鏈表) -
HashEntry
該類用來封裝映射表中的鍵值類
一個ConcurrentHashMap實例中包含若干個 (默認分配16個segment)Segment 對象組成的數組,1.7的ConcurrentHashMap的結構如下:
這樣的結構保證了:一個線程占用一把鎖(segment)訪問其中一段數據的時候,其他段的數據也能被其它的線程訪問。
二、ConcurrentHashMap1.8(重點)
ConcurrentHashMap在JDK1.8中取消了segment分段鎖,而采用CAS和synchronized來保證并發安全。數據結構跟HashMap1.8的結構一樣,數組+鏈表/紅黑二叉樹。synchronized只鎖定當前鏈表或紅黑二叉樹的首節點,這樣只要hash不沖突,就不會產生并發,效率又提升N倍。
結構示意圖:
CAS的操作:
重要參數源碼:
初始化源碼put()方法分析:
public V put(K key, V value) {return putVal(key, value, false); }/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) {//ConcurrentHashMap 不允許插入null鍵,HashMap允許插入一個null鍵if (key == null || value == null) throw new NullPointerException();//計算key的hash值int hash = spread(key.hashCode());int binCount = 0;//for循環的作用:因為更新元素是使用CAS機制更新,需要不斷的失敗重試,直到成功為止。for (Node<K,V>[] tab = table;;) {// f:鏈表或紅黑二叉樹頭結點,向鏈表中添加元素時,需要synchronized獲取f的鎖。Node<K,V> f; int n, i, fh;//判斷Node[]數組是否初始化,沒有則進行初始化操作if (tab == null || (n = tab.length) == 0)tab = initTable();//通過hash定位Node[]數組的索引坐標,是否有Node節點,如果沒有則使用CAS進行添加(鏈表的頭結點),添加失敗則進入下次循環。else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}//檢查到內部正在移動元素(Node[] 數組擴容)else if ((fh = f.hash) == MOVED)//幫助擴容tab = helpTransfer(tab, f);else {V oldVal = null;//鎖住鏈表或紅黑二叉樹的頭結點synchronized (f) {//判斷f是否是鏈表的頭結點if (tabAt(tab, i) == f) {//如果fh>=0 是鏈表節點if (fh >= 0) {binCount = 1;//遍歷鏈表所有節點for (Node<K,V> e = f;; ++binCount) {K ek;//如果節點存在,則更新valueif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}//不存在則在鏈表尾部添加新節點。Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}//TreeBin是紅黑二叉樹節點else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;//添加樹節點if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {//如果鏈表長度已經達到臨界值8 就需要把鏈表轉換為樹結構if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}//將當前ConcurrentHashMap的size數量+1,看要不要擴容addCount(1L, binCount);return null; }get()方法源碼:
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;//用key的hash重新散列,用來獲取node[]的位置(下標)int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 &&//獲取下標的位置(e = tabAt(tab, (n - 1) & h)) != null) {// e 為對應下標的初始Nodeif ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}//擴容中else if (eh < 0)//在槽中遍歷查找,正在擴容所以節點為ForwardingNode,ForwardingNode中的find實際上是在擴容后的新表中進行查找return (p = e.find(h, key)) != null ? p.val : null;//在槽遍歷查找while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null; }節點Node類:
//節點的靜態內部類,鍵值對存儲的地方static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;//val值和下一個節點Node<K,V> next都被volatile關鍵字修飾,保證線程安全volatile V val;volatile Node<K,V> next;//初始化方法Node(int hash, K key, V val, Node<K,V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}public final K getKey() { return key; }public final V getValue() { return val; }public final int hashCode() { return key.hashCode() ^ val.hashCode(); }public final String toString(){ return key + "=" + val; }//為了線程安全setValue不允許調用,會直接拋異常public final V setValue(V value) {throw new UnsupportedOperationException();}//重寫equals方法public final boolean equals(Object o) {Object k, v, u; Map.Entry<?,?> e;return ((o instanceof Map.Entry) &&(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&(v = e.getValue()) != null &&(k == key || k.equals(key)) &&(v == (u = val) || v.equals(u)));}//用以支持map.get()方法,會在子類中重寫Node<K,V> find(int h, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;} while ((e = e.next) != null);}return null;}}樹節點(TreeNode)類:
//樹節點的靜態內部類,與TreeBin共同提供紅黑樹功能static final class TreeNode<K,V> extends Node<K,V> {//紅黑樹的基本參數TreeNode<K,V> parent; TreeNode<K,V> left;TreeNode<K,V> right;//其實還維護著鏈表指針TreeNode<K,V> prev; boolean red;//構造方法TreeNode(int hash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) {super(hash, key, val, next);this.parent = parent;} //重寫find方法Node<K,V> find(int h, Object k) {return findTreeNode(h, k, null);}//find方法實現,從樹的根部開始遍歷節點final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {if (k != null) {TreeNode<K,V> p = this;do {int ph, dir; K pk; TreeNode<K,V> q;TreeNode<K,V> pl = p.left, pr = p.right;if ((ph = p.hash) > h)p = pl;else if (ph < h)p = pr;else if ((pk = p.key) == k || (pk != null && k.equals(pk)))return p;else if (pl == null)p = pr;else if (pr == null)p = pl;else if ((kc != null ||(kc = comparableClassFor(k)) != null) &&(dir = compareComparables(kc, k, pk)) != 0)p = (dir < 0) ? pl : pr;//遞歸遍歷右子樹else if ((q = pr.findTreeNode(h, k, kc)) != null)return q;elsep = pl;} while (p != null);}return null;}}包含紅黑樹根結點的Treeibin類:
//擁有紅黑樹的根節點,維護著紅黑樹的讀寫鎖static final class TreeBin<K,V> extends Node<K,V> {TreeNode<K,V> root;volatile TreeNode<K,V> first;volatile Thread waiter;volatile int lockState;//持有寫鎖狀態static final int WRITER = 1;//等待寫鎖狀態static final int WAITER = 2; //持有讀鎖狀態 static final int READER = 4; // 在hashCode相等并且不是Comparable類時才使用此方法進行判斷大小static int tieBreakOrder(Object a, Object b) {int d;if (a == null || b == null ||(d = a.getClass().getName().compareTo(b.getClass().getName())) == 0)d = (System.identityHashCode(a) <= System.identityHashCode(b) ?-1 : 1);return d;}//構造方法,根據頭節點定義紅黑樹TreeBin(TreeNode<K,V> b) {super(TREEBIN, null, null, null);this.first = b;TreeNode<K,V> r = null;for (TreeNode<K,V> x = b, next; x != null; x = next) {next = (TreeNode<K,V>)x.next;x.left = x.right = null;if (r == null) {x.parent = null;x.red = false;r = x;}else {K k = x.key;int h = x.hash;Class<?> kc = null;for (TreeNode<K,V> p = r;;) {int dir, ph;K pk = p.key;if ((ph = p.hash) > h)dir = -1;else if (ph < h)dir = 1;else if ((kc == null &&(kc = comparableClassFor(k)) == null) ||(dir = compareComparables(kc, k, pk)) == 0)dir = tieBreakOrder(k, pk);TreeNode<K,V> xp = p;if ((p = (dir <= 0) ? p.left : p.right) == null) {x.parent = xp;if (dir <= 0)xp.left = x;elsexp.right = x;r = balanceInsertion(r, x);break;}}}}this.root = r;assert checkInvariants(root);}//根節點加寫鎖private final void lockRoot() {if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))contendedLock(); // offload to separate method}//根節點釋放寫鎖private final void unlockRoot() {lockState = 0;}//因為ConcurrentHashMap的寫方法會給頭節點加鎖,所以讀寫鎖不用考慮寫寫競爭的情況,只用考慮讀寫競爭的情況private final void contendedLock() {boolean waiting = false;for (int s;;) {//沒有線程持有讀鎖時嘗試獲取寫鎖if (((s = lockState) & ~WAITER) == 0) {//沒有線程持有寫鎖時嘗試獲取寫鎖if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {//拿到鎖后將等待線程清空(等待線程是它自己)if (waiting)waiter = null;return;}}//有線程持有寫鎖且本線程狀態不為WAITER時else if ((s & WAITER) == 0) {//嘗試占有waiting線程if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {waiting = true;waiter = Thread.currentThread();}}//有線程持有寫鎖且本線程狀態為WAITER時,堵塞自己else if (waiting)LockSupport.park(this);}}//重寫find方法,當寫鎖被持有時使用鏈表查詢的方法final Node<K,V> find(int h, Object k) {if (k != null) {for (Node<K,V> e = first; e != null; ) {int s; K ek;//寫鎖被持有時使用鏈表的方法遍歷if (((s = lockState) & (WAITER|WRITER)) != 0) {if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;e = e.next;}//寫鎖沒被持有時,持有一個讀鎖,用紅黑樹的方法遍歷else if (U.compareAndSwapInt(this, LOCKSTATE, s,s + READER)) {TreeNode<K,V> r, p;try {p = ((r = root) == null ? null :r.findTreeNode(h, k, null));} finally {Thread w;//當當前線程持有最后一個讀鎖的時候通知waiter線程獲取寫鎖if (U.getAndAddInt(this, LOCKSTATE, -READER) ==(READER|WAITER) && (w = waiter) != null)LockSupport.unpark(w);}return p;}}}return null;}//用以實現Map.putVal的樹部分final TreeNode<K,V> putTreeVal(int h, K k, V v) {Class<?> kc = null;boolean searched = false;for (TreeNode<K,V> p = root;;) {int dir, ph; K pk;if (p == null) {first = root = new TreeNode<K,V>(h, k, v, null, null);break;}else if ((ph = p.hash) > h)dir = -1;else if (ph < h)dir = 1;else if ((pk = p.key) == k || (pk != null && k.equals(pk)))return p;else if ((kc == null &&(kc = comparableClassFor(k)) == null) ||(dir = compareComparables(kc, k, pk)) == 0) {if (!searched) {TreeNode<K,V> q, ch;searched = true;if (((ch = p.left) != null &&(q = ch.findTreeNode(h, k, kc)) != null) ||((ch = p.right) != null &&(q = ch.findTreeNode(h, k, kc)) != null))return q;}dir = tieBreakOrder(k, pk);}TreeNode<K,V> xp = p;if ((p = (dir <= 0) ? p.left : p.right) == null) {TreeNode<K,V> x, f = first;first = x = new TreeNode<K,V>(h, k, v, f, xp);if (f != null)f.prev = x;if (dir <= 0)xp.left = x;elsexp.right = x;//當父節點是黑節點時候,直接掛一個紅節點,不用加鎖if (!xp.red)x.red = true;//其余時候可能需要旋轉紅黑樹,重新平衡,這里加寫鎖else {lockRoot();try {root = balanceInsertion(root, x);} finally {unlockRoot();}}break;}}assert checkInvariants(root);return null;}//移除紅黑樹節點final boolean removeTreeNode(TreeNode<K,V> p) {TreeNode<K,V> next = (TreeNode<K,V>)p.next;TreeNode<K,V> pred = p.prev; // unlink traversal pointersTreeNode<K,V> r, rl;if (pred == null)first = next;elsepred.next = next;if (next != null)next.prev = pred;if (first == null) {root = null;return true;}//如果紅黑樹規模太小,返回True,轉換為鏈表if ((r = root) == null || r.right == null ||(rl = r.left) == null || rl.left == null)return true;//紅黑樹規模大時,加寫鎖,在樹上刪除節點lockRoot();try {TreeNode<K,V> replacement;TreeNode<K,V> pl = p.left;TreeNode<K,V> pr = p.right;if (pl != null && pr != null) {TreeNode<K,V> s = pr, sl;while ((sl = s.left) != null) // find successors = sl;boolean c = s.red; s.red = p.red; p.red = c; // swap colorsTreeNode<K,V> sr = s.right;TreeNode<K,V> pp = p.parent;if (s == pr) { // p was s's direct parentp.parent = s;s.right = p;}else {TreeNode<K,V> sp = s.parent;if ((p.parent = sp) != null) {if (s == sp.left)sp.left = p;elsesp.right = p;}if ((s.right = pr) != null)pr.parent = s;}p.left = null;if ((p.right = sr) != null)sr.parent = p;if ((s.left = pl) != null)pl.parent = s;if ((s.parent = pp) == null)r = s;else if (p == pp.left)pp.left = s;elsepp.right = s;if (sr != null)replacement = sr;elsereplacement = p;}else if (pl != null)replacement = pl;else if (pr != null)replacement = pr;elsereplacement = p;if (replacement != p) {TreeNode<K,V> pp = replacement.parent = p.parent;if (pp == null)r = replacement;else if (p == pp.left)pp.left = replacement;elsepp.right = replacement;p.left = p.right = p.parent = null;}root = (p.red) ? r : balanceDeletion(r, replacement);if (p == replacement) { // detach pointersTreeNode<K,V> pp;if ((pp = p.parent) != null) {if (p == pp.left)pp.left = null;else if (p == pp.right)pp.right = null;p.parent = null;}}} finally {unlockRoot();}assert checkInvariants(root);return false;}ForwardingNode類(用以標記已經處理過的Hash桶)
//一個在擴容方法中使用的內部類,用以標記已經處理過的Hash桶static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;//構造方法,ForwardingNode節點的Hash值為MOVED,nextTable指向擴容后的新MapForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);this.nextTable = tab;}//重寫了Node中的find方法Node<K,V> find(int h, Object k) {//使用循環,避免多次碰到ForwardingNode導致遞歸過深outer: for (Node<K,V>[] tab = nextTable;;) {Node<K,V> e; int n;if (k == null || tab == null || (n = tab.length) == 0 ||(e = tabAt(tab, (n - 1) & h)) == null)return null;for (;;) {int eh; K ek;if ((eh = e.hash) == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;if (eh < 0) {//遇到ForwardingNode節點的處理,相當于遞歸操作if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}elsereturn e.find(h, k);}if ((e = e.next) == null)return null;}}}}減少hash碰撞的方法:
//先用高16位異或然后和HASH_BITS進行&計算 ,減少碰撞 static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS; }重點講一下擴容機制:
當往hashMap中成功插入一個key/value節點時,有可能觸發擴容動作:
1、如果新增節點之后,所在鏈表的元素個數達到了閾值 8,則會調用treeifyBin方法把鏈表轉換成紅黑樹,不過在結構轉換之前,會對數組長度進行判斷,如果數組長度n小于閾值MIN_TREEIFY_CAPACITY,默認是64,則會調用tryPresize方法把數組長度擴大到原來的兩倍,并觸發transfer方法,重新調整節點的位置。實現如下:
2、新增節點之后,會調用addCount方法記錄元素個數,并檢查是否需要進行擴容,當數組元素個數達到閾值時,會觸發transfer方法,重新調整節點的位置。
transfer實現
transfer方法實現了在并發的情況下,高效的從原始組數往新數組中移動元素,假設擴容之前節點的分布如下,這里區分藍色節點和紅色節點,是為了后續更好的分析:
在上圖中,第14個槽位插入新節點之后,鏈表元素個數已經達到了8,且數組長度為16,優先通過擴容來緩解鏈表過長的問題,實現如下:
1、根據當前數組長度n,新建一個兩倍長度的數組nextTable;
2、初始化ForwardingNode節點,其中保存了新數組nextTable的引用,在處理完每個槽位的節點之后當做占位節點,表示該槽位已經處理過了;
3、通過for自循環處理每個槽位中的鏈表元素,默認advace為真,通過CAS設置transferIndex屬性值,并初始化i和bound值,i指當前處理的槽位序號,bound指需要處理的槽位邊界,先處理槽位15的節點;
4、在當前假設條件下,槽位15中沒有節點,則通過CAS插入在第二步中初始化的ForwardingNode節點,用于告訴其它線程該槽位已經處理過了;
5、如果槽位15已經被線程A處理了,那么線程B處理到這個節點時,取到該節點的hash值應該為MOVED,值為-1,則直接跳過,繼續處理下一個槽位14的節點;
6、處理槽位14的節點,是一個鏈表結構,先定義兩個變量節點ln和hn,按我的理解應該是lowNode和highNode,分別保存hash值的第X位為0和1的節點,具體實現如下:
使用fn&n可以快速把鏈表中的元素區分成兩類,A類是hash值的第X位為0,B類是hash值的第X位為1,并通過lastRun記錄最后需要處理的節點,A類和B類節點可以分散到新數組的槽位14和30中,在原數組的槽位14中,藍色節點第X為0,紅色節點第X為1,把鏈表拉平顯示如下:
1、通過遍歷鏈表,記錄runBit和lastRun,分別為1和節點6,所以設置hn為節點6,ln為null;
2、重新遍歷鏈表,以lastRun節點為終止條件,根據第X位的值分別構造ln鏈表和hn鏈表:
ln鏈:和原來鏈表相比,順序已經不一樣了
hn鏈:
通過CAS把ln鏈表設置到新數組的i位置,hn鏈表設置到i+n的位置;
7、如果該槽位是紅黑樹結構,則構造樹節點lo和hi,遍歷紅黑樹中的節點,同樣根據hash&n算法,把節點分為兩類,分別插入到lo和hi為頭的鏈表中,根據lo和hi鏈表中的元素個數分別生成ln和hn節點,其中ln節點的生成邏輯如下:
(1)如果lo鏈表的元素個數小于等于UNTREEIFY_THRESHOLD,默認為6,則通過untreeify方法把樹節點鏈表轉化成普通節點鏈表;
(2)否則判斷hi鏈表中的元素個數是否等于0:如果等于0,表示lo鏈表中包含了所有原始節點,則設置原始紅黑樹給ln,否則根據lo鏈表重新構造紅黑樹。
最后,同樣的通過CAS把ln設置到新數組的i位置,hn設置到i+n位置。
本文參考
本文參考
本文參考
總結
以上是生活随笔為你收集整理的concurrenthashmap 1.7/1.8的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HashSet、TreeSet、Tree
- 下一篇: 2021年度公有云安全报告