Java并发包中Semaphore的工作原理、源码分析及使用示例
簡(jiǎn)介:
在多線程程序設(shè)計(jì)中有三個(gè)同步工具需要我們掌握,分別是Semaphore(信號(hào)量),countDownLatch(倒計(jì)數(shù)門(mén)閘鎖),CyclicBarrier(可重用柵欄)
歡迎探討,如有錯(cuò)誤敬請(qǐng)指正
如需轉(zhuǎn)載,請(qǐng)注明出處?http://www.cnblogs.com/nullzx/
1. 信號(hào)量Semaphore的介紹
我們以一個(gè)停車(chē)場(chǎng)運(yùn)作為例來(lái)說(shuō)明信號(hào)量的作用。假設(shè)停車(chē)場(chǎng)只有三個(gè)車(chē)位,一開(kāi)始三個(gè)車(chē)位都是空的。這時(shí)如果同時(shí)來(lái)了三輛車(chē),看門(mén)人允許其中它們進(jìn)入進(jìn)入,然后放下車(chē)攔。以后來(lái)的車(chē)必須在入口等待,直到停車(chē)場(chǎng)中有車(chē)輛離開(kāi)。這時(shí),如果有一輛車(chē)離開(kāi)停車(chē)場(chǎng),看門(mén)人得知后,打開(kāi)車(chē)攔,放入一輛,如果又離開(kāi)一輛,則又可以放入一輛,如此往復(fù)。
?
在這個(gè)停車(chē)場(chǎng)系統(tǒng)中,車(chē)位是公共資源,每輛車(chē)好比一個(gè)線程,看門(mén)人起的就是信號(hào)量的作用。信號(hào)量是一個(gè)非負(fù)整數(shù),表示了當(dāng)前公共資源的可用數(shù)目(在上面的例子中可以用空閑的停車(chē)位類(lèi)比信號(hào)量),當(dāng)一個(gè)線程要使用公共資源時(shí)(在上面的例子中可以用車(chē)輛類(lèi)比線程),首先要查看信號(hào)量,如果信號(hào)量的值大于1,則將其減1,然后去占有公共資源。如果信號(hào)量的值為0,則線程會(huì)將自己阻塞,直到有其它線程釋放公共資源。
?
在信號(hào)量上我們定義兩種操作: acquire(獲取) 和 release(釋放)。當(dāng)一個(gè)線程調(diào)用acquire操作時(shí),它要么通過(guò)成功獲取信號(hào)量(信號(hào)量減1),要么一直等下去,直到有線程釋放信號(hào)量,或超時(shí)。release(釋放)實(shí)際上會(huì)將信號(hào)量的值加1,然后喚醒等待的線程。
信號(hào)量主要用于兩個(gè)目的,一個(gè)是用于多個(gè)共享資源的互斥使用,另一個(gè)用于并發(fā)線程數(shù)的控制。
?
2. 信號(hào)量Semaphore的源碼分析
在Java的并發(fā)包中,Semaphore類(lèi)表示信號(hào)量。Semaphore內(nèi)部主要通過(guò)AQS(AbstractQueuedSynchronizer)實(shí)現(xiàn)線程的管理。Semaphore有兩個(gè)構(gòu)造函數(shù),參數(shù)permits表示許可數(shù),它最后傳遞給了AQS的state值。線程在運(yùn)行時(shí)首先獲取許可,如果成功,許可數(shù)就減1,線程運(yùn)行,當(dāng)線程運(yùn)行結(jié)束就釋放許可,許可數(shù)就加1。如果許可數(shù)為0,則獲取失敗,線程位于AQS的等待隊(duì)列中,它會(huì)被其它釋放許可的線程喚醒。在創(chuàng)建Semaphore對(duì)象的時(shí)候還可以指定它的公平性。一般常用非公平的信號(hào)量,非公平信號(hào)量是指在獲取許可時(shí)先嘗試獲取許可,而不必關(guān)心是否已有需要獲取許可的線程位于等待隊(duì)列中,如果獲取失敗,才會(huì)入列。而公平的信號(hào)量在獲取許可時(shí)首先要查看等待隊(duì)列中是否已有線程,如果有則入列。
?
構(gòu)造函數(shù)源代碼
//非公平的構(gòu)造函數(shù) public Semaphore(int permits) {sync = new NonfairSync(permits); }//通過(guò)fair參數(shù)決定公平性 public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }?
acquire源代碼
?
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }可以看出,如果remaining <0 即獲取許可后,許可數(shù)小于0,則獲取失敗,在doAcquireSharedInterruptibly方法中線程會(huì)將自身阻塞,然后入列。
?
release源代碼
public void release() {sync.releaseShared(1); }public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;} }可以看出釋放許可就是將AQS中state的值加1。然后通過(guò)doReleaseShared喚醒等待隊(duì)列的第一個(gè)節(jié)點(diǎn)。可以看出Semaphore使用的是AQS的共享模式,等待隊(duì)列中的第一個(gè)節(jié)點(diǎn),如果第一個(gè)節(jié)點(diǎn)成功獲取許可,又會(huì)喚醒下一個(gè)節(jié)點(diǎn),以此類(lèi)推。
?
3. 使用示例
package javalearning;import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;public class SemaphoreDemo {private Semaphore smp = new Semaphore(3); private Random rnd = new Random();class TaskDemo implements Runnable{private String id;TaskDemo(String id){this.id = id;}@Overridepublic void run(){try {smp.acquire();System.out.println("Thread " + id + " is working");Thread.sleep(rnd.nextInt(1000));smp.release();System.out.println("Thread " + id + " is over");} catch (InterruptedException e) {}}}public static void main(String[] args){SemaphoreDemo semaphoreDemo = new SemaphoreDemo();//注意我創(chuàng)建的線程池類(lèi)型,ExecutorService se = Executors.newCachedThreadPool();se.submit(semaphoreDemo.new TaskDemo("a"));se.submit(semaphoreDemo.new TaskDemo("b"));se.submit(semaphoreDemo.new TaskDemo("c"));se.submit(semaphoreDemo.new TaskDemo("d"));se.submit(semaphoreDemo.new TaskDemo("e"));se.submit(semaphoreDemo.new TaskDemo("f"));se.shutdown();} }運(yùn)行結(jié)果
Thread c is working
Thread b is working
Thread a is working
Thread c is over
Thread d is working
Thread b is over
Thread e is working
Thread a is over
Thread f is working
Thread d is over
Thread e is over
Thread f is over
可以看出,最多同時(shí)有三個(gè)線程并發(fā)執(zhí)行,也可以認(rèn)為有三個(gè)公共資源(比如計(jì)算機(jī)的三個(gè)串口)。
?
4. 參考內(nèi)容
[1] http://my.oschina.net/cloudcoder/blog/362974
總結(jié)
以上是生活随笔為你收集整理的Java并发包中Semaphore的工作原理、源码分析及使用示例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Hi3516A开发--视频输入和输出接口
- 下一篇: 编程的7个主要步骤: