java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架
看完書?java concurrency in practice 當然是想找點啥好玩的東東玩玩。 當看到了Doug Lee 的論文 << The java.util.concurrent Synchronizer Framework >> 大呼來的太晚喔, 前段時間看那個ReentrantLock 的代碼真的是痛苦啊,不過現在也不晚不是。 ?呵呵, 上菜:這個框架的核心是一個AbstractQueuedSynchronizer 類 (下面簡稱AQS) ?它基本上的思路是:
采用Template Method Pattern. ?它實現了non-contended 的synchronization 算法;
繼承 它的Subclass ?一般不直接作為Synchronzier, 而是作為私有的實現 被用來delegate. ?比如 他舉了個例子:
class Mutex implements Lock, java.io.Serializable {
Java代碼
//?Our?internal?helper?class
private?static?class?Sync?extends?AbstractQueuedSynchronizer?{
.....
}
//?The?sync?object?does?all?the?hard?work.?We?just?forward?to?it.
private?final?Sync?sync?=?new?Sync();
public?void?lock()????????????????{?sync.acquire(1);?}
public?boolean?tryLock()??????????{?return?sync.tryAcquire(1);?}
public?void?unlock()??????????????{?sync.release(1);?}
public?Condition?newCondition()???{?return?sync.newCondition();?}
public?boolean?isLocked()?????????{?return?sync.isHeldExclusively();?}
public?boolean?hasQueuedThreads()?{?return?sync.hasQueuedThreads();?}
public?void?lockInterruptibly()?throws?InterruptedException?{
sync.acquireInterruptibly(1);
}
public?boolean?tryLock(long?timeout,?TimeUnit?unit)
throws?InterruptedException?{
return?sync.tryAcquireNanos(1,?unit.toNanos(timeout));
}
在這個 AQS 類中 提供了 兩大類方法
acquire ? 其中還包括 ?非阻塞的 tryAcquire; ?帶 time out 的;可以通過interruption 來cancellality 的。
release ? 相對簡單點, 因為在調用release 方法的時候基本上暗含了個意思當前的線程是獲得了鎖的。
在JUC 包中沒有為各種Synchronizer 類提供統一的API 比如 Lock.lock, Semaphore.acquire, CountDownLatch.await ?and ?FutureTask.get 都是映射到這個AQS 的 acquire 方法。
要實現一個Synchronizer 的基本思路非常直接的
acquire 操作
采用CAS 操作去更新同步狀態 ?如果不成功 ?{
enqueue 當前線程
block 當前線程
}
dequeue ?當前線程 ?if it was queued ? ? ?// 此時應該是當前線程被別的線程 release 來喚醒的。
release 操作 :
更新同步狀態, 此時可以直接 set 而不通過 CAS。
假如 等待隊列中還有線程 ?unblock ?one or more
要支持這些操作 需要 三類基本的組件:
1, ? Atomically managing synchronization state。 ? 這個通過
private ? volatile ?int ? state;
在處理acquire 的時候基本是通過用CAS 實現的 ?compareAndSetState 來
2, ? Block /Unblock 線程 。 這個是通過 JUC 里面的一個封裝類 LockSupport.park / unpark 來實現的。LockSupport 都deleget 到了 Unsafte 對應的方法
3, 維護 一個隊列來存放 等待線程。 這個是通過一個 CLH queue 的變種。 它是一種linked queue 的通過 head 和tail 。
三個里面最復雜的就是這個 CLH queue 的維護了, queue 中節點被定義為 ?:
static final class Node {
Java代碼
volatile?int?waitStatus;
volatile?Node?prev;
volatile?Node?next;
volatile?Thread?thread;
Node?nextWaiter;
final?boolean?isShared()?{
return?nextWaiter?==?SHARED;
}
final?Node?predecessor()?throws?NullPointerException?{
Node?p?=?prev;
if?(p?==?null)
throw?new?NullPointerException();
else
return?p;
}
Node()?{????//?Used?to?establish?initial?head?or?SHARED?marker
}
Node(Thread?thread,?Node?mode)?{?????//?Used?by?addWaiter
this.nextWaiter?=?mode;
this.thread?=?thread;
}
Node(Thread?thread,?int?waitStatus)?{?//?Used?by?Condition
this.waitStatus?=?waitStatus;
this.thread?=?thread;
}
AQS 中定義了
Java代碼
private?Node?addWaiter(Node?mode)?{
Node?node?=?new?Node(Thread.currentThread(),?mode);
//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failure
Node?pred?=?tail;
if?(pred?!=?null)?{
node.prev?=?pred;
if?(compareAndSetTail(pred,?node))?{
pred.next?=?node;
return?node;
}
}
enq(node);
return?node;
}
這個方法通常在 acquire 的時候如果 tryAcquire 失敗就會將insert 一個 Node 到 tail 之后, 并且 node.prev 指向先前的 tail。 當然這個是在?compareAndSetTail(pred, node) ?返回true 的時候比較簡單, 否則就進入 enq(node) 方法 差不多也是這個邏輯insert 一個 Node 到 tail 之后。
在 1.6.0_25 版本里面AQS 就沒有在 release 的時候 通過喚醒等待隊列里面head指向的線程, 然后然后該線程是在 方法里面繼續resume, 基本上會
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}。
就是將head的下一個節點設為Head, 原來的head 呢 p.next = null, ?p 不再引用任何對象, gc 就會發生作用。 ?這個基本上會的例外是啥呢, 新線程acquire 調用 tryAcquire 搶占成功 。 這個線程就只有繼續park 了。
基本上來講 有了 這個 AQS ?, 我們要想實現一個 Synchronizer 就比較簡單了, ?最簡單的情況下 ?treAcquire , tryRelease 實現下。AQS 里面提供的需要繼承的方法不是采用abstract 的方式 而是用拋出 notImplementedException的 方式, 如果像那些tryAcquireShared 之類的 在 exclusive 模式下根本就不會被調用到, ?我們也就根本就不必override。 ?比如 先前我們所的那個 Mutex 我們只需要定義它的內部類 Sync
Java代碼
//?Our?internal?helper?class
private?static?class?Sync?extends?AbstractQueuedSynchronizer?{
//?Report?whether?in?locked?state
protected?boolean?isHeldExclusively()?{
return?getState()?==?1;
}
//?Acquire?the?lock?if?state?is?zero
public?boolean?tryAcquire(int?acquires)?{
assert?acquires?==?1;?//?Otherwise?unused
if?(compareAndSetState(0,?1))?{
setExclusiveOwnerThread(Thread.currentThread());
return?true;
}
return?false;
}
//?Release?the?lock?by?setting?state?to?zero
protected?boolean?tryRelease(int?releases)?{
assert?releases?==?1;?//?Otherwise?unused
if?(getState()?==?0)?throw?new?IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return?true;
}
//?Provide?a?Condition
Condition?newCondition()?{?return?new?ConditionObject();?}
}
這個框架是非常精巧的, ?還有很多地方比如那個 ConditionObject 里面涉及到另外一個單獨的condition queue。只有再慢慢的啃了。
另外貼一張本人手工畫的圖,非常潦草多包涵了。
總結
以上是生活随笔為你收集整理的java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php json转数组后并在前端展示,0
- 下一篇: 毛坯45外圆30锥度20总长40锥长30