java 并发组件_Java 并发计数组件Striped64详解
作者: 一字馬胡
轉載標志 【2017-11-03】
更新日志
日期
更新內容
備注
2017-11-03
添加轉載標志
持續更新
Java Striped64
Striped64是在java8中添加用來支持累加器的并發組件,它可以在并發環境下使用來做某種計數,Striped64的設計思路是在競爭激烈的時候盡量分散競爭,在實現上,Striped64維護了一個base Count和一個Cell數組,計數線程會首先試圖更新base變量,如果成功則退出計數,否則會認為當前競爭是很激烈的,那么就會通過Cell數組來分散計數,Striped64根據線程來計算哈希,然后將不同的線程分散到不同的Cell數組的index上,然后這個線程的計數內容就會保存在該Cell的位置上面,基于這種設計,最后的總計數需要結合base以及散落在Cell數組中的計數內容。這種設計思路類似于java7的ConcurrentHashMap實現,也就是所謂的分段鎖算法,ConcurrentHashMap會將記錄根據key的hashCode來分散到不同的segment上,線程想要操作某個記錄只需要鎖住這個記錄對應著的segment就可以了,而其他segment并不會被鎖住,其他線程任然可以去操作其他的segment,這樣就顯著提高了并發度,雖然如此,java8中的ConcurrentHashMap實現已經拋棄了java7中分段鎖的設計,而采用更為輕量級的CAS來協調并發,效率更佳。關于java8中的ConcurrentHashMap的分析可以參考文章Java 8 ConcurrentHashMap源碼分析。
雖然Striped64的設計類似于分段鎖算法,但是任然有其獨到之處,本文將分析Striped64的實現細節,并且會分析基于Striped64的計數類LongAdder。Striped64的實現還是較為復雜的,本文會盡量分析,對于沒有充分了解的內容,或者分析有誤的內容,會在未來不斷修改補充。
下面首先展示了Striped64中的Cell類:
Cell類中僅有一個保存計數的變量value,并且為該變量提供了CAS操作方法,Cell類的實現雖然看起來很簡單,但是它的作用是非常大的,它是Striped64實現分散計數的最為基礎的數據結構,當然為了達到并發環境下的線程安全以及高效,Striped64做了很多努力。Striped64中有兩個提供計數的api方法,分別為longAccumulate和doubleAccumulate,兩者的實現思路是一致的,只是前者對long類型計數,而后者對double類型計數,本文只分析前者的實現,下面是longAccumulate方法的代碼:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) { //獲取當前線程的probe值,如果為0,則需要初始化該線程的probe值
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { //獲取cell數組
if ((a = as[(n - 1) & h]) == null) { // 通過(hashCode & (length - 1))這種算法來實現取模
if (cellsBusy == 0) { // 如果當前位置為null說明需要初始化
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//運行到此說明cell的對應位置上已經有想相應的Cell了,不需要初始化了
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//嘗試去修改a上的計數,a為Cell數組中index位置上的cell
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//cell數組最大為cpu的數量,cells != as表面cells數組已經被更新了
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; //Cell數組擴容,每次擴容為原來的兩倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
僅從代碼量上就可以意識到longAccumulate的實現時異常復雜的,下面來梳理一下該方法的運行邏輯:
longAccumulate會根據當前線程來計算一個哈希值,然后根據算法(hashCode & (length - 1))來達到取模的效果以定位到該線程被分散到的Cell數組中的位置
如果Cell數組還沒有被創建,那么就去獲取cellBusy這個共享變量(相當于鎖,但是更為輕量級),如果獲取成功,則初始化Cell數組,初始容量為2,初始化完成之后將x保證成一個Cell,哈希計算之后分散到相應的index上。如果獲取cellBusy失敗,那么會試圖將x累計到base上,更新失敗會重新嘗試直到成功。
如果Cell數組以及被初始化過了,那么就根據線程的哈希值分散到一個Cell數組元素上,獲取這個位置上的Cell并且賦值給變量a,這個a很重要,如果a為null,說明該位置還沒有被初始化,那么就初始化,當然在初始化之前需要競爭cellBusy變量。
如果Cell數組的大小已經最大了(CPU的數量),那么就需要重新計算哈希,來重新分散當前線程到另外一個Cell位置上再走一遍該方法的邏輯,否則就需要對Cell數組進行擴容,然后將原來的計數內容遷移過去。這里面需要注意的是,因為Cell里面保存的是計數值,所以在擴容之后沒有必要做其他的處理,直接根據index將舊的Cell數組內容直接復制到新的Cell數組中就可以了。
當然,上面的流程是高度概括的,longAccumulate的實際分支還要更多,并且為了保證線程安全做的判斷更多。longAccumulate會根據不同的狀態來執行不同的分支,比如在線程競爭非常激烈的時候,會通過對cells數組擴容或者從新計算哈希值來重新分散線程,這些做法的目的是將多個線程的計數請求分散到不同的cells的index上,其實這和java7中的ConcurrentHashMap的設計思路是完全一致的,但是java7中的ConcurrentHashMap實現在segment加鎖使用了比較重的synchronized,而Striped64使用了java中較為底層的Unsafe類的CAS操作來進行并發操作,這種方式更為輕量級,因為它會不停的嘗試,失敗會返回,而加鎖的方式會阻塞線程,線程需要被喚醒,這涉及到了線程的狀態的改變,需要上下文切換,所以是比較重量級的。
Unsafe
在這里添加一點關于java中底層操作的類Unsafe類的使用方法,首先看下面的代碼:
Unsafe需要關注的是Field的offset,然后在CAS的時候需要oldValue和expectValue以及newValue,它會在比較了oldValue == exceptValue的時候將oldValue設置為newValue,否則不會改變。這也是CAS的定義,(compare And set)下面的代碼展示了CAS操作的示例:
UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val)
this是需要改變的對象,valueOffset為需要修改的Field在該對象中的offset,這個值的獲取可以參考上面展示的
圖片,cmp為exceptValue,也就是我們希望他的舊值為cmp值,如果相等,則將該Field設置為val,否則別修改。
LongAdder實現細節
上文中分析了Striped64的實現細節,下面來分析一下LongAdder的實現細節,LongAdder的實現基于Striped64,理解了Striped64就很好理解LongAdder了。下面先來看一下LongAdder的add方法:
首先判斷cells是否為null,如果為null,則會嘗試將本次計數累計到base上,如果cells不為null,或者操作base失敗,那么就會通過哈希值來獲取當前線程對應的cells數組中的位置,獲取該位置上的cell,如果該cell不為null,那么就試圖將本次計數累計到該cell上,如果不成功,那么就需要借助Striped64類的longAccumulate方法來進行計數累計,關于longAccumulate的分析見上文。
當我們想要獲得當前的總計數的時候,需要調用sum方法來獲取,下面展示了該方法的細節:
它需要累計base和Cell數組中的Cell中的計數,base中的計數為線程競爭不是很激烈的時候累計的數,而在線程競爭比較激烈的時候就會將計數的任務分散到Cell數組中,所以在sum方法里,需要合并兩處的計數值。
除了獲取總計數,我們有時候想reset一下,下面的代碼展示了這種操作:
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
同樣注意點在于需要同時將base和Cell數組都reset。
Striped64在ConcurrentHashMap中的使用
Striped64的計數方法在java8的ConcurrentHashMap中也有使用,具體的實現細節可以參考addCount方法,下面來看一下ConcurrentHashMap的size方法的實現細節:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
ConcurrentHashMap中的baseCount對應著Striped64中的base變量,而counterCells則對應著Striped64中的cells數組,他們的實現時一樣的,更為詳細的內容可以參考java8中的ConcurrentHashMap實現。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的java 并发组件_Java 并发计数组件Striped64详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: echarts自定义showlading
- 下一篇: python文件地址拼接