ConcurrentHashMap 1.8 源码分析
2019獨角獸企業重金招聘Python工程師標準>>>
? ? ? ? ?前兩已經分析過幾篇關于CHM的源碼,本篇分析下1.8中的實現,已經棄用之前 segment 雙桶的機制。但是本質還是將鎖細化達到性能的提升,但是不是之前版本中定義的segment 上鎖,而是用了synchronized關鍵字鎖住 table (1.8中只維護了和hashmap中類似的數據結構。一個數據,數組內的結構是鏈表或者紅黑樹)中本次所操作對象的鏈表頭。同樣大量使用了UnSafe的 本地方法。如CAS,putOr并且最大的改進在于實現了并發的擴容。同時內部數據結構與Java8中HashMap一樣,增加了紅黑樹。當鏈表中的長度大于一定值后,轉化為紅黑樹(紅黑樹結構的同時也仍然保持了鏈表的結構,下面會詳細介紹)。
其實對于1.8的源碼分析 我所引用 的兩篇分章已經分析的十分詳細了,本篇本章就不對每個方法或參數在詳細說明了,只對其中我認為難以理解的對方加以概括,重點對如何實現并發擴容進行了分析。如有不對的地方,歡迎大家一起來交流。
?
? ? ? ?先簡單分析下put操作。我們可以猜想,在之前的實現中,首先定位segment,上鎖。而后操作在對segment中的map進一步處理。現在的實現中并沒有用segment,而是延用 hashMap中單桶,定位到鏈表后,直接 上鎖,而后對鏈表就行操作,同樣是將鎖細化。簡化了很多的計算操作。 從put的源碼可以看出總體與hashmap的put操作相差不大,除了加鎖,另外就是增加了判斷當前結點是否是ForwardingNode。表示當前正在擴容,且該結點已經擴容完畢。而后通過helpTransfer 判斷是否參與擴容的過程。
?
紅黑樹
與1.8中的HashMap一樣,當鏈表長度超過一定長度時,會轉換成紅黑樹。但是 兩者之間有一點細微的區別。如下代碼,表示HashMap的樹節點的數據結構。
HashMap /*** Entry for Tree bins. Extends LinkedHashMap.Entry (which in turn* extends Node) so can be used as extension of either regular or* linked node.*/ 也是繼承于HashMap中的Node,可以將該節點做為數組中的鏈表節點。 static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {TreeNode<K,V> parent; // red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev; // needed to unlink next upon deletionboolean red; 。。。}HashMap中TreeNode的實現了紅黑樹功能的方法。
ConcurrentHashMap中的TreeNode相對來說很簡單,只定義了基本的數據結構和一定查找的方法。如下
/*** Nodes for use in TreeBins*/ ConcurrentHashMap 樹的成員與上面類似,但是 static final class TreeNode<K,V> extends Node<K,V> {TreeNode<K,V> parent; // red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev; // needed to unlink next upon deletionboolean red;}并沒有實現紅黑樹應實現的方法。但是兩者有一個共通點,就是與單純的紅黑樹的數據結構多了一個變量,就是prev,增加這個變量的意義在于刪除紅黑樹的node時,需要找到被刪node的上一個結點,因為如果只是單向鏈表與紅黑樹的結構。刪除紅黑樹的結點時,單向鏈表就不能維護了,因為找不到被冊結點的上一個結點。因此這里增加了一個prev的結構,形成雙向鏈表。
而這樣做是其增加了一個TreeBin來包裝TreeNode,而這個容器不直接保存用戶的key,value信息。hash值為定值-2,在遍歷時可通過hash值判斷是當前Node是哪種結構。-1表示正在ForwardingNode,-2為TreeBin,大于0為鏈表。
ConcurrentHashMap /*** TreeNodes used at the heads of bins. TreeBins do not hold user* keys or values, but instead point to list of TreeNodes and* their root. They also maintain a parasitic read-write lock* forcing writers (who hold bin lock) to wait for readers (who do* not) to complete before tree restructuring operations.*/ static final class TreeBin<K,V> extends Node<K,V> {TreeNode<K,V> root; //紅黑樹的根結點。volatile TreeNode<K,V> first; // 指向鏈表的頭部,雖然為紅黑樹,但保持了鏈表的結構。在unTree化時簡化。volatile Thread waiter;volatile int lockState; 。。。}?
?
接下來介紹下由鏈表是怎么轉化為紅黑樹的。首先通過如下的方法將原來的Node轉換成TreeNode,從且從單身鏈表轉成雙向鏈表。此時還沒有發生紅黑樹的操作。
/* ---------------- Conversion from/to TreeBins -------------- *//*** Replaces all linked nodes in bin at given index unless table is* too small, in which case resizes instead.*/ private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; int n, sc;if (tab != null) {if ((n = tab.length) < MIN_TREEIFY_CAPACITY)tryPresize(n << 1);else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) {if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}} } setTabAt(tab, index, new TreeBin<K,V>(hd));然后new封裝轉換的TreeNode,在構造 方法里轉換成紅黑樹。hd為原鏈表的頭部。現在做為紅黑樹的first。在構造方法里執行的過程就和hashMap中轉紅黑樹的過程類似 ,根據key的 systemCode的大小,決定在樹中的位置,形成一個顆二叉樹。最后通過r = balanceInsertion(r, x); ?由二叉樹轉為紅黑樹。r為root指針。
而在HashMap中就沒有first指針的概念,雖然其內部同樣還是個雙向鏈表(通過prev實現)。它是在轉換成紅黑樹之后 ,通過將紅黑樹的root結點做為first node。
// HashMap 中treeify后執行的方法,將root做為鏈表的first node /*** Ensures that the given root is the first node of its bin.*/ static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) {int n;if (root != null && tab != null && (n = tab.length) > 0) {int index = (n - 1) & root.hash;TreeNode<K,V> first = (TreeNode<K,V>)tab[index];if (root != first) {Node<K,V> rn;tab[index] = root;TreeNode<K,V> rp = root.prev;if ((rn = root.next) != null)((TreeNode<K,V>)rn).prev = rp;if (rp != null)rp.next = rn;if (first != null)first.prev = root;root.next = first;root.prev = null;}assert checkInvariants(root);} }UNSAFE ??
如下,這三個操作是實現并發訪問的關鍵。通過UNSAFE的本地方法,1.7中就已經引入過,而在1.8中繼續扮演著重要的角色。并且大量使用了CAS的操作。
關于 UNSAFE方法 在之前1.7就已經分析過 @SuppressWarnings("unchecked") //定位tab中index為i的 Node static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //CAS設置tab中index為i的結點為node static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //Volatile 寫 tab中的 Node。在加鎖的情況下調用 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }并發擴容
擴容這一操作相比于之前的實現,1.8中最大的不同在于實現了并發擴容。其核心 是通過了一個特殊的Node。其定義如下?如果遍歷時發現node的 hash值是 -1,表示當前正在擴容。且當前table中的該node已經容完成。遍歷下一個node進行擴容。
/*** A node inserted at head of bins during transfer operations.*/ static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);this.nextTable = tab;}Node<K,V> find(int h, Object k) {。。。。} }而其并發到底又是怎么實現的呢 ? 接下來看,詳細代碼就不貼了,這里分析關鍵地方,
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range 。。。。。。。 }stride 非常關鍵,大致的意思 是每個處理器處理的node個數不小于16。而這一步的實現在下面的方法 。
i=0開始,第一步CAS ?設置 ?transferIndex,該值初始為原表的大小,假設n為64,第一個線程將其CAS設為64-16等于48.則第一個線程就先執行64到48之前的node的擴容操作,如果在此期間第二個線程要執行到transfer方法,則transferIndex為 32.其執行48到32之間。但是如果沒有其它線程進來執行,則它們就接著往下,爭取下一個stride數量的擴容操作。但是當最后transferIndex的值小于等于0時。表示此時已經不需要參與擴容了。如此通過CAS設置transferIndex的值,解決并發的沖突。同時每個node擴容時,要上鎖。以防其它操作改變該鏈表的結構。這樣就可以有多個線程并發擴容,且不會產生沖突。
Node<K,V> f; int fh; while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;} } if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit} }如下是在transfer過程中,原node是鏈表的情況下擴容過程。
if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}下圖表示遍歷到第i個結點,上鎖后,判斷是鏈表后,首先是找到lastRun,其表示尾部 ?hash&n ?相同的排在最前面的node。1表示擴容后在高位hn,0表示在低位ln(也就是原來的位置)。而之所以 用hash&n 的 表達式來決定原node在擴容后的位置,而沒有通過hash計算新數組的大小 來決定 位置 。這個實現十分的巧妙,直接將hash值與原表的大小& 一下。 這樣的做的原因是因為, put操作時決定結點所在位置時通過?hash& n-1。而n我們這里定義的是2的x次方。n-1顯然就是 11..11的結點,而n就是 1..00。舉個例子n為16,則n-1為 1111。而n為10000。對應擴容后的大小為100000,原結點在新數組的位置就是hash& 11111。現在我們發現 原之前的差別就在于 原數組的&操作。也就是 11111中第一個1。所以擴容時,不需要重新計算。而只需要將原來的hash值與n進行&操作(這樣的操作簡化了計算的復雜度)。就可以確定在新table的位置。0表示原索引,而1表示 i+n的位置。且不需要考慮并發的問題,nextTable的 ?i和 i+n的位置 只會由原table的i中的node給重新占據,而我們一開始就對table的 node ?i已經上鎖了。所以是安全的。
上面分析了鏈表的反序處理。紅黑樹類似,但是需要判讀是否unTree。
總結:進一步將鎖細化,不在是設置的并發級別,隨著擴容之后 ,鎖粒度進一步細化,并且提供了并發擴容,且大量使用了UNSAFE的本地方法,性能也進一步提升。
?
本文沒有對CHM的所有操作進行分析 ,如get,size,remove等,在下面引用的兩篇文章已經很詳細的分析,上文只是對部門關鍵點表明了我的一點點的看法。如有不正之處希望大家指出來。本文后面接著在完善。
?
?
?
?
http://www.importnew.com/22007.html
http://blog.csdn.net/u010723709/article/details/48007881
轉載于:https://my.oschina.net/ovirtKg/blog/777520
總結
以上是生活随笔為你收集整理的ConcurrentHashMap 1.8 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SVNKit支持SSH连接
- 下一篇: php获取目录中的所有文件名