深入理解 Java 锁与线程阻塞
相信大家對線程鎖和線程阻塞都很了解,無非就是 synchronized, wait/notify 等, 但是你有仔細想過 Java 虛擬機是如何實現鎖和阻塞的呢?它們之間又有哪些聯系呢?如果感興趣的話請接著往下看。
為保障多線程下處理共享數據的安全性,Java 語言給我們提供了線程鎖,保證同一時刻只有一個線程能處理共享數據。當一個鎖被某個線程持有的時候,另一個線程嘗試去獲取這個鎖將產生線程阻塞,直到持有鎖的線程釋放了該鎖。
除了搶占鎖的時候會出現線程阻塞,另外還有一些方法也會產生線程阻塞,比如: Object.wait(), Thread.sleep(), ArrayBlockingQueue.put() 等等,他們都有一個共同特點:不消耗 CPU 時間片。另外值得指出的是 Object.wait() 會釋放持有的鎖,而 Thread.sleep() 不會,相信這點大家都清楚。 當然 while(true){ } 也能產生阻塞線程的效果,自旋鎖就是使用循環,配合 CAS (compareAndSet) 實現的,這個不在我們的討論之列。
相信大家對線程鎖都很熟悉,目前有兩種方法,準確來說是三種,synchronized 方法,synchronized 區塊,ReentrantLock。先說 synchronized,代碼如下:
public class Lock {public static void synchronized print() {System.out.println("method synchronized");}public static void print2() {synchronized(Lock.class) {System.out.println("synchronized");}}public static void main(String[] args) {Lock.print();Lock.print2();} }編譯后通過如下命令查看其字節碼
javap -c -v Lock其中節選方法一(Lock.print)的字節碼如下:
public static synchronized void print();descriptor: ()Vflags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZEDCode:stack=2, locals=0, args_size=00: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;3: ldc #3 // String method synchronized5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V8: return }可以看到方法表的訪問標志位 (flags) 中多了個 ACC_SYNCHRONIZED,然后看字節碼指令區域 (Code) ,和普通方法沒任何差別, 猜測 Java 虛擬機通過檢查方法表中是否存在標志位 ACC_SYNCHRONIZED 來決定是否需要獲取鎖,至于獲取鎖的原理后文會提到。
然后看第二個使用 synchronized 區塊的方法(Lock.print2)字節碼:
public static void print2();descriptor: ()Vflags: ACC_PUBLIC, ACC_STATICCode:stack=2, locals=2, args_size=00: ldc #5 // 將鎖對象 Lock.class 入棧2: dup // 復制一份,此時棧中有兩個 Lock.class 3: astore_0 // 出棧一個 Lock.class 對象保存到局部變量表 Slot 1 中4: monitorenter // 以棧頂元素 Lock.class 作為鎖,開始同步5: getstatic #2 // 5-10 調用 System.out.println("synchronized");8: ldc #610: invokevirtual #413: aload_0 // 將局部變量表 Slot 1 中的數據入棧,即 Lock.class14: monitorexit // 使用棧頂數據退出同步15: goto 23 // 方法結束,跳轉到 23 返回18: astore_1 // 從這里開始是異常路徑,將異常信息保存至局部變量表 Slot 2 中,查看異常表19: aload_0 // 將局部變量表 Slot 1 中的 Lock.class 入棧20: monitorexit // 使用棧頂數據退出同步21: aload_1 // 將局部變量表 Slot 2 中的異常信息入棧22: athrow // 把異常對象重新拋出給方法的調用者23: return // 方法正常返回Exception table: // 異常表from to target type5 15 18 any // 5-15 出現任何(any)異常跳轉到 18 18 21 18 any // 18-21 出現任何(any)異常跳轉到 18synchronized 區塊的字節碼相比較 synchronized 方法復雜了許多。每一行字節碼的含義我都作了詳細注釋,可以看到此時是通過字節碼指令 monitorenter,monitorexit 來進入和退出同步的。特別值得注意的是,我們并沒有寫 try.catch 捕獲異常,但是字節碼指令中存在異常處理的代碼,其實為了保證在方法異常完成時 monitorenter 和 monitorexit 指令依然可以正確配對執行,編譯器會自動產生一個異常處理器,這個異常處理器聲明可處理所有的異常,它的目的就是用來執行 monitorexit 指令。這個機制確保在 synchronized 區塊中產生任何異常都可以正常退出同步,釋放鎖資源。
不管是檢查標志位中的 ACC_SYNCHRONIZED,還是字節碼指令 monitorenter,monitorexit,鎖機制的實現最終肯定存在于 JVM 中,后面我們會再提到這點。
接下來繼續看 ReentrantLock 的實現,鑒于篇幅有限,ReentrantLock 的原理不會講的很詳細,感興趣的可以自行研究。ReentrantLock 是基于并發基礎組件 AbstractQueuedSynchronizer 實現的,內部有一個 int 類型的 state 變量來控制同步狀態,為 0 時表示無線程占用鎖資源,等于 1 時表示則說明有線程占用,由于 ReentrantLock 是可重入鎖,state 也可能大于 1 表示該線程有多次獲取鎖。AQS 內部還有一個由內部類 Node 構成的隊列用來完成線程獲取鎖的排隊。本文只是簡單的介紹一下 lock 和 unLock 方法。
下面先看 ReentrantLock.lock 方法:
// ReentrantLock.java public void lock() {this.sync.lock(); } // ReentrantLock.NonfairSync.class final void lock() {// 使用 cas 設置 state,如果設置成功表示當前無其他線程競爭鎖,優先獲取鎖資源if (this.compareAndSetState(0, 1)) {// 保存當前線程由于后續重入鎖的判斷this.setExclusiveOwnerThread(Thread.currentThread());} else {this.acquire(1);} } // AbstractQueuedSynchronizer.javapublic final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); // 如果阻塞被中斷,重新設置中斷通知調用者 } // 判斷是否是重入 protected final boolean tryAcquire(int var1) {return this.nonfairTryAcquire(var1); } // 處理等待隊列 final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} } private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 阻塞線程return Thread.interrupted(); }對于鎖競爭的情況,最終會調用 LockSupport.park(this) 阻塞當前線程,同樣的 ReentrantLock.unlock 方法會調用 LockSupport.unpark(thread) 來恢復阻塞的線程。繼續看 LockSupport 的實現:
public static void unpark(Thread thread) {if (var0 != null) {UNSAFE.unpark(thread);} } public static void park(Object obj) {Thread thread = Thread.currentThread();setBlocker(thread, obj);UNSAFE.park(false, 0L);setBlocker(thread, (Object)null); }LockSupport 內部調用了 UnSafe 類的 park 和 unpark, 是 native 代碼,該類由虛擬機實現,以 Hotspot 虛擬機為例,查看 park 方法:
// unsafe.cpp UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))UnsafeWrapper("Unsafe_Park"); #ifndef USDT2HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */JavaThreadParkedState jtps(thread, time != 0);thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker()); #endif /* USDT2 */ UNSAFE_END調用了: thread->parker()->park(isAbsolute != 0, time); 我們可以猜測是這句代碼阻塞了當前線程。HotSpot 虛擬機里的 Thread 類對應著一個 OS 的 Thread, JavaThread 類繼承于 Thread, JavaThread 實例對應著一個 Java 層的 Thread.
簡而言之,Java 層的 Thread 對應著一個 OS 的 Thread。使用如下代碼創建線程:
回到 Thread 類中的 Park,我們查看 HotSpot 的 thread.hpp, 找到了如下三個 Park:
public:ParkEvent * _ParkEvent ; // for synchronized()ParkEvent * _SleepEvent ; // for Thread.sleep // JSR166 per-thread parker private:Parker* _parker;從注釋上可以看出分別是用于 synchronized 的阻塞,Thread.sleep 的阻塞還有用于 UnSafe 的線程阻塞,繼續查看 park.hpp 節選:
// A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark. class Parker : public os::PlatformParker { // 略 } class ParkEvent : public os::PlatformEvent { // 略 }注釋上更近一步解釋了兩種 Parker 的區別,他們的實現非常相似,那為什么會存在兩個呢?網絡上有解釋說是只是沒重構而已。下面只看 Parker 的實現,發現 park.cpp 中并沒有實現 park 方法,猜測應該是父類中實現了,因為這是和系統相關的操作,以 Linux 系統為例,查看 linux_os.cpp 找到了 park 的實現,截取了主要部分:
void Parker::park(bool isAbsolute, jlong time) {// 省略了前置判斷// 獲取鎖if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {return;}if (time == 0) {_cur_index = REL_INDEX; // arbitrary choice when not timed// 調用 pthread_cond_wait 阻塞線程status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;} else {_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;if (status != 0 && WorkAroundNPTLTimedWaitHang) {pthread_cond_destroy (&_cond[_cur_index]) ;pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());}}_cur_index = -1;// 已從 block 中恢復,釋放鎖_counter = 0 ;status = pthread_mutex_unlock(_mutex) ;// 略 }總共分三步走,先獲取鎖,再調用 pthread_cond_wait 阻塞線程,最后阻塞恢復了之后釋放鎖,是不是和我們使用 Object.wait 十分類似,事實上 Object.wait 底層也是這種方式實現的。為了更清楚的了解底層的實現,寫了一段 c 代碼看一下線程的創建和鎖的使用:
int counter = 0; // 互斥鎖對象 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void* add() {for(int i = 0;i < 2;++i) {// 獲取鎖pthread_mutex_lock( &mutex );++counter;sleep(1);// 釋放鎖pthread_mutex_unlock( &mutex );printf("counter = %d\n", counter);}pthread_exit(NULL); } int main() {pthread_t thread_1, thread_2;// 創建線程pthread_create(&thread_1, NULL, add, NULL);pthread_create(&thread_2, NULL, add, NULL);pthread_join(thread_1, NULL);pthread_join(thread_2, NULL);return 0; }使用 pthread_create 創建線程,使用 pthread_mutex_lock 獲取鎖,使用 pthread_mutex_unlock 釋放鎖。那既然 pthread_mutex_lock 和 pthread_mutex_unlock 就能實現鎖了,那為什么鎖實現的時候還要使用 pthread_cond_wait 來阻塞線程呢?回過頭看 PlatformParker :
//os_linux.hpp class PlatformParker {pthread_mutex_t _mutex[1];//一個是給park用, 另一個是給parkUntil用pthread_cond_t _cond[2]; // one for relative times and one for abs.//略... };每個 JavaThread 實例都有自己的 mutex,在上述自己寫的例子中是多個線程競爭同一個 mutex,阻塞線程隊列管理的邏輯直接由 mutex 實現,而此處的 mutex 線程私有,不存在直接競爭關系,事實上,JVM 為了提升平臺通用性(?),只提供了線程阻塞和恢復操作,阻塞線程隊列的管理工作交給了 Java 層,也就是前面提到的 AQS。對于 Java 層來說 JVM 只需要提供 「阻塞」 和 「喚醒」 的操作即可。
在 Java 中講解 Object.wait, Object.notify 的時候通常會用生產者-消費者作為例子,這里我也簡單的寫了一個 c 的例子,讓大家了解底層線程阻塞的原理:
其中消費者線程是一個循環,在循環中先獲取鎖,然后判斷隊列是否為空,如果為空則調用 pthread_cond_wait 阻塞線程,這個阻塞操作會自動釋放持有的鎖并出讓 cpu 時間片,恢復的時候自動獲取鎖,消費完隊列之后會調用 pthread_cond_signal 通知生產者線程,另外還有一個通知所有線程恢復的 pthread_cond_broadcast,與 notifyAll 類似。
最后再簡單談一下阻塞中斷,Java 層 Thread 中有個 interrupt 方法,它的作用是在線程收到阻塞的時候拋出一個中斷信號,這樣線程就會退出阻塞狀態,但是并不是我們遇到的所有阻塞都會中斷,要看是否會響應中斷信號,Object.wait, Thread.join,Thread.sleep,ReentrantLock.lockInterruptibly 這些會拋出受檢異常 InterruptedException 的都會被中斷。synchronized,ReentrantLock.lock 的鎖競爭阻塞是不會被中斷的,interrupt 并不會強制終止線程,而是會將線程設置成 interrupted 狀態,我們可以通過判斷 isInterrupted 或 interrupted 來獲取中斷狀態,區別在于后者會重置中斷狀態為 false。看一下底層線程中斷的代碼:
// os_linux.cpp void os::interrupt(Thread* thread) {OSThread* osthread = thread->osthread();if (!osthread->interrupted()) {osthread->set_interrupted(true);OrderAccess::fence();ParkEvent * const slp = thread->_SleepEvent ;if (slp != NULL) slp->unpark() ;}// For JSR166. Unpark even if interrupt status already was setif (thread->is_Java_thread())((JavaThread*)thread)->parker()->unpark();ParkEvent * ev = thread->_ParkEvent ;if (ev != NULL) ev->unpark() ; }可以看到,線程中斷也是由 unpark 實現的, 即恢復了阻塞的線程。并且對之前提到的三個 Parker (_ParkEvent,_SleepEvent,_parker) 都進行了 unpark。
說到這里相信大家對 Java 線程鎖與線程阻塞有個大體的了解了吧,由于本人水平實在有限,有些地方講的不好或者有錯誤的地方請多包涵,如果發現任何問題,請提出討論,我會及時修改。
>> 轉載請注明來源:深入理解 Java 鎖與線程阻塞
?
總結
以上是生活随笔為你收集整理的深入理解 Java 锁与线程阻塞的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信终端跨平台组件 Mars 系列(三)
- 下一篇: 亿级 ELK 日志平台构建实践