JUC锁-Semaphore(八)
Semaphore簡(jiǎn)介
Semaphore是一個(gè)信號(hào)計(jì)數(shù)量,它的本質(zhì)其實(shí)是一個(gè)”共享鎖”。
信號(hào)量維護(hù)了一個(gè)信號(hào)量許可集。線程可以通過(guò)調(diào)用acquire()來(lái)獲取信號(hào)量的許可;當(dāng)信號(hào)量中有可用的許可時(shí),線程能獲取該許可;否則線程必須等待,直到有可用的許可為止。 線程可以通過(guò)release()來(lái)釋放它所持有的信號(hào)量許可。
它的uml圖如下:
是不是感覺(jué)有點(diǎn)熟悉呢?沒(méi)錯(cuò),它跟“ReetrantLock”是一樣的,通過(guò)組件sync(繼承AQS),得到了它的模版方法。
Sync包括兩個(gè)子類:”公平信號(hào)量”FairSync 和 “非公平信號(hào)量”NonfairSync。sync是”FairSync的實(shí)例”,或者”NonfairSync的實(shí)例”;默認(rèn)情況下,sync是NonfairSync(默認(rèn)是非公平信號(hào)量)。
Semaphore方法
構(gòu)造方法
public Semaphore(int permits) {sync = new NonfairSync(permits); }public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }從中,我們可以信號(hào)量分為“公平信號(hào)量(FairSync)”和“非公平信號(hào)量(NonfairSync)”。Semaphore(int permits)函數(shù)會(huì)默認(rèn)創(chuàng)建“非公平信號(hào)量”。
我們來(lái)看一下FairSync和NonFaireSync的構(gòu)造方法:
static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}我們可以看到,他們區(qū)別在于tryAcquireShared方法的不同。
下面會(huì)對(duì)他們作出解釋。
公平信號(hào)量獲取和釋放
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }信號(hào)量中的acquire()獲取函數(shù),實(shí)際上是調(diào)用的AQS中的acquireSharedInterruptibly()。
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 如果線程是中斷狀態(tài),則拋出異常。if (Thread.interrupted())throw new InterruptedException();// 否則,嘗試獲取“共享鎖”;獲取成功則直接返回,獲取失敗,則通過(guò)doAcquireSharedInterruptibly()獲取。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }Semaphore中”公平鎖“對(duì)應(yīng)的tryAcquireShared()實(shí)現(xiàn)如下:
protected int tryAcquireShared(int acquires) {for (;;) {// 判斷“當(dāng)前線程”是不是CLH隊(duì)列中的第一個(gè)線程線程,// 若是的話,則返回-1。if (hasQueuedPredecessors())return -1;// 設(shè)置“可以獲得的信號(hào)量的許可數(shù)”int available = getState();// 設(shè)置“獲得acquires個(gè)信號(hào)量許可之后,剩余的信號(hào)量許可數(shù)”int remaining = available - acquires;// 如果“剩余的信號(hào)量許可數(shù)>=0”,則設(shè)置“可以獲得的信號(hào)量許可數(shù)”為remaining。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }說(shuō)明:tryAcquireShared()的作用是嘗試獲取acquires個(gè)信號(hào)量許可數(shù)。
對(duì)于Semaphore而言,state表示的是“當(dāng)前可獲得的信號(hào)量許可數(shù)”。
下面看看AQS中doAcquireSharedInterruptibly()的實(shí)現(xiàn):
private void doAcquireSharedInterruptibly(long arg)throws InterruptedException {// 創(chuàng)建”當(dāng)前線程“的Node節(jié)點(diǎn),且Node中記錄的鎖是”共享鎖“類型;并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 獲取上一個(gè)節(jié)點(diǎn)。// 如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則”嘗試獲取共享鎖“。final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 當(dāng)前線程一直等待,直到獲取到共享鎖。// 如果線程在等待過(guò)程中被中斷過(guò),則再次中斷該線程(還原之前的中斷狀態(tài))。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }公平信號(hào)量的釋放
public void release() {sync.releaseShared(1); }public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits); }信號(hào)量的releases()釋放函數(shù),實(shí)際上是調(diào)用的AQS中的releaseShared()。
releaseShared()在AQS中實(shí)現(xiàn),源碼如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }復(fù)制代碼
說(shuō)明:releaseShared()的目的是讓當(dāng)前線程釋放它所持有的共享鎖。
它首先會(huì)通過(guò)tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過(guò)doReleaseShared()去釋放共享鎖。
Semaphore重寫了tryReleaseShared(),它的源碼如下:
protected final boolean tryReleaseShared(int releases) {for (;;) {// 獲取“可以獲得的信號(hào)量的許可數(shù)”int current = getState();// 獲取“釋放releases個(gè)信號(hào)量許可之后,剩余的信號(hào)量許可數(shù)”int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// 設(shè)置“可以獲得的信號(hào)量的許可數(shù)”為next。if (compareAndSetState(current, next))return true;} }如果tryReleaseShared()嘗試釋放共享鎖失敗,則會(huì)調(diào)用doReleaseShared()去釋放共享鎖。doReleaseShared()的源碼如下:
private void doReleaseShared() {for (;;) {// 獲取CLH隊(duì)列的頭節(jié)點(diǎn)Node h = head;// 如果頭節(jié)點(diǎn)不為null,并且頭節(jié)點(diǎn)不等于tail節(jié)點(diǎn)。if (h != null && h != tail) {// 獲取頭節(jié)點(diǎn)對(duì)應(yīng)的線程的狀態(tài)int ws = h.waitStatus;// 如果頭節(jié)點(diǎn)對(duì)應(yīng)的線程是SIGNAL狀態(tài),則意味著“頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)所對(duì)應(yīng)的線程”需要被unpark喚醒。if (ws == Node.SIGNAL) {// 設(shè)置“頭節(jié)點(diǎn)對(duì)應(yīng)的線程狀態(tài)”為空狀態(tài)。失敗的話,則繼續(xù)循環(huán)。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 喚醒“頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)所對(duì)應(yīng)的線程”。unparkSuccessor(h);}// 如果頭節(jié)點(diǎn)對(duì)應(yīng)的線程是空狀態(tài),則設(shè)置“文件點(diǎn)對(duì)應(yīng)的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態(tài)。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果頭節(jié)點(diǎn)發(fā)生變化,則繼續(xù)循環(huán)。否則,退出循環(huán)。if (h == head) // loop if head changedbreak;} }說(shuō)明:doReleaseShared()會(huì)釋放“共享鎖”。它會(huì)從前往后的遍歷CLH隊(duì)列,依次“喚醒”然后“執(zhí)行”隊(duì)列中每個(gè)節(jié)點(diǎn)對(duì)應(yīng)的線程;最終的目的是讓這些線程釋放它們所持有的信號(hào)量。
非公平信號(hào)量獲取和釋放
Semaphore中的非公平信號(hào)量是NonFairSync。在Semaphore中,“非公平信號(hào)量許可的釋放(release)”與“公平信號(hào)量許可的釋放(release)”是一樣的。
不同的是它們獲取“信號(hào)量許可”的機(jī)制不同,下面是非公平信號(hào)量獲取信號(hào)量許可的代碼。
非公平信號(hào)量的tryAcquireShared()實(shí)現(xiàn)如下:
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); }nonfairTryAcquireShared()的實(shí)現(xiàn)如下:
final int nonfairTryAcquireShared(int acquires) {for (;;) {// 設(shè)置“可以獲得的信號(hào)量的許可數(shù)”int available = getState();// 設(shè)置“獲得acquires個(gè)信號(hào)量許可之后,剩余的信號(hào)量許可數(shù)”int remaining = available - acquires;// 如果“剩余的信號(hào)量許可數(shù)>=0”,則設(shè)置“可以獲得的信號(hào)量許可數(shù)”為remaining。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }說(shuō)明:非公平信號(hào)量的tryAcquireShared()調(diào)用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循環(huán)中,它都會(huì)直接判斷“當(dāng)前剩余的信號(hào)量許可數(shù)”是否足夠;足夠的話,則直接“設(shè)置可以獲得的信號(hào)量許可數(shù)”,進(jìn)而再獲取信號(hào)量。
而公平信號(hào)量的tryAcquireShared()中,在獲取信號(hào)量之前會(huì)通過(guò)if (hasQueuedPredecessors())來(lái)判斷“當(dāng)前線程是不是在CLH隊(duì)列的頭部”,是的話,則返回-1。
Semaphore例子
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest1 { private static final int SEM_MAX = 10;public static void main(String[] args) { Semaphore sem = new Semaphore(SEM_MAX);//創(chuàng)建線程池ExecutorService threadPool = Executors.newFixedThreadPool(3);//在線程池中執(zhí)行任務(wù)threadPool.execute(new MyThread(sem, 5));threadPool.execute(new MyThread(sem, 4));threadPool.execute(new MyThread(sem, 7));//關(guān)閉池threadPool.shutdown();} }class MyThread extends Thread {private volatile Semaphore sem; // 信號(hào)量private int count; // 申請(qǐng)信號(hào)量的大小 MyThread(Semaphore sem, int count) {this.sem = sem;this.count = count;}public void run() {try {// 從信號(hào)量中獲取count個(gè)許可sem.acquire(count);Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " acquire count="+count);} catch (InterruptedException e) {e.printStackTrace();} finally {// 釋放給定數(shù)目的許可,將其返回到信號(hào)量。sem.release(count);System.out.println(Thread.currentThread().getName() + " release " + count + "");}} }運(yùn)行結(jié)果:
pool-1-thread-1 acquire count=5 pool-1-thread-2 acquire count=4 pool-1-thread-1 release 5 pool-1-thread-2 release 4 pool-1-thread-3 acquire count=7 pool-1-thread-3 release 7總結(jié)
以上是生活随笔為你收集整理的JUC锁-Semaphore(八)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: JUC锁-CountDownLatch(
- 下一篇: JUC队列-ArrayBlockingQ