死磕java concurrent包系列(六)基于AQS解析信号量Semaphore
Semaphore
之前分析AQS的時候,內部有兩種模式,獨占模式和共享模式,前面的ReentrantLock都是使用獨占模式,而Semaphore同樣作為一個基于AQS實現的并發組件,它是基于共享模式實現的,我們先看看它的使用場景
Semaphore共享鎖的基本使用
假設有20個人去銀行柜面辦理業務,銀行只有3個柜面,同時只能辦理三個人,如果基于這種有限的、我們需要控制資源的情況,使用Semaphore比較方便:
public class SemaphoreTest {//排隊總人數private static final int COUNT =20;//只有三個柜臺private static final Semaphore AVALIABLECOUNT = new Semaphore(3);public static void main(String[] args) {//創建一個線程池BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(COUNT);BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("線程池");ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(COUNT, COUNT, 30L, TimeUnit.SECONDS, workQueue,builder.build());for (int i = 0; i < COUNT; i++) {final int count = i;//排隊的人都需要被服務,所以所有的人直接提交線程池處理threadPoolExecutor.execute(() -> {try {//使用acquire獲取共享鎖AVALIABLECOUNT.acquire();System.out.println(Thread.currentThread().getName());System.out.println("服務號"+count+"正在服務");Thread.sleep(1000);}catch (Exception e){System.out.println(e.getMessage());}finally {//獲取完了之后釋放資源AVALIABLECOUNT.release();}});}threadPoolExecutor.shutdown();} } 復制代碼輸出如下:我們執行代碼,可以發現每隔1秒幾乎同一時間出現3條線程訪,如下圖
Semaphore內部原理解析
Semaphore的內部結構
在深入分析Semaphore的內部原理前先看看一張類圖結構
這個結構和ReentrantLock基本上完全一致,Semaphore內部同樣存在繼承自AQS的內部類Sync以及繼承自Sync的公平鎖(FairSync)和非公平鎖(NofairSync),從這點也足以說明Semaphore的內部實現原理也是基于AQS并發組件的。 在之前的文章中,我們提到過,AQS是基礎組件,只負責核心并發操作,如加入或維護同步隊列,控制同步狀態等,而具體的加鎖和解鎖操作交由子類完成,因此子類Semaphore共享鎖的獲取與釋放需要自己實現,這兩個方法分別是獲取鎖的tryAcquireShared(int arg)方法和釋放鎖的tryReleaseShared(int arg)方法,這點從Semaphore的內部結構完全可以看出來。 我們在調用Semaphore的方法時,其內部則是通過間接調用其內部類或AQS執行的。下面我們就從Semaphore的源碼入手分析共享鎖實現原理,這里先從非公平鎖入手。非公平鎖的共享鎖
同樣的,我們先看看構造方法:
public Semaphore(int permits) {sync = new NonfairSync(permits);}/*** Creates a {@code Semaphore} with the given number of* permits and the given fairness setting.** @param permits the initial number of permits available.* This value may be negative, in which case releases* must occur before any acquires will be granted.* @param fair {@code true} if this semaphore will guarantee* first-in first-out granting of permits under contention,* else {@code false}*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);} 復制代碼我們通過默認構造函數創建時,誕生的就是非公平鎖,接下來我們看一下構造方法的入參permits的傳遞:
static final class NonfairSync extends Sync {NonfairSync(int permits) {super(permits);}//調用父類Sync的nonfairTryAcquireSharedprotected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);} }復制代碼在Sync中:
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;//直接將該值設置為AQS中的state的值Sync(int permits) {setState(permits);} 復制代碼所以Semaphore的入參permit直接傳入設置到AQS中的state中。 接下來我們看看acquire()方法,我們先通俗的解釋一下它的執行過程: 當一個線程請求到來時,state值代表的許可數,那么請求線程將會獲得同步狀態即對共享資源的訪問權,并更新state的值(一般是對state值減1),但如果請求線程過多,state值代表的許可數已減為0,則請求線程將無法獲取同步狀態,線程將被加入到同步隊列并阻塞,直到其他線程釋放同步狀態(一般是對state值加1)才可能獲取對共享資源的訪問權。 調用Semaphore的acquire()方法后將會調用到AQS的acquireSharedInterruptibly():
//Semaphore的acquire()public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//判斷是否被中斷if (Thread.interrupted())throw new InterruptedException();//如果tryAcquireShared(arg)不小于0,則說明當前還有permit可被使用if (tryAcquireShared(arg) < 0)//如果許可被用完了,沒有剩余許可 則加入同步隊列等待doAcquireSharedInterruptibly(arg);} 復制代碼在acquireSharedInterruptibly()方法內部先進行了線程中斷的判斷,那么先嘗試調用tryAcquireShared(arg)方法獲取同步狀態,如果此時許可獲取成功,那么方法執行結束,如果獲取失敗,則說明沒有剩余許可了,那么調用doAcquireSharedInterruptibly(arg);方法加入同步隊列等待。 這里的tryAcquireShared(arg)是個模板方法設計模式,AQS內部沒有提供具體實現,由子類實現,也就是有Semaphore內部自己實現,該方法在Semaphore內部非公平鎖的實現如下
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;//remaining < 0說明許可已經供不應求了,這個時候進來的線程需要被阻塞//否則CAS操作更新avaliable的值,它表示剩余的許可數if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}} 復制代碼nonfairTryAcquireShared(int acquires)方法內部,先獲取state的值,并執行減法操作,得到remaining值,它可以理解為剩余的許可數,如果remaining<0,說明請求的許可數過大,此時直接返回一個負數的remaining;如果remaining大于0,說明還有剩余的許可數,則可以訪問共享資源,后續將被加入同步隊列(通過doAcquireSharedInterruptibly(arg))。 注意Semaphore的acquire()可能存在并發操作,因此nonfairTryAcquireShared()方法體內部采用死循環+無鎖(CAS)并發的操作保證對state值修改的安全性。 例如:假設permit值為5,有多個線程并發accquire獲取許可,線程1運行時得到的remainin是5-1=4,線程2運行時,得到的remaining同樣是5-1=4,但是執行compareAndSetState時,線程2 更快一點,執行CAS操作:判斷state現在是否為5,如果為5,則CAS更新為4. 這個時候線程1也執行CAS操作,判斷state現在是否為5,發現不為5,所以CAS失敗,這時候需要這個死循環去重試。
如果remaining大于0,說明還有剩余的許可數,則可以訪問共享資源,后續將被加入同步隊列,接下來看入隊的操作,這一部分與ReentrantLock差不多:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//使用SHARED類型創建共享模式的Nodefinal Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//獲取前序節點final Node p = node.predecessor();//如果前序節點是頭節點,說明自己的Node在隊列最前端,此時可能共享資源隨時被釋放//所以需要再次嘗試獲取共享資源if (p == head) {int r = tryAcquireShared(arg);//如果獲取共享資源成功if (r >= 0) {//已經獲取資源后,node已經沒有意義,所以清理head節點并傳播setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//如果不是頭節點if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}} 復制代碼在方法中,由于當前線程沒有獲取同步狀態,因此創建一個共享模式類型(Node.SHARED)的結點并通過addWaiter(Node.SHARED)加入同步隊列,加入完成后,當前線程進入自旋狀態,首先判斷前驅結點是否為head,如果是,那么嘗試獲取同步狀態并返回r值,如果r大于0,則說明獲取同步狀態成功,將當前線程設置為head并傳播,傳播指的是,通知后續結點繼續獲取同步狀態,到此return結束,獲取到同步狀態的線程將會執行原定的任務。
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; setHead(node);//設置為頭結點/* * 嘗試去喚醒隊列中的下一個節點,如果滿足如下條件: * 還有剩余許可(propagate > 0), * 或者h.waitStatus為PROPAGATE(被上一個操作設置) * 并且 * 下一個節點處于共享模式或者為null。 * * 這兩項檢查中的保守主義可能會導致不必要的喚醒,但只有在有* 有在多個線程爭取獲得/釋放同步狀態時才會發生,所以大多* 數情況下會立馬獲得需要的信號*/ if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())//喚醒后繼節點,因為是共享模式,所以允許多個線程同時獲取同步狀態doReleaseShared();}}復制代碼但如果前驅結點不為head或前驅結點為head并嘗試獲取同步狀態失敗(與),那么調用shouldParkAfterFailedAcquire(p, node)方法判斷前驅結點的waitStatus值是否為SIGNAL并調整同步隊列中的node結點狀態,如果返回true,那么執行parkAndCheckInterrupt()方法,將當前線程掛起。 shouldParkAfterFailedAcquire方法與ReentrantLock中的如出一轍:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//獲取當前結點的等待狀態int ws = pred.waitStatus;//如果為等待喚醒(SIGNAL)狀態則返回trueif (ws == Node.SIGNAL)return true;//如果ws>0 則說明是結束狀態,//遍歷前驅結點直到找到沒有結束狀態的結點if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果ws小于0又不是SIGNAL狀態,說明是node是首次加入的線程//則將其前驅節點設置為SIGNAL狀態。下次執行shouldParkAfterFailedAcquire方法時就//滿足ws == Node.SIGNAL條件了compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}復制代碼這個方法是AQS中的,如果不懂的話,可以參考之前在ReentrantLock中也分析過:juejin.im/post/5c021b… 中自旋的部分。 到此,加入同步隊列的整個過程完成。
總結
在AQS中存在一個volatile變量state,當我們創建Semaphore對象傳入許可數值時,最終會賦值給state,state的數值代表可同時操作共享數據的線程數量,每當一個線程請求(如調用Semaphored的acquire()方法)獲取同步狀態成功,state的值將會減少1,直到state為0時,表示已沒有可用的許可數,也就是對共享數據進行操作的線程數已達到最大值,其他后來線程將被阻塞,此時AQS內部會將線程封裝成共享模式的Node結點,加入同步隊列中等待并開啟自旋操作。只有當持有對共享數據訪問權限的線程執行完成任務并釋放同步狀態后,同步隊列中的對于的結點線程才有可能獲取同步狀態并被喚醒執行同步操作,注意在同步隊列中獲取到同步狀態的結點將被設置成head并清空相關線程數據(畢竟線程已在執行也就沒有必要保存信息了),AQS通過這種方式便實現共享鎖,用圖表示如下:
##非公平鎖的釋放鎖 接下來看一下釋放鎖:
public void release() {sync.releaseShared(1); }//調用到AQS中的releaseShared(int arg) public final boolean releaseShared(int arg) {//調用子類Semaphore實現的tryReleaseShared方法嘗試釋放同步狀態if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;} 復制代碼顯然Semaphore間接調用了AQS中的releaseShared(int arg)方法,通過tryReleaseShared(arg)方法嘗試釋放同步狀態,如果釋放成功,那么將調用doReleaseShared()喚醒同步隊列中后繼結點的線程,tryReleaseShared(int releases)方法如下:
//在Semaphore的內部類Sync中實現的 protected final boolean tryReleaseShared(int releases) {for (;;) {//獲取當前stateint current = getState();//釋放狀態state增加releasesint next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");//通過CAS更新state的值if (compareAndSetState(current, next))return true;}} 復制代碼邏輯很簡單,釋放同步狀態,更新state的值,同樣的,通過for死循環和CAS操作來保證線程安全問題,因為可能存在多個線程同時釋放同步狀態的場景。釋放成功后通過doReleaseShared()方法喚醒后繼結點。
private void doReleaseShared() {/* * 如果頭節點的后繼節點需要喚醒,那么執行喚醒 * 動作;如果不需要,將頭結點的等待狀態設置為PROPAGATE保證 * 喚醒傳遞。另外,為了防止過程中有新節點進入(隊列),這里必 * 需做循環,所以,和其他unparkSuccessor方法使用方式不一樣 * 的是,如果(頭結點)等待狀態設置失敗,重新檢測。 */ for (;;) {Node h = head;if (h != null && h != tail) {// 獲取頭節點對應的線程的狀態int ws = h.waitStatus;// 如果頭節點對應的線程是SIGNAL狀態,則意味著頭//結點的后繼結點所對應的線程需要被unpark喚醒。if (ws == Node.SIGNAL) {// 修改頭結點對應的線程狀態設置為0。失敗的話,則繼續循環。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 喚醒頭結點h的后繼結點所對應的線程unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果頭結點發生變化,則繼續循環。否則,退出循環。if (h == head) // loop if head changedbreak;} }//喚醒傳入結點的后繼結點對應的線程 private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//拿到后繼結點Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//喚醒該線程LockSupport.unpark(s.thread);}復制代碼顯然doReleaseShared()方法中通過調用unparkSuccessor(h)方法喚醒head的后繼結點對應的線程。這個方法在之前獲取資源時也會被調用:
if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}復制代碼兩種情況下都是為喚醒后繼節點,因為是共享模式,所以允許多個線程同時獲取同步狀態。釋放操作的過程還是相對簡單些的,即嘗試更新state值,更新成功調用doReleaseShared()方法喚醒后繼結點對應的線程。
公平鎖的共享鎖
公平鎖的中的共享模式實現除了在獲取同步狀態時與非公平鎖不同外,其他基本一樣:
static final class FairSync extends Sync {FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {//這里是重點,先判斷隊列中是否有結點再執行//同步狀態獲取。if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}相比之下,對于非公平鎖:final int nonfairTryAcquireShared(int acquires) {//使用死循環for (;;) {//每當有線程獲取共享資源時,就直接嘗試CAS操作int available = getState();int remaining = available - acquires;//判斷信號量是否已小于0或者CAS執行是否成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}復制代碼從代碼中可以看出,與非公平鎖tryAcquireShared(int acquires)方法實現的唯一不同是,在嘗試獲取同步狀態前,先調用了hasQueuedPredecessors()方法判斷同步隊列中是否存在結點,如果存在則返回-1,即將線程加入同步隊列等待,后續通過Node結構保證喚醒的順序。從而保證先到來的線程請求一定會先執行,也就是所謂的公平鎖。其他操作,與前面分析的非公平鎖一樣。
總結
AQS作為核心并發組件,它通過state值來控制對共享資源訪問的線程數,內部的Node有獨占模式(EXCLUSIVE)和共享模式(SHARED):
- 對于ReenTrantLock:state默認為0,每次加鎖后state更新為1,更新為1之后如果還有線程嘗試獲取鎖,則加入同步隊列等待;每當線程釋放鎖時,再更新為0并喚醒隊列中的線程
- 對于Semaphore:State默認為許可數,每當線程請求同步狀態成功,state值將會減1,如果超過限制數量的線程將被封裝共享模式的Node結點加入同步隊列封裝成獨占模式(EXCLUSIVE)等待,直到其他執行線程釋放同步狀態,才有機會獲得執行權,而每個線程執行完成任務釋放同步狀態后,state值將會增加1,這就是共享鎖的基本實現模型。
AQS是采用模板方法的設計模式構建的,它作為基礎組件,封裝的是核心并發操作,但是實現上分為兩種模式,即共享模式(如Semaphore)與獨占模式(如ReetrantLock,這兩個模式的本質區別在于多個線程能不能共享一把鎖),而這兩種模式的加鎖與解鎖實現方式是不一樣的,但AQS只關注內部公共方法實現并不關心外部不同模式的實現,所以提供了模板方法給子類使用:也就是說實現獨占鎖,如ReentrantLock需要自己實現tryAcquire()方法和tryRelease()方法,而實現共享模式的Semaphore,則需要實現tryAcquireShared()方法和tryReleaseShared()方法,這樣做的好處是顯而易見的,無論是共享模式還是獨占模式,其基礎的實現都是同一套組件(AQS),只不過是加鎖解鎖的邏輯不同罷了,更重要的是如果我們需要自定義鎖的話,也變得非常簡單,只需要選擇不同的模式實現不同的加鎖和解鎖的模板方法即可。 不管是ReentrantLock還是Semaphore,公平鎖與非公平鎖的不同之處在于公平鎖會在線程請求同步狀態前,判斷同步隊列是否存在Node,如果存在就將請求線程封裝成Node結點加入同步隊列,從而保證每個線程獲取同步狀態都是先到先得的順序執行的。非公平鎖則是通過競爭的方式獲取,不管同步隊列是否存在Node結點,只有通過競爭獲取就可以獲取線程執行權。
總結
以上是生活随笔為你收集整理的死磕java concurrent包系列(六)基于AQS解析信号量Semaphore的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 史玉柱:退休后我交了100多个男性朋友
- 下一篇: 国产VR厂商小派科技完成2亿元C1轮融资