并发编程-AQS
概念
AQS是AbstactQueuedSynchronizer的簡稱,它是一個Java提供的底層同步工具類,用一個int類型的變量表示同步狀態,并提供了一系列的CAS操作來管理這個同步狀態。AQS的主要作用是為Java中的并發同步組件提供統一的底層支持,例如ReentrantLock,CountdowLatch就是基于AQS實現的,用法是通過繼承AQS實現其模版方法,然后將子類作為同步組件的內部類。
同步隊列
一個雙端隊列,遵循FIFO原則,主要作用是用來存放在鎖上阻塞的線程,當一個線程嘗試獲取鎖時,如果已經被占用,那么當前線程就會被構造成一個Node節點加到同步隊列的尾部,隊列的頭節點是成功獲取鎖的節點,當頭節點線程釋放鎖時,會喚醒后面的節點并釋放當前頭節點的引用。
獨占鎖的獲取和釋放流程
######獲取
1. 調用入口方法acquire(arg)
2. 調用模版方法tryAcquire(arg)嘗試獲取鎖,若成功則返回,若失敗則走下一步
3. 將當前線程構造成一個Node節點,并利用CAS將其加入到同步隊列到尾部,然后該節點對應的線程進入自旋狀態。
自旋時,首先判斷其前驅節點是否為頭節點&是否成功獲取同步狀態,兩個條件都成立,則將當前線程的節點設置為頭節點,如果不是,則利用LockSupport.park(this)將當前線程掛起 ,等待被前驅節點喚醒
######釋放
1. 調用入口方法release(arg)
2. 調用模版方法tryRelease(arg)釋放同步狀態
3. 獲取當前節點的下一個節點
4. 利用LockSupport.unpark(currentNode.next.thread)喚醒后繼節點(接獲取的第四步)
共享鎖的獲取和釋放流程
######獲取鎖
1. 調用acquireShared(arg)入口方法
2. 進入tryAcquireShared(arg)模版方法獲取同步狀態,如果返返回值>=0,則說明同步狀態(state)有剩余,獲取鎖成功直接返回
如果tryAcquireShared(arg)返回值<0,說明獲取同步狀態失敗,向隊列尾部添加一個共享類型的Node節點,隨即該節點進入自旋狀態
自旋時,首先檢查前驅節點釋放為頭節點&tryAcquireShared()是否>=0(即成功獲取同步狀態)
如果是,則說明當前節點可執行,同時把當前節點設置為頭節點,并且喚醒所有后繼節點
如果否,則利用LockSupport.unpark(this)掛起當前線程,等待被前驅節點喚醒
######釋放鎖
調用releaseShared(arg)模版方法釋放同步狀態
如果釋放成,則遍歷整個隊列,利用LockSupport.unpark(nextNode.thread)喚醒所有后繼節點
獨占鎖和共享鎖在實現上的區別
獨占鎖的同步狀態值為1,即同一時刻只能有一個線程成功獲取同步狀態
共享鎖的同步狀態>1,取值由上層同步組件確定
獨占鎖隊列中頭節點運行完成后釋放它的直接后繼節點
共享鎖隊列中頭節點運行完成后釋放它后面的所有節點
共享鎖中會出現多個線程(即同步隊列中的節點)同時成功獲取同步狀態的情況
重入鎖
重入鎖指的是當前線成功獲取鎖后,如果再次訪問該臨界區,則不會對自己產生互斥行為。Java中對ReentrantLock和synchronized都是可重入鎖,synchronized由jvm實現可重入即使,ReentrantLock都可重入性基于AQS實現。
同時,ReentrantLock還提供公平鎖和非公平鎖兩種模式。
######ReentrantLock重入鎖
重入鎖的基本原理是判斷上次獲取鎖的線程是否為當前線程,如果是則可再次進入臨界區,如果不是,則阻塞。
由于ReentrantLock是基于AQS實現的,底層通過操作同步狀態來獲取鎖,下面看一下非公平鎖的實現邏輯:
final boolean nonfairTryAcquire(int acquires) {//獲取當前線程final Thread current = Thread.currentThread();//通過AQS獲取同步狀態int c = getState();//同步狀態為0,說明臨界區處于無鎖狀態,if (c == 0) {//修改同步狀態,即加鎖if (compareAndSetState(0, acquires)) {//將當前線程設置為鎖的owner setExclusiveOwnerThread(current);return true;}}//如果臨界區處于鎖定狀態,且上次獲取鎖的線程為當前線程else if (current == getExclusiveOwnerThread()) {//則遞增同步狀態int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}######非公平鎖
非公平鎖是指當鎖狀態為可用時,不管在當前鎖上是否有其他線程在等待,新近線程都有機會搶占鎖。
上述代碼即為非公平鎖和核心實現,可以看到只要同步狀態為0,任何調用lock的線程都有可能獲取到鎖,而不是按照鎖請求的FIFO原則來進行的。
######公平鎖
公平鎖是指當多個線程嘗試獲取鎖時,成功獲取鎖的順序與請求獲取鎖的順序相同,下面看一個ReentrantLock的實現:
從上面的代碼中可以看出,公平鎖與非公平鎖的區別僅在于是否判斷當前節點是否存在前驅節點!hasQueuedPredecessors() &&,由AQS可知,如果當前線程獲取鎖失敗就會被加入到AQS同步隊列中,那么,如果同步隊列中的節點存在前驅節點,也就表明存在線程比當前節點線程更早的獲取鎖,故只有等待前面的線程釋放鎖后才能獲取鎖。
#讀寫鎖
Java提供了一個基于AQS到讀寫鎖實現ReentrantReadWriteLock,該讀寫鎖到實現原理是:將同步變量state按照高16位和低16位進行拆分,高16位表示讀鎖,低16位表示寫鎖。
######寫鎖的獲取與釋放
寫鎖是一個獨占鎖,所以我們看一下ReentrantReadWriteLock中tryAcquire(arg)的實現:
上述代碼的處理流程已經非常清晰:
獲取同步狀態,并從中分離出低16為的寫鎖狀態
如果同步狀態不為0,說明存在讀鎖或寫鎖
如果存在讀鎖(c !=0 && w == 0),則不能獲取寫鎖(保證寫對讀的可見性)
如果當前線程不是上次獲取寫鎖的線程,則不能獲取寫鎖(寫鎖為獨占鎖)
如果以上判斷均通過,則在低16為寫鎖同步狀態上利用CAS進行修改(增加寫鎖同步狀態,實現可重入)
將當前線程設置為寫鎖的獲取線程
寫鎖的釋放過程與獨占鎖基本相同:
在釋放的過程中,不斷減少讀鎖同步狀態,只為同步狀態為0時,寫鎖完全釋放。
######讀鎖的獲取與釋放
讀鎖是一個共享鎖,獲取讀鎖的步驟如下:
獲取當前同步狀態
計算高16為讀鎖狀態+1后的值
如果大于能夠獲取到的讀鎖的最大值,則拋出異常
如果存在寫鎖并且當前線程不是寫鎖的獲取者,則獲取讀鎖失敗
如果上述判斷都通過,則利用CAS重新設置讀鎖的同步狀態
讀鎖的獲取步驟與寫鎖類似,即不斷的釋放寫鎖狀態,直到為0時,表示沒有線程獲取讀鎖。
在JDK1.6以后,讀鎖的實現比上述過程更加復雜,有興趣的同學可以看一下最新的后去讀鎖的源碼。
用AQS寫一個實例:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;public class MyLock implements Lock {private Helper helper=new Helper();private class Helper extends AbstractQueuedSynchronizer{//獲取鎖 @Overrideprotected boolean tryAcquire(int arg) {int state=getState();if(state==0){//利用CAS原理修改stateif(compareAndSetState(0,arg)){//設置當前線程占有資源 setExclusiveOwnerThread(Thread.currentThread());return true;}}else if(getExclusiveOwnerThread()==Thread.currentThread()){setState(getState()+arg);return true;}return false;}//釋放鎖 @Overrideprotected boolean tryRelease(int arg) {int state=getState()-arg;boolean flag=false;//判斷釋放后是否為0if(state==0){setExclusiveOwnerThread(null);setState(state);return true;}setState(state);//存在線程安全嗎?重入性的問題,當前已經獨占了資源()statereturn false;}public Condition newConditionObjecct(){return new ConditionObject();}}@Overridepublic void lock() {helper.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {helper.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return helper.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return helper.tryAcquireNanos(1,unit.toNanos(time));}@Overridepublic void unlock() {helper.release(1);}@Overridepublic Condition newCondition() {return helper.newConditionObjecct();} } public class Demo01 {private MyLock lock=new MyLock();private int m=0;public int next(){lock.lock();try {return m++;} finally {lock.unlock();}}public static void main(String[] args) {Demo01 demo=new Demo01();Thread[] th=new Thread[20];for (int i = 0; i < 20; i++) {th[i]=new Thread(()->{System.out.println(demo.next());});th[i].start();}} } public class Demo02 { //可重入private MyLock lock=new MyLock();private int m=0;public void a(){lock.lock();System.out.println("a");b();lock.unlock();}public void b(){lock.lock();System.out.println("b");lock.unlock();}public static void main(String[] args) {Demo02 demo=new Demo02();new Thread(()->{demo.a();}).start();} } import java.util.concurrent.locks.ReentrantLock;public class Demo03 {private ReentrantLock lock=new ReentrantLock();private int m=0;public void a(){lock.lock();System.out.println("a");b();lock.unlock();}public void b(){lock.lock();System.out.println("b");lock.unlock();}public static void main(String[] args) {Demo03 demo=new Demo03();new Thread(()->{demo.a();}).start();} }CountDownLatch:
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;public class FightQueryDemo {private static List<String> company= Arrays.asList("東方航空","南方航空","海南航空");private static List<String> fightList=new ArrayList<>();public static void main(String[] args) throws InterruptedException{String origin="BJ";String dest="SH";Thread[] threads=new Thread[company.size()]; CountDownLatch latch=new CountDownLatch(company.size());for (int i = 0; i < threads.length; i++) {String name=company.get(i);threads[i]=new Thread(()->{System.out.printf("%s 查詢從%s到%s的機票\n",name,origin,dest);//隨機產生票數int val=new Random().nextInt(10);try {TimeUnit.SECONDS.sleep(val);fightList.add(name+"--"+val);System.out.printf("%s公司查詢成功!\n",name);latch.countDown();} catch (InterruptedException e) {e.printStackTrace();}});threads[i].start();} latch.await();System.out.println("==============查詢結果如下:================");fightList.forEach(System.out::println);} }CyclicBarrier
import java.util.Random; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit;public class RaceDemo {public static void main(String[] args) { CyclicBarrier barrier=new CyclicBarrier(8);Thread[] play=new Thread[8];for (int i = 0; i < 8; i++) {play[i]=new Thread(()->{try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));System.out.println(Thread.currentThread().getName()+"準備好了"); barrier.await();} catch (Exception e) {e.printStackTrace();}System.out.println("選手"+Thread.currentThread().getName()+"起跑");},"play["+i+"]");play[i].start();}} }?
轉載于:https://www.cnblogs.com/yintingting/p/11427824.html
總結