聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore
前幾篇分析了一下AQS的原理和實(shí)現(xiàn),這篇拿Semaphore信號(hào)量做例子看看AQS實(shí)際是如何使用的。
?
Semaphore表示了一種可以同時(shí)有多個(gè)線程進(jìn)入臨界區(qū)的同步器,它維護(hù)了一個(gè)狀態(tài)表示可用的票據(jù),只有拿到了票據(jù)的線程盡可以進(jìn)入臨界區(qū),否則就等待,直到獲得釋放出的票據(jù)。Semaphore常用在資源池中來(lái)管理資源。當(dāng)狀態(tài)只有1個(gè)0兩個(gè)值時(shí),它退化成了一個(gè)互斥的同步器,類似鎖。
?
下面來(lái)看看Semaphore的代碼。
它維護(hù)了一個(gè)內(nèi)部類Sync來(lái)繼承AQS,定制tryXXX方法來(lái)使用AQS。我們之前提到過(guò)AQS支持獨(dú)占和共享兩種模式,Semaphore明顯就是共享模式,它支持多個(gè)線程可以同時(shí)進(jìn)入臨界區(qū)。所以Sync擴(kuò)展了Shared相關(guān)的方法。
可以看到Sync的主要操作都是對(duì)狀態(tài)的無(wú)鎖修改,它不需要處理AQS隊(duì)列相關(guān)的操作。在聊聊高并發(fā)(二十四)解析java.util.concurrent各個(gè)組件(六) 深入理解AQS(四)?我們說(shuō)了AQS提供了tryXXX接口給子類擴(kuò)展,相當(dāng)于給子類一個(gè)機(jī)會(huì),可以自己處理狀態(tài),決定是否入同步隊(duì)列。
1. nonfailTryAcquireShared()非公平的tryAcquire,它立刻修改了票據(jù)狀態(tài),而不需要管是否有先來(lái)的線程正在等待,而一旦有可用的票據(jù),就直接獲得了鎖,不需要進(jìn)入AQS的隊(duì)列等待同步。
2. tryReleaseShared()方法負(fù)責(zé)釋放共享狀態(tài)的資源,它只修改了票據(jù)狀態(tài),由AQS的releaseShared()方法來(lái)負(fù)責(zé)喚醒在AQS隊(duì)列等待的線程
3. reducePermits()和drainPermits()方法都是直接修改了狀態(tài),從而限制可用的資源
?
?abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
?
Sync也是一個(gè)抽象類,具體的實(shí)現(xiàn)是NonfailSync和FairSync,代表了非公平實(shí)現(xiàn)和公平實(shí)現(xiàn)。在上一篇已經(jīng)提到,所謂的非公平只是說(shuō)在獲取資源時(shí)開(kāi)了一個(gè)口子,可以讓后來(lái)的線程不需要管在AQS隊(duì)列中的先來(lái)的線程來(lái)獲取資源,而一旦獲取失敗,就得進(jìn)入AQS隊(duì)列等待,而AQS隊(duì)列是先來(lái)先服務(wù)的FIFO隊(duì)列。
可以看到,NonfailSync和FairSync只是在tryAcquireShared方法的實(shí)現(xiàn)上不同,其他都是一樣的。
?
?/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
再來(lái)看看Semaphore自己提供的方法,
1.支持可中斷和不可中斷的獲取/釋放
2.支持限時(shí)獲取
3.支持tryXX獲取/釋放
4. 支持同時(shí)獲取/釋放多個(gè)資源
?
可以看到Semaphore的實(shí)現(xiàn)都是基于AQS的方法來(lái)作的,單個(gè)資源的獲取/釋放操作都是請(qǐng)求1個(gè)資源,所以參數(shù)傳遞的是1,多個(gè)資源獲取傳遞了一個(gè)int個(gè)數(shù)。
?
?public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
??????? sync.acquireShared(1);
??? }
public boolean tryAcquire() {
??????? return sync.nonfairTryAcquireShared(1) >= 0;
??? }
public boolean tryAcquire(long timeout, TimeUnit unit)
??????? throws InterruptedException {
??????? return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
??? }
public void release() {
??????? sync.releaseShared(1);
??? }
public void acquire(int permits) throws InterruptedException {
??????? if (permits < 0) throw new IllegalArgumentException();
??????? sync.acquireSharedInterruptibly(permits);
??? }
public void acquireUninterruptibly(int permits) {
??????? if (permits < 0) throw new IllegalArgumentException();
??????? sync.acquireShared(permits);
??? }
public boolean tryAcquire(int permits) {
??????? if (permits < 0) throw new IllegalArgumentException();
??????? return sync.nonfairTryAcquireShared(permits) >= 0;
??? }
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
??????? throws InterruptedException {
??????? if (permits < 0) throw new IllegalArgumentException();
??????? return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
??? }
public void release(int permits) {
??????? if (permits < 0) throw new IllegalArgumentException();
??????? sync.releaseShared(permits);
??? }
下面用一個(gè)實(shí)例來(lái)測(cè)試一下Semaphore的功能。
1. 創(chuàng)建一個(gè)有兩個(gè)票據(jù)的Semaphore
2. 創(chuàng)建20個(gè)線程來(lái)競(jìng)爭(zhēng)執(zhí)行race()方法
3. 在race()方法里先打印一句等待獲取資源的話,再獲取資源,獲得資源后打印一句話,最后釋放資源,釋放資源前打印一句話
?
?package com.lock.test;
import java.util.concurrent.Semaphore;
public class SemaphoreUsecase {
private Semaphore semaphore = new Semaphore(2);
public void race(){
System.out.println("Thread " + Thread.currentThread().getName() + " is waiting the resource");
semaphore.acquireUninterruptibly();
try{
System.out.println("Thread " + Thread.currentThread().getName() + " got the resource");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally{
System.out.println("Thread " + Thread.currentThread().getName() + " is releasing the resource");
semaphore.release();
}
}
public static void main(String[] args){
final SemaphoreUsecase usecase = new SemaphoreUsecase();
for(int i = 0; i < 10; i++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
usecase.race();
}
}, String.valueOf(i));
t.start();
}
}
}
測(cè)試結(jié)果:
可以看到先來(lái)的兩個(gè)線程先獲得了資源,后來(lái)的線程都在等待,當(dāng)有線程釋放資源之后,等待的線程才會(huì)去獲得資源,直到都獲得/釋放資源
?
?Thread 0 is waiting the resource
Thread 0 got the resource
Thread 2 is waiting the resource
Thread 2 got the resource
Thread 1 is waiting the resource
Thread 4 is waiting the resource
Thread 3 is waiting the resource
Thread 5 is waiting the resource
Thread 6 is waiting the resource
Thread 7 is waiting the resource
Thread 8 is waiting the resource
Thread 9 is waiting the resource
Thread 2 is releasing the resource
Thread 0 is releasing the resource
Thread 1 got the resource
Thread 4 got the resource
Thread 1 is releasing the resource
Thread 4 is releasing the resource
Thread 3 got the resource
Thread 5 got the resource
Thread 3 is releasing the resource
Thread 5 is releasing the resource
Thread 6 got the resource
Thread 7 got the resource
Thread 7 is releasing the resource
Thread 6 is releasing the resource
Thread 8 got the resource
Thread 9 got the resource
Thread 8 is releasing the resource
Thread 9 is releasing the resource
?
總結(jié)
以上是生活随笔為你收集整理的聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 聊聊高并发(二十四)解析java.uti
- 下一篇: 聊聊高并发(二十六)解析java.uti