生活随笔
收集整理的這篇文章主要介紹了
Java AQS
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
AQS
- AbstractQueuedSynchronizer (AQS)抽象的隊(duì)列式的同步器,AQS定義了一套多線程訪問共享資源的同步器框架,許多同步類實(shí)現(xiàn)都依賴于它,比如ReentrantLock/Semaphore/CountDownLatch
- 它維護(hù)了一個(gè)volatile int state(代表共享資源)和一個(gè)FIFO線程等待隊(duì)列(多線程爭用資源被阻塞時(shí)會進(jìn)入此隊(duì)列)。這里volatile是核心關(guān)鍵詞,具體volatile的語義
- state的訪問方式有三種: getState() 、setState() 、compareAndSetState()
- AQS定義兩種資源共享方式:Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock)和Share (共享,多個(gè)線程可同時(shí)執(zhí)行,例如Semaphore/CountDownLatch)不同的自定義同步器爭用共享資源的方式也不同
- 線程搶占同一份資源,只有被標(biāo)桿節(jié)點(diǎn)選中的才可以訪問資源,其余的進(jìn)入排隊(duì)隊(duì)列,如果是公平鎖,則按照先后順序進(jìn)行對于資源的訪問;如果是非公平鎖,則當(dāng)標(biāo)桿節(jié)點(diǎn)釋放完之后,大家開始搶占資源,誰搶到算誰的,沒有先來后到之分
自定義的同步容器
- 自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在底層實(shí)現(xiàn)好了
主要實(shí)現(xiàn)以下幾種方法
- isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它
- tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false?
- tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false
- tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源
- tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false
例子
- 以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()時(shí),會調(diào)用tryAcquire()獨(dú)占該鎖并將state+1。此后,其他線程再tryAcquire()時(shí)就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機(jī)會獲取該鎖。當(dāng)然,釋放鎖之前,A線程自己是可以重復(fù)獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的
- 重入鎖,在需要進(jìn)行同步的代碼部分加上鎖定,但不要忘記最后一定要釋放鎖定,不然會造成鎖永遠(yuǎn)無法釋放,其他線程永遠(yuǎn)進(jìn)不來的結(jié)果
- t2搶占線程,只有等t2線程結(jié)束,線程t1才有資格搶占資源
package com.example.core.aqs;import java.util.concurrent.locks.ReentrantLock;public class UseReentrantLock {private ReentrantLock reentrantLock = new ReentrantLock();public void method(){reentrantLock.lock();try{System.out.println("當(dāng)前線程:"+Thread.currentThread().getName()+"進(jìn)入...");Thread.sleep(2000);System.out.println("當(dāng)前線程:"+Thread.currentThread().getName()+"退出...");}catch(InterruptedException e){e.printStackTrace();}finally {reentrantLock.unlock();}}public static void main(String[] args) {UseReentrantLock useLock = new UseReentrantLock();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {useLock.method();}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {useLock.method();}},"t2");t1.start();t2.start();}
}
/*
output:
當(dāng)前線程:t2進(jìn)入...
當(dāng)前線程:t2退出...
當(dāng)前線程:t1進(jìn)入...
當(dāng)前線程:t1退出...*/
公平鎖和非公平鎖
- Lock lock = new ReentrantLock(boolean isFair);
lock用法
- tryLock():嘗試獲得鎖,獲得結(jié)果用true/false返回
- tryLock():在給定的時(shí)間內(nèi)嘗試獲得鎖,獲得結(jié)果用true/false返回
- isFair():是否是公平鎖
- isLocked():是否鎖定
- getHoldCount(): 查詢當(dāng)前線程保持此鎖的個(gè)數(shù),也就是調(diào)用lock()次數(shù)
- lockInterruptibly():優(yōu)先響應(yīng)中斷的鎖
- getQueueLength():返回正在等待獲取此鎖定的線程數(shù)
- getWaitQueueLength():返回等待與鎖定相關(guān)的給定條件Condition的線程數(shù)
- hasQueuedThread(Thread thread): 查詢指定的線程是否正在等待此鎖
- hasQueuedThreads():查詢是否有線程正在等待此鎖
- hasWaiters():查詢是否有線程正在等待與此鎖定有關(guān)的condition條件
- 再以CountDownLatch以例,任務(wù)分為N個(gè)子線程去執(zhí)行,state也初始化為N(注意N要與線程個(gè)數(shù)一致)。這N個(gè)子線程是并行執(zhí)行的,每個(gè)子線程執(zhí)行完后countDown()一次,state會CAS減1。等到所有子線程都執(zhí)行完后(即state=0),會unpark()調(diào)用線程,然后主調(diào)用線程就會從await()函數(shù)返回,繼續(xù)后余動作
AQS Condition
- 使用synchronized的時(shí)候,如果需要多線程間進(jìn)行協(xié)作工作則需要Object的wait()和notify()、notifyAll()方法進(jìn)行配合工作
- 那么同樣,我們在使用Lock的時(shí)候,可以使用一個(gè)新的等待/通知的類,它就是Condition。這個(gè)Condition一定是針對具體某一把鎖的。也就是在只有鎖的基礎(chǔ)之上才會產(chǎn)生Condition
- 我們可以通過一個(gè)Lock對象產(chǎn)生多個(gè)Condition進(jìn)行多線程間的交互,非常的靈活。可以使得部分需要喚醒的線程喚醒,其他線程則繼續(xù)等待通知
使用一個(gè)條件
- t1線程先執(zhí)行,進(jìn)入等待的時(shí)候,釋放鎖,t2才可以得以執(zhí)行,t2線程開始執(zhí)行,對于t1線程發(fā)出喚醒通知,t1得以繼續(xù)執(zhí)行,最后釋放鎖
package com.example.core.aqs;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class UseCondition {//現(xiàn)在有一把鎖private Lock lock = new ReentrantLock(); //synchronized wait ---- notify//基于這把鎖產(chǎn)生了一個(gè) condition: 作用是對于這把鎖的 喚醒 和 等待操作private Condition condition = lock.newCondition();public void method1(){lock.lock();try {System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + "進(jìn)入等待狀態(tài)..");Thread.sleep(3000);System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + "釋放鎖..");condition.await();System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() +"繼續(xù)執(zhí)行...");} catch (Exception e) {e.printStackTrace();} finally {System.err.println(Thread.currentThread().getName() + " unlock");lock.unlock();}}public void method2(){lock.lock();try {System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + "進(jìn)入..");Thread.sleep(3000);System.out.println("當(dāng)前線程:" + Thread.currentThread().getName() + "發(fā)出喚醒..");condition.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) throws Exception {final UseCondition uc = new UseCondition();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {uc.method1();}}, "t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {uc.method2();}}, "t2");t1.start();Thread.sleep(1);t2.start();}
}
/*
output
當(dāng)前線程:t1進(jìn)入等待狀態(tài)..
當(dāng)前線程:t1釋放鎖..
當(dāng)前線程:t2進(jìn)入..
當(dāng)前線程:t2發(fā)出喚醒..
當(dāng)前線程:t1繼續(xù)執(zhí)行...
t1 unlock*/
使用多個(gè)條件
- 創(chuàng)建兩個(gè)條件,條件c1和c2,c1條件受制于t1和t2線程,由t3線程進(jìn)行喚醒;c2條件受制于t3線程,由t5線程進(jìn)行喚醒
package com.example.core.aqs;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class UseManyCondition {private Lock lock = new ReentrantLock();private Condition c1 = lock.newCondition();private Condition c2 = lock.newCondition();public void m1(){try {lock.lock();System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "進(jìn)入方法m1等待..");c1.await();c1條件 由m4喚醒System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "方法m1繼續(xù)..");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m2(){try {lock.lock();System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "進(jìn)入方法m2等待..");c1.await();//c1條件 由m4喚醒System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "方法m2繼續(xù)..");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m3(){try {lock.lock();System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "進(jìn)入方法m3等待..");c2.await();c2條件 由m5喚醒System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "方法m3繼續(xù)..");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m4(){try {lock.lock();System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "喚醒..");c1.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void m5(){try {lock.lock();System.out.println("當(dāng)前線程:" +Thread.currentThread().getName() + "喚醒..");c2.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {final UseManyCondition umc = new UseManyCondition();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {umc.m1();}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {umc.m2();}},"t2");Thread t3 = new Thread(new Runnable() {@Overridepublic void run() {umc.m3();}},"t3");Thread t4 = new Thread(new Runnable() {@Overridepublic void run() {umc.m4();}},"t4");Thread t5 = new Thread(new Runnable() {@Overridepublic void run() {umc.m5();}},"t5");t1.start();t2.start();t3.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}t4.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}t5.start();}
}
/*
output
當(dāng)前線程:t1進(jìn)入方法m1等待..
當(dāng)前線程:t3進(jìn)入方法m3等待..
當(dāng)前線程:t2進(jìn)入方法m2等待..
當(dāng)前線程:t4喚醒..
當(dāng)前線程:t1方法m1繼續(xù)..
當(dāng)前線程:t2方法m2繼續(xù)..
當(dāng)前線程:t5喚醒..
當(dāng)前線程:t3方法m3繼續(xù)..*/
AQS-ReentrantReadWriteLock
- 讀寫鎖ReentrantReadWriteLock,其核心就是實(shí)現(xiàn)讀寫分離的鎖。在高并發(fā)訪問下,尤其是讀多寫少的情況下,性能要遠(yuǎn)高于重入鎖。
- 之前學(xué)synchronized、ReentrantLock時(shí),我們知道,同一時(shí)間內(nèi),只能有一個(gè)線程進(jìn)行訪問被鎖定的代碼,那么讀寫鎖則不同,其本質(zhì)是分成兩個(gè)鎖,即讀鎖、寫鎖。在讀鎖下,多個(gè)線程可以并發(fā)的進(jìn)行訪問,但是在寫鎖的時(shí)候,只能一個(gè)一個(gè)的順序訪問
- 口訣:讀讀共享,寫寫互斥,讀寫互斥
- t1和t2都是讀鎖,t3是寫鎖,t1和t2可以并行執(zhí)行,他們與t3之間不可以同時(shí)執(zhí)行。讀讀共享,寫寫互斥,讀寫互斥
package com.example.core.aqs;import java.util.concurrent.locks.ReentrantReadWriteLock;public class UseReadWriteLock {private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();private ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();public void read(){readLock.lock();try{System.out.println("當(dāng)前線程 "+ Thread.currentThread().getName()+ " 進(jìn)入了讀方法");Thread.sleep(3000);System.out.println("當(dāng)前線程 "+ Thread.currentThread().getName()+ " 退出了讀方法");}catch (Exception e){e.printStackTrace();}finally{readLock.unlock();}}public void write(){writeLock.lock();try{System.out.println("當(dāng)前線程 "+ Thread.currentThread().getName()+ " 進(jìn)入了寫方法");Thread.sleep(3000);System.out.println("當(dāng)前線程 "+ Thread.currentThread().getName()+ " 退出了寫方法");}catch (Exception e){e.printStackTrace();}finally{writeLock.unlock();}}public static void main(String[] args) {UseReadWriteLock rwLock = new UseReadWriteLock();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {rwLock.read();}},"t1");Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {rwLock.read();}},"t2");Thread t3 = new Thread(new Runnable() {@Overridepublic void run() {rwLock.write();}},"t3");t1.start();t2.start();t3.start();}
}
/*
output:
當(dāng)前線程 t3 進(jìn)入了寫方法
當(dāng)前線程 t3 退出了寫方法
當(dāng)前線程 t1 進(jìn)入了讀方法
當(dāng)前線程 t2 進(jìn)入了讀方法
當(dāng)前線程 t1 退出了讀方法
當(dāng)前線程 t2 退出了讀方法*/
LockSupport
- 提供park()和unpark()方法實(shí)現(xiàn)阻塞線程和解除線程阻塞,實(shí)現(xiàn)的阻塞和解除阻塞是基于”許可(permit)”作為關(guān)聯(lián),permit相當(dāng)于一個(gè)信號量(0,1),默認(rèn)是0. 線程之間不再需要一個(gè)Object或者其它變量來存儲狀態(tài),不再需要關(guān)心對方的狀態(tài)
- LockSupport不需要在同步代碼塊里 。所以線程間也不需要維護(hù)一個(gè)共享的同步對象了,實(shí)現(xiàn)了線程間的解耦.
- unpark函數(shù)可以先于park調(diào)用,所以不需要擔(dān)心線程間的執(zhí)行的先后順序
package com.example.core.aqs;import java.util.concurrent.locks.LockSupport;public class UseLockSupport {public static void main(String[] args) throws Exception {Thread A = new Thread(new Runnable() {@Overridepublic void run() {int sum = 0;for(int i=0;i<10;i++){sum+=i;}try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}LockSupport.park(); //滯后的System.out.println(sum);}});A.start();//后阻塞:Thread.sleep(1000);LockSupport.unpark(A); //優(yōu)先的}
}
package com.example.core.aqs;public class UseObjectLock {public static void main(String[] args) throws Exception {Object lock = new Object();Thread A = new Thread(new Runnable() {@Overridepublic void run() {int sum = 0;for(int i=0;i<10;i++){sum+=i;}synchronized (lock) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(sum);}});A.start();//后阻塞:Thread.sleep(1000);synchronized (lock) {lock.notify();}}
}
AQS-鎖優(yōu)化
- 避免死鎖
- 減小鎖的持有時(shí)間?
- 減小鎖的粒度?
- 鎖的分離?
- 盡量使用無鎖的操作,如原子操作(Atomic系列類),volatile關(guān)鍵字
acquire(int)
- 此方法是獨(dú)占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進(jìn)入等待隊(duì)列,直到獲取到資源為止,且整個(gè)過程忽略中斷的影響。這也正是lock()的語義,當(dāng)然不僅僅只限于lock()。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了
- tryAcquire()嘗試直接去獲取資源,如果成功則直接返回
- addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式
- acquireQueued()使線程在等待隊(duì)列中獲取資源,一直獲取到資源后才返回。如果在整個(gè)等待過程中被中斷過,則返回true,否則返回false
- 如果線程在等待過程中被中斷過,它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上
?
總結(jié)
以上是生活随笔為你收集整理的Java AQS的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。