JDK并发包1-1
https://www.jianshu.com/p/ef342bc21f7e多線程的控制并不是一個非常簡單的事情,一般意義上呢,最基本的多線程控制,用synchronized關鍵字,和object.wait,和object.notify,這些操作,那這個在之前的課程當中呢,也已經又給大家介紹,我們在這里會介紹一些更加高級的工具,這些高級的工具呢,他首先在功能上要比synchronized要高級一些,其實在使用上面來說呢,它會封裝一些更加常用的一些場景,使大家使用更加快捷,同時寫JDK開發包的呢,都是一些比較厲害的人物,所以相對來講,他們的實現呢,會比大家自己去實現一個類似的功能呢,性能上和效果上要好的多,首先第一個我們想要給大家介紹的呢,是ReentrantLock,是重入鎖,它是synchronized的一個替代品,或者說它是一個增強版,synchronized關鍵字,特點是使用簡單,但是功能上是比較薄弱的,因為他只能做到說,多個線程進行臨界區訪問的時候呢,不能進入臨界區的線程進行一個等待,這個等待是一個死等,只有前面的線程離開臨界區以后呢,才能進去,但是ReentranLock給我們提供了更多的選擇性,我們在JDK1.5之前呢,JAVA虛擬機對synchronized的優化并不會充分,RenntrantLock的性能要好于synchronized關鍵字,但是在做了充分優化之后,現在的JDK的版本當中,其實兩者性能是不相上下的,所以如果只是一個簡單功能的實現呢,沒有必要刻意去追求比較高級的功能,ReentrantLock我們主要看哪些方面,他除了實現普通的鎖的功能之外,他還實現了比如可中斷,可限時,公平鎖,可重入,這些特點,什么是可重入呢,ReentrantLock對于同一個線程來講,否則會出現一個線程把自己給卡死
package com.learn.thread;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockDemo implements Runnable{public static ReentrantLock lock=new ReentrantLock();public static int i=0;@Overridepublic void run() {for(int j=0;j<100000;j++) {lock.lock();try {i++;}finally {lock.unlock();}}}public static void main(String[] args) throws InterruptedException {ReentrantLockDemo demo=new ReentrantLockDemo();Thread t1=new Thread(demo);Thread t2=new Thread(demo);t1.start();t2.start();t1.join();t2.join();System.out.println(i);}
}
我們在多個線程要對i做++操作,大家可以知道,直接對這個變量做++操作呢,一定不是線程安全的,所以我們加鎖,這個做就是ReentrantLock,我們現在有兩個線程,都會對他去做一個加加操作,我們每一次只允許一個對他做++,所以我們每次在做加之前,我們會做一個lock操作,在這個鎖上做一個lock操作,離開之后要做unlock操作,這個寫法也是使用ReentantLock的一個基本范式,必須這樣寫,為什么要把unlock寫到finally里面呢,防止發生意外,導致你這個鎖沒有釋放掉,你這個鎖沒有釋放掉后果是很嚴重的,會導致你這個添加的線程沒法進來,萬一你在執行過程當中,給拋了個異常,異常你又沒有處理,那這個時候后果就比較嚴重,你寫finally里面呢,不管里面是否有異常發生,發生什么情況,finally在我這個程序退出之前呢,總是會給執行一下的,在這個地方把鎖給釋放掉,相對于synchronized來講呢,對synchronized來說,他的這個鎖的釋放呢,是虛擬機完成的自動的動作,我們只要把synchronized的括號給括起來,就可以了,語法上檢查是通過的,那synchronized就能保證鎖能夠被釋放掉,ReentrantLock是由程序決定是在什么時候釋放鎖,那從這點來講,它提供了鎖的一個靈活性,在任何場景下來釋放,但是同樣道理,靈活性付出的代價呢,你要格外小心,不能忘記把鎖給釋放掉,如果你忘了釋放這個地方,其他的就永遠進不來了,各自加了這么多次,它是沒有任何不安全性發生,如果是不安全,那么會小于這個數字,這里是一個重入,如果說你不幸很意外的,在線程中兩次對這個鎖加鎖,如果兩次對這個鎖進行加鎖之后呢,你許可的數量就變成2,如果出現這種情況,那么你必須對這個鎖釋放兩次,大家可能會覺得有點奇怪,從我們這個直觀意義上呢,我一個線程取一次鎖,這是很自然的事情,我不當心的情況,我又再取得一次,這不是什么大的問題,當我不需要使用這個鎖的時候呢,把它釋放掉,那我也只需要執行一次,那也就是可以了,如果我們在這種情況之下,沃我們只執行一次unlock會是什么結果,我們可以看到,這個程序我已經開啟來了,開了很長時間了,但是沒有停止,因為有一個線程卡死在里邊了,所以他是不會把這個i給打印出來的,這個時候是會有一個線程在等待的現象,是因為你前面的線程只釋放了一次unlock,導致其他線程就進不來了,對于重入鎖來講,lock了幾次,你就必須要釋放幾次,如果我們lock了兩次,我們就釋放兩次,那就沒有問題了,這是重入鎖的一個特點,如果你一個線程拿了兩個許可,你得釋放兩次,下面我們來看一個比較好的功能,叫做可中斷,重入鎖它是可以被中斷的,他不像synchronized關鍵字一樣,對中斷是沒有響應的,對中斷沒有響應的一個后果呢,如果你發生了死鎖,或者長期等待的情況,不一定死鎖產生了長期等待的情況,你前一個線程因為某一些原因沒有完成某些操作,導致后面的線程要做一個長期的等待,那么你在長期等待過程當中,我們想看到的一個現象,我們希望這個線程停下來,這個時候一個可行的辦法呢,發送一個中斷信號,讓這個線程停下來,重入鎖就提供這個功能說,我在 加鎖的同時,可以去響應你的中斷,如果我發生了死鎖,如果發生了一些意外發生的情況,我在一個鎖上卡了很久,那我還有一個辦法把你這個線程給喚醒,不至于永久性的卡死下去
package com.learn.thread;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockInterruptDemo implements Runnable{public static ReentrantLock lock1=new ReentrantLock();public static ReentrantLock lock2=new ReentrantLock();int lock;public ReentrantLockInterruptDemo(int lock) {this.lock=lock;}@Overridepublic void run() {try {if(lock==1) {lock1.lockInterruptibly();try {Thread.sleep(500);}catch (InterruptedException e) {e.printStackTrace();}lock2.lockInterruptibly();}else {lock2.lockInterruptibly();try {Thread.sleep(500);}catch (InterruptedException e) {e.printStackTrace();}lock1.lockInterruptibly();}}catch (InterruptedException e) {e.printStackTrace();}finally {if(lock1.isHeldByCurrentThread()) {lock1.unlock();}if(lock2.isHeldByCurrentThread()) {lock2.unlock();}System.out.println(Thread.currentThread().getId()+":線程退出");}}public static void main(String[] args) throws InterruptedException {ReentrantLockInterruptDemo r1=new ReentrantLockInterruptDemo(1);ReentrantLockInterruptDemo r2=new ReentrantLockInterruptDemo(2);Thread t1=new Thread(r1);Thread t2=new Thread(r2);t1.start();t2.start();Thread.sleep(1000);DeadlockChecker.check();}}
package com.learn.thread;import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;/** 檢查死鎖 */
public class DeadlockChecker {private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();final static Runnable deadLockChecker = new Runnable() {@Overridepublic void run() {while(true){long[] deadLockedThreadIds = mbean.findDeadlockedThreads();if(deadLockedThreadIds != null){ThreadInfo[] threadInfos = mbean.getThreadInfo(deadLockedThreadIds);for(Thread t : Thread.getAllStackTraces().keySet()){for(ThreadInfo ti : threadInfos){if(ti.getThreadId() == t.getId()){t.interrupt();}}}}try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}}};public static void check() {Thread t = new Thread(deadLockChecker);t.setDaemon(true);t.start();}
}
lockInterruptibly表示一個可中斷的加鎖,只有這樣去加鎖,他才會響應中斷,我們去申請一個鎖,除非當前線程被中斷,中斷之后他就會拋出一個異常,那我們來看一下這個程序,這個程序有兩個鎖,我想在這個程序當中去構造一個死鎖的現象,線程我們也開兩個,這里是一個實例變量,不是一個靜態變量,我兩個實例可以賦值兩個不同的值,那當我lock等于1的時候呢,這樣線程1鎖了1,線程2先鎖了2,同時線程1申請lock2,是很明顯的一個死鎖,對于這個死鎖來講,如果你使用lock方法呢,就不太有辦法去幫他解開了,但是如果你使用Interruptbly,那你就可以把兩個線程當一個中斷,這個程序依然可以順利的結束,死鎖必然會產生一個無限期等待
package com.learn.thread;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;public class TimedLock implements Runnable{public static ReentrantLock lock=new ReentrantLock();@Overridepublic void run() {try {if(lock.tryLock(5, TimeUnit.SECONDS)) {Thread.sleep(6000);}else {System.out.println("get lock failed");}}catch (InterruptedException e) {e.printStackTrace();}finally {if(lock.isHeldByCurrentThread())lock.unlock();}}public static void main(String[] args) {TimedLock ins=new TimedLock();Thread t1=new Thread(ins);Thread t2=new Thread(ins);t1.start();t2.start();}}
釋放占用的鎖,使得別人可以獲得鎖,trylock有兩個參數,準備在這個鎖上等多久,比如這里就等5秒中,第二個是單位,這里是5秒,如果你要改成5毫秒,那這里就改成毫秒,另外一個是公平鎖,ReentrantLock內部是支持公平鎖的,參數是fair,/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}什么叫公平鎖呢,我這個鎖可以保證線程先來先到,后來者后得,在一般意義上呢,我們這個鎖是不公平的,我先來申請鎖的線程,未必會先拿到鎖,我后面來的線程未必后拿到鎖,可能我后面的鎖運氣好一點,我反而先拿到鎖,有可能會拿到某些線程,拿不到鎖,產生饑餓現象,公平鎖是不會有問題,對于公平鎖來說,先到的線程他一定會先拿到鎖,后到的線程會后拿到鎖,公平鎖雖然不會產生饑餓,但是公平鎖的性能呢,是要比非公平鎖性能差很多,因為公平鎖還要處理一個排隊的問題,所以如果說沒有特別需求,我們不一定要去使用公平的狀態,默認情況下這個鎖是非公平的,那這里是ReentrantLock所提供的一些功能,下面我們來看一下和重入鎖相關的一個概念condition,ReentrantLock和Condition之間的關系呢,如同synchronized和object.wait,object.notify,之間的關系,之前我們已經有了比較清楚地了解,你要去wait或者notify一個線程,那你就要獲得這個monitor監視器的所有權,如果你沒有得到monitor的所有權,不能做notify,那么同樣的道理,我們condition是相當于說,在某一個鎖上面,去做這個wait和notify,你也要去獲得這個鎖,Condition什么意思呢,如果之前把wait和notify清楚的話,Condition意思是和他一樣的,但是不同的是它是和ReentrantLock一起使用,而這兩個是和synchronized一起使用,一個是monitor,一個是ReentrantLock,那接口也是非常非常類似,一個是wait和notify,一個是await,等待在Condition上,signal通知,通知等待在Condition上的線程,也有signalAll,這個也notifyAll相同的意思,比較強大的是說,wait也是有一個等待時間public final void wait(long timeout, int nanos) throws InterruptedException {if (timeout < 0) {throw new IllegalArgumentException("timeout value is negative");}if (nanos < 0 || nanos > 999999) {throw new IllegalArgumentException("nanosecond timeout value out of range");}if (nanos > 0) {timeout++;}wait(timeout);
}https://www.cnblogs.com/ten951/p/6212127.html
package com.learn.thread;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ReenterLockCondition implements Runnable {public static ReentrantLock lock = new ReentrantLock();public static Condition condition = lock.newCondition();/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {lock.lock();condition.await();System.out.println("Thread is going on");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {ReenterLockCondition r1 = new ReenterLockCondition();Thread t1 = new Thread(r1);t1.start();Thread.sleep(2000);lock.lock();condition.signal();lock.unlock();}
}
使得當前線程被掛起,等待操作,信號量是什么呢,對于我們這個鎖來講,它是互斥的,排他的,它是exclusive的,我進去了,沒有人再能進去,它是絕對嚴格的保護,當我有一個線程進了這個區間之后呢,另外不可能再有線程能夠進入到這個區間里面去,信號量是一個許可為一的,信號量是我允許若干個線程,進入到這個區間里面來,臨界區里面來,但是超過我許可范圍的線程呢,必須等待,他可以認為是一個廣義上的鎖,也可以認為是一個共享鎖,他可以有多個線程共享去使用這個臨界區,比如我們這個信號量當中,假使有10個許可,每一個許可可以分配給10個線程,當然一個線程也可以那兩三個許可,你可以根據業務的需求,每個線程可以拿幾個許可,比如10個線程,信號量允許多個線程進入臨界區,許可數量唯一的時候就相當于一把鎖,我要處理多個請求,如果系統的負載有限,只能同時處理10個請求的任務,超過10個我們就沒有能力處理,這個時候我們就可以使用信號量去控制,當有10個線程進來的時候呢,那么我就給他做執行,超過10個我就讓他做等待,這個是一個非常簡單的使用系統做這個功能控制,信號量是一個共享鎖
/*** Acquires a permit from this semaphore, blocking until one is* available, or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*/
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}也可以讓一個線程拿多個許可/*** Acquires the given number of permits from this semaphore,* blocking until all are available,* or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires the given number of permits, if they are available,* and returns immediately, reducing the number of available permits* by the given amount.** <p>If insufficient permits are available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes one of the {@link #release() release}* methods for this semaphore, the current thread is next to be assigned* permits and the number of available permits satisfies this request; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.* Any permits that were to be assigned to this thread are instead* assigned to other threads trying to acquire permits, as if* permits had been made available by a call to {@link #release()}.** @param permits the number of permits to acquire* @throws InterruptedException if the current thread is interrupted* @throws IllegalArgumentException if {@code permits} is negative*/
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);
}申請給定量許可,阻塞直到所有的許可可用https://www.cnblogs.com/ten951/p/6212132.html
package com.learn.thread;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemapDemo implements Runnable {final Semaphore semp = new Semaphore(5);//允許五個許可/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {semp.acquire();Thread.sleep(2000);System.out.println(Thread.currentThread().getId() + ":done!");semp.release();} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {ExecutorService exec = Executors.newFixedThreadPool(20);//容量為20的線程池final SemapDemo demo = new SemapDemo();for (int i = 0; i < 20; i++) {exec.submit(demo);}}
}
許可的釋放最好是寫到finally里面,萬一你忘記釋放了,你的許可就憑空蒸發掉了,永遠就沒有了,現在我有20個線程,但是我許可數量是5個,信號量構造的時候你可以指定許可,因此前5個線程去拿的時候呢,必然可以一下子拿到,進來就打印這個done,但是你后面的線程,進來的時候你未必可以馬上拿到,因為我這個線程要休眠兩秒鐘,這是一個比較長的時間,線程的提交是很快的,我一下子把20個線程提交上來了,但是我前5個在里面待2秒,在2秒鐘之內我只能做5個,另外的15個被等待,等待完了之后呢,后面5個進來之后呢,前5個一下子做完了,過兩秒鐘在一起刷一下,這個程序沒有結束,是因為這個線程池沒有結束掉,所有的線程不是Daemon線程,所以不會結束,事實上工作已經完成了,一個線程可以拿若干個許可,每個線程我給他拿兩個許可,這樣我們可以更加靈活的去共享我們的資源,應該分配給誰,信號量也是堆資源的一種分配,下面我們來看一下ReadWriteLock,讀寫鎖,為什么會有讀寫鎖這種東西呢,對于重入鎖來講,我們傳統的synchronized來說,他的鎖是部分線程功能的,有些時候讀和寫是兩種不同的操作,因此我們可以想象一下,如果在你讀這個數據當中,你不分青紅皂白都去加鎖,那對這個性能是有很大殺傷力的,如果你都去讀的,我們可以以一種比較開放的姿態去看這個問題,都是讀的線程你就不應該加鎖,大家都應該能進去,但是如果你有寫的線程發生,寫的事件發生,那在這種情況下,因為寫有可能修改數據,修改數據會導致你讀到的數據呢,會發生不一致,因此當有寫發生的時候呢,我們才有加鎖這個操作,因此從功能上面講,我們將這個鎖進行功能上的劃分,使得我們性能有很大的提高,并行度能夠提高,畢竟有一點,加鎖之后,并行度是1,也就是一次只有一個線程能夠進去,這完全不符合我們高并發的一個概念,高并發應該是一次有好多線程在跑,我一次只能跑一個線程,那只是傳統意義上的并發,ReadWriteLock才有一點點高并發的意思在里面,我允許你很多線程一起做read,我們講過一個無等待的阻塞,顯然我們的ReentrantLock,我們的Synchronized,都是阻塞的并行,無等待的并發,它會把這個線程掛起,ReadWriteLock如果沒有write線程發生,所有的read線程都是無等待的并發,JDK5提供的一個具有讀寫分離的一個鎖,我們看讀寫鎖的情況
我寫到一半,你會讀到不一致的情況,所以讀寫之間還是要做一些互斥的操作,下面我們來看一下CountDownLatch,從名字上可以看到,他就是一個倒數計時器,10,9,8,7,6,...0,是一個倒數,這個可以用到一個什么地方呢,比如一個非常典型的場景呢,就是在發射火箭的時候,在發射火箭之前,可能會進行一個各項的檢查,是不是符合發射的條件,那么每一項檢查呢,都可以看做有一個單獨的線程去執行的,那么在這個檢查的過程呢,每一個線程都有自己的檢查任務,如果我們一共有10個檢查項,假設我們有10個檢查項的話,每當一個線程完成自己的檢查任務之后呢,他就做一個countdown,自己任務就完成了,到達了他的一個執行目標,當所有的線程都完成任務了呢,我們的計時器就會清零,一共有10個線程去做這個事情,他每一個做完之后呢,他就會countdown,減到0之后呢,最終等待在countdown上的主線程,比如以火箭發射為例呢,那么主線程就是火箭發射本身,火箭發射發現所有的任務都完成了,這個時候await就會返回,不會再去等待,就返回了,就可以執行后面的一些事情,還有一個示意圖就是這樣子,主線程會在這個臨界線上做一個等待,等待發射
其他檢查任務可能就分別執行,過程可能也需要花費一些時間,并不會馬上就做完,因此主線程就在這里做等待,所有的檢查任務全部都到臨界點,全部都執行完畢之后,主線程才會到這個點上全部執行,所以countdownLatch可以簡單的看成是一個柵欄,整個線程按照時間的執行上面呢,畫了一條線,使得所有的線程都要到了那個點為止,我們的主線程,他才能夠繼續往下走
package com.learn.thread;import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CountDownLatchDemo implements Runnable {static final CountDownLatch end = new CountDownLatch(10);static final CountDownLatchDemo demo = new CountDownLatchDemo();/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {Thread.sleep(new Random().nextInt(10) * 1000);System.out.println("check complete");end.countDown();//完成 可以減1} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {ExecutorService exec = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {exec.submit(demo);}end.await();//主線程等待所有10個線程完成任務 才能繼續執行System.out.println("Fire!");exec.shutdown();}
}
我們有10個線程,有10個檢查任務,是需要在主線程開始之前,如果沒有跑完主線程會一直等待,在模擬一個任務或者檢查,檢查完畢之后會打印一個檢查完畢,然后countdown,表示自己完成了,做幾個countdown之后,程序啟動之后我們開啟10個線程,每個線程都會去做這個run,等到10個都跑完之后,這個countdown減掉之后呢,我們這個await才會返回,返回之后就發射火箭,因為每個線程都睡了隨機的時間,在這個線程完成之前,是不會做這個發射操作的,所有10個線程都完成了,那就做這個發射操作,所以這個應用場景,其實在我們的實際業務當中是非常普遍的,要等待他的準備業務完成之后才能操作,那么你這個準備業務怎么去通知主線程,我們就可以用countdown去通知,我都可以包裝在countdownlatch上面,下面是CyclicBarrier,循環柵欄,和CountDownLatch是非常相像的,Cyclic是循環,CountDownLatch只是一次計數,Cyclic他可以反復的使用,他可以一批一批的去執行,比如我要做10個線程,我第二批10個線程到了之后呢,我主線程再工作一次,第三個10個線程到了之后呢,我主線程還可以再工作一次,他就是循環的一個姿態,主要接口也非常的相像,等待所有的參與者都到達了之后,我才能夠繼續往下執行
當有一個士兵到了你不能叫做集合完成,當有一個士兵到達了之后,要等待其他的士兵全部到達,才叫集合完畢,所以這個士兵會等待所有的士兵到達,到達完畢之后呢,下達任務,所有士兵都會分別去執行自己的任務,當有一個士兵執行完成呢,我們并不能表明我們總體任務完成,我要等待所有的任務都執行完成了,我們才能說這個任務完成了,其實他們是可以復用同一個Cyclic的,這里10個等待完畢,這里10個任務完成,都用同一個實例https://www.cnblogs.com/ten951/p/6212160.html
package com.learn.thread;import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {public static class Soldier implements Runnable {private String soldier;private final CyclicBarrier cyclic;public Soldier(CyclicBarrier cyclic, String soldier) {this.soldier = soldier;this.cyclic = cyclic;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {try {//等待所有士兵到齊cyclic.await();doWork();//等待所有士兵完成工作cyclic.await();} catch (InterruptedException e) {//在等待過程中,線程被中斷e.printStackTrace();} catch (BrokenBarrierException e) {//表示當前CyclicBarrier已經損壞.系統無法等到所有線程到齊了.e.printStackTrace();}}void doWork() {try {Thread.sleep(Math.abs(new Random().nextInt() % 10000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(soldier + ":任務完成");}}public static class BarrierRun implements Runnable {boolean flag;int N;public BarrierRun(boolean flag, int N) {this.flag = flag;this.N = N;}/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see Thread#run()*/@Overridepublic void run() {if (flag) {System.out.println("司令:[士兵" + N + "個,任務完成!]");} else {System.out.println("司令:[士兵" + N + "個,集合完畢!]");flag = true;}}}public static void main(String[] args) {final int N = 10;Thread[] allSoldier = new Thread[N];boolean flag = false;CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));//設置屏障點,主要為了執行這個方法System.out.println("集合隊伍! ");for (int i = 0; i < N; i++) {System.out.println("士兵" + i + "報道! ");allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));allSoldier[i].start();}}
}
一旦調用lockSupport就會被掛起了,unpark可以繼續往下執行,他和suspend一樣,都是掛起,但是有個本質的不同,Support是可以毫無顧忌的去使用,但是suspend不是,是不建議使用的,LockSupport他有一個什么好處呢,他的思想有點類似于信號量的思想,我內部會產生一個許可的東西,那我park的時候呢,unpark就是去申請這個許可,因此他有一個特點是說,如果不幸的發生unpark,發現發生在park之前,那我這個park并不會把這個線程給阻塞,這個和suspend是不一樣的,如果resume發生在suspend之前,現在還是會被掛起,但是如果unpark發生在park之前,那我park是掛不住的
package com.learn.thread;import java.util.concurrent.locks.LockSupport;public class LockSupportDemo {public static Object u = new Object();static ChangeObjectThread t1 = new ChangeObjectThread("t1");static ChangeObjectThread t2 = new ChangeObjectThread("t2");public static class ChangeObjectThread extends Thread {public ChangeObjectThread(String name) {super.setName(name);}public void run() {synchronized (u) {System.out.println("in" + getName());LockSupport.park();}}}public static void main(String[] args) throws InterruptedException {t1.start();Thread.sleep(100);t2.start();LockSupport.unpark(t1);LockSupport.unpark(t2);t1.join();t2.join();}
}
就算我unpark發生在park之前,相當于許可被拿掉了,不會吧線程給阻塞住,unpark是使得一個許可可用,park會使得線程掛起,什么時候可以讓這個線程繼續往下走呢,第一種是調用unpark,第二種是中斷,大部分wait函數,他在中斷之后呢,都會拋出一個中斷異常,但是你要注意的是,park不會,他不會拋出中斷異常,這個操作是沒有異常拋出的,但是他會響應中斷,如果中斷發生,這個park會立即返回,另外線程中斷了當前線程,如果說這個地方被中斷了,但是從Park本身是不可以判斷是否被中斷的,park有這么一個特點,能夠響應中斷,但是不拋出異常,JDK內部是使用的非常廣泛的,它是一個比較底層的原理操作,把這個線程給掛起,內部也是使用了unsafe這個類,/*** Disables the current thread for thread scheduling purposes unless the* permit is available.** <p>If the permit is available then it is consumed and the call* returns immediately; otherwise the current thread becomes disabled* for thread scheduling purposes and lies dormant until one of three* things happens:** <ul>** <li>Some other thread invokes {@link #unpark unpark} with the* current thread as the target; or** <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or** <li>The call spuriously (that is, for no reason) returns.* </ul>** <p>This method does <em>not</em> report which of these caused the* method to return. Callers should re-check the conditions which caused* the thread to park in the first place. Callers may also determine,* for example, the interrupt status of the thread upon return.*/
public static void park() {UNSAFE.park(false, 0L);
}有些地方會直接調用unsafe的park,看一下重入鎖ReentrantLock基本的實現思路
重入鎖它是一個應用級的東西,不是一個系統級的東西,LockSupport它是一個系統級的東西,它是調用了一些native的API,重入鎖本身它是一個應用級實現,有一些他調用了LockSupport,直接的實現是JAVA的實現,他的實現有三個內容是比較重要的,第一個是CAS狀態,無鎖的操作我們在前面也有介紹,CAS狀態做什么事情呢,判斷這個鎖到底有沒有被人占用,比如0沒有被占用,1表示被占用了,鎖的本質內部是一個CAS操作,用它來修改某一個變量,這個變量能不能修改成功,第二個是等待隊列,如果我沒有拿到這個鎖,那我就進入一個等待的隊列,多個線程要排隊,內部必須要維護一個等待隊列,把所有等待在這個鎖上的線程呢,都給保存起來,等待在隊列上的線程呢,類似LockSupport當中的park操作,只要我進入了等待隊列的線程呢,我都要park把它掛起,我什么時候把它unpark呢,讓他繼續執行呢,當我前面的線程把鎖unlock的時候,我就從等待隊列當中挑一個出來,來做unpark操作,這個就是ReentrantLock實現的一個根本,/*** Acquires the lock.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds the lock then the hold* count is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until the lock has been acquired,* at which time the lock hold count is set to one.*/
public void lock() {sync.lock();
}這里有兩個實現,一個公平的,一個是非公平的,我們來看一下非公平的實現/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/
abstract void lock();/*** Sync object for non-fair locks*/
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}首先是compareAndSetState比較設置CAS操作,acquire這個是鎖申請,/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}加到等待隊列當中去/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}等待隊列的節點是Node,Node是對線程的包裝,節點可以把等待的線程信息都拿出來
?
總結