聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器
這篇講講Exchanger交互器,它是一種比較特殊的兩方(Two-Party)柵欄,可以理解成Exchanger是一個柵欄,兩邊一方是生產者,一方是消費者,
1. 生產者和消費者各自維護了一個容器,生產者往容器里生產東西,消費者從容器里消費東西。
2. 當生產者的容器是滿的時候,它需要通過Exchanger向消費者交換,把滿的容器交換給消費者,從消費者手里拿到空的容器繼續生產。
3. 當消費者的容器是空的時候,它需要通過Exchanger向生產者交換,把空的容器交換給生產者,從生產者手里拿到滿的容器繼續消費。
?
所以我們看到這個過程中至少有5個組件
1. Exchanger柵欄
2. 生產者
3. 消費者
4. 生產者的容器
5. 消費者的容器
?
更復雜的情況是生產者有多個人在生產,消費者有多個人在消費,每個人都有自己的容器。這里有一個隱含的意思是生產者和消費者不挑容器,只要是空的或者滿的都能用。Exchanger的匹配是根據Hash來的,所以可能出現不同的人生產者或消費者對應到同一個Hash值。
Exchanger使用了Slot槽來表示一個位置,生產者和消費者都可以被Hash到一個槽中。
?
?private static final class Slot extends AtomicReference<Object> {
// Improve likelihood of isolation on <= 64 byte cache lines
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
/**
* Slot array. Elements are lazily initialized when needed.
* Declared volatile to enable double-checked lazy construction.
*/
private volatile Slot[] arena = new Slot[CAPACITY];
創建了一個內部類Node來封裝要交互者的線程和要交換的容器
?
?
?private static final class Node extends AtomicReference<Object> {
/** The element offered by the Thread creating this node. */
public final Object item;
/** The Thread waiting to be signalled; null until waiting. */
public volatile Thread waiter;
/**
* Creates node with given item and empty hole.
* @param item the item
*/
public Node(Object item) {
this.item = item;
}
}
?
?
算法的主要部分就是交換的過程,下面簡單說說交互的邏輯
1. 先根據當前線程的id計算出一個Hash值作為索引index
2. 然后輪詢,如果index對應的Slot槽是null就生成一個,表示還沒有人使用這個槽位
3.?如果對應的Slot已經有線程了,并且CAS設置它為null也成功了,表示生產者和消費者匹配上了,再通過CAS把自己的item設置給對方Node引用,然后把之前等待的一方喚醒,把對方Node里面的item返回給自己。這樣相當于后來者拿到了之前等待者的item,并把后來者自己的item設置成了之前等待者的Node引用
當先來者被從自旋狀態喚醒后,會從自己的Node引用中獲取item,如果非空并且不是CANCEL,就證明有人跟它交換了,也拿到了對方的item返回了,否則就是超時取消了
4. 如果對應的Slot沒有線程,說明它是先來的那個,如果是0號位置的Slot,就進行阻塞,如果是非0的Slot,就自旋,直到超時或取消
5. 如果一個進入在它自己選擇的槽上CAS失敗,它選擇一個供替代的槽。如果一個線程成功CAS到一個槽但沒有其他線程到達,它嘗試其他,前往 0 號槽
?private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item); // Create in case occupying
int index = hashIndex(); // Index of current slot
int fails = 0; // Number of CAS failures
for (;;) {
Object y; // Contents of current slot
Slot slot = arena[index];
if (slot == null) // Lazily initialize slots
createSlot(index); // Continue loop to reread
else if ((y = slot.get()) != null && // Try to fulfill
slot.compareAndSet(y, null)) {
Node you = (Node)y; // Transfer item
if (you.compareAndSet(null, item)) {
LockSupport.unpark(you.waiter);
return you.item;
} // Else cancelled; continue
}
else if (y == null && // Try to occupy
slot.compareAndSet(null, me)) {
if (index == 0) // Blocking wait for slot 0
return timed ?
awaitNanos(me, slot, nanos) :
await(me, slot);
Object v = spinWait(me, slot); // Spin wait for non-0
if (v != CANCEL)
return v;
me = new Node(item); // Throw away cancelled node
int m = max.get();
if (m > (index >>>= 1)) // Decrease index
max.compareAndSet(m, m - 1); // Maybe shrink table
}
else if (++fails > 1) { // Allow 2 fails on 1st slot
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
index = m + 1; // Grow on 3rd failed slot
else if (--index < 0)
index = m; // Circularly traverse
}
}
}
更多Exchanger算法的細節請參考這篇?http://coderbee.net/index.php/concurrent/20140424/897
?
?
下面用一個測試用例來測試Exchanger的功能。最簡單的一個Exchanger的使用場景有5個組件
1個Exchanger, 1個生產者,1個生產者容器,1個消費者,1個消費者容器
當生產者把自己的容器生產滿了,就在Exchanger柵欄處等待消費者拿空的容器和它交換
當消費者把自己的容器消費空了,就在Exchanger柵欄處等待生產者拿滿的容器和它交換
?
?package com.lock.test;
import java.util.concurrent.Exchanger;
public class ExchangerUsecase {
private static Exchanger<Buffer<Integer>> exchanger = new Exchanger<Buffer<Integer>>();
private static Buffer<Integer> emptyBuffer = new Buffer<Integer>();
private static Buffer<Integer> fullBuffer = new Buffer<Integer>();
private static class Buffer<T>{
private T[] cache = (T[])(new Object[2]);
private int index = 0;
public void add(T item){
cache[index++] = item;
}
public T take(){
return cache[--index];
}
public boolean isEmpty(){
return index == 0;
}
public boolean isFull(){
return index == cache.length;
}
}
public static void main(String[] args){
Runnable provider = new Runnable(){
Buffer<Integer> currentBuffer = emptyBuffer;
private int exchangeCount = 0;
@Override
public void run() {
while(currentBuffer != null && exchangeCount <= 1){
if(!currentBuffer.isFull()){
System.out.println("Provider added one item");
currentBuffer.add(1);
}else{
try {
currentBuffer = exchanger.exchange(currentBuffer);
exchangeCount ++;
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
Runnable consumer = new Runnable(){
Buffer<Integer> currentBuffer = fullBuffer;
private int exchangeCount = 0;
@Override
public void run() {
while(currentBuffer != null && exchangeCount <= 2){
if(!currentBuffer.isEmpty()){
System.out.println("Consumer took one item");
currentBuffer.take();
}else{
try {
currentBuffer = exchanger.exchange(currentBuffer);
exchangeCount ++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
};
new Thread(provider).start();
new Thread(consumer).start();
}
}
private static Object spinWait(Node node, Slot slot) {
??????? int spins = SPINS;
??????? for (;;) {
??????????? Object v = node.get();
??????????? if (v != null)
??????????????? return v;
??????????? else if (spins > 0)
??????????????? --spins;
??????????? else
??????????????? tryCancel(node, slot);
??????? }
??? }?
測試結果顯示生產者先生成了兩個,然后滿了,就等待消費者和它交換。交換后消費者消費了兩個,再次等待交換。生產者又生成滿了一次,再次交換。如果不設置退出機制,雙方會一直生產和消費下去,所以在測試用例中限制了交換兩次
?
?
?Provider added one item
Provider added one item
Consumer took one item
Consumer took one item
Provider added one item
Provider added one item
Consumer took one item
Consumer took one item
總結
以上是生活随笔為你收集整理的聊聊高并发(三十一)解析java.util.concurrent各个组件(十三) 理解Exchanger交换器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(二十九)解析java.uti
- 下一篇: 聊聊高并发(三十三)Java内存模型那些