死磕Synchronized底层实现--偏向锁
注:本篇很長,請找個舒適的姿勢閱讀。
?
本文為synchronized系列第二篇。主要內容為分析偏向鎖的實現。
偏向鎖的誕生背景和基本原理在上文中已經講過了,強烈建議在有看過上篇文章的基礎下閱讀本文。
更多文章見個人博客:https://github.com/farmerjohngit/myblog
本系列文章將對HotSpot的synchronized鎖實現進行全面分析,內容包括偏向鎖、輕量級鎖、重量級鎖的加鎖、解鎖、鎖升級流程的原理及源碼分析,希望給在研究synchronized路上的同學一些幫助。主要包括以下幾篇文章:
死磕Synchronized底層實現--概論
死磕Synchronized底層實現--偏向鎖
死磕Synchronized底層實現--輕量級鎖(待更新)
死磕Synchronized底層實現--重量級鎖(待更新)
本文將分為幾塊內容:
1.偏向鎖的入口
2.偏向鎖的獲取流程
3.偏向鎖的撤銷流程
4.偏向鎖的釋放流程
5.偏向鎖的批量重偏向和批量撤銷
本文分析的JVM版本是JVM8,具體版本號以及代碼可以在這里看到。
偏向鎖入口
目前網上的很多文章,關于偏向鎖源碼入口都找錯地方了,導致我之前對于偏向鎖的很多邏輯一直想不通,走了很多彎路。
synchronized分為synchronized代碼塊和synchronized方法,其底層獲取鎖的邏輯都是一樣的,本文講解的是synchronized代碼塊的實現。上篇文章也說過,synchronized代碼塊是由monitorenter和monitorexit兩個指令實現的。
關于HotSpot虛擬機中獲取鎖的入口,網上很多文章要么給出的方法入口為interpreterRuntime.cpp#monitorenter,要么給出的入口為bytecodeInterpreter.cpp#1816。包括占小狼的這篇文章關于鎖入口的位置說法也是有問題的(當然文章還是很好的,在我剛開始研究synchronized的時候,小狼哥的這篇文章給了我很多幫助)。
要找鎖的入口,肯定是要在源碼中找到對monitorenter指令解析的地方。在HotSpot的中有兩處地方對monitorenter指令進行解析:一個是在bytecodeInterpreter.cpp#1816?,另一個是在templateTable_x86_64.cpp#3667。
前者是JVM中的字節碼解釋器(bytecodeInterpreter),用C++實現了每條JVM指令(如monitorenter、invokevirtual等),其優點是實現相對簡單且容易理解,缺點是執行慢。后者是模板解釋器(templateInterpreter),其對每個指令都寫了一段對應的匯編代碼,啟動時將每個指令與對應匯編代碼入口綁定,可以說是效率做到了極致。模板解釋器的實現可以看這篇文章,在研究的過程中也請教過文章作者‘汪先生’一些問題,這里感謝一下。
在HotSpot中,只用到了模板解釋器,字節碼解釋器根本就沒用到,R大的讀書筆記中說的很清楚了,大家可以看看,這里不再贅述。
所以montorenter的解析入口在模板解釋器中,其代碼位于templateTable_x86_64.cpp#3667。通過調用路徑:templateTable_x86_64#monitorenter->interp_masm_x86_64#lock_object進入到偏向鎖入口macroAssembler_x86#biased_locking_enter,在這里大家可以看到會生成對應的匯編代碼。需要注意的是,不是說每次解析monitorenter指令都會調用biased_locking_enter,而是只會在JVM啟動的時候調用該方法生成匯編代碼,之后對指令的解析是通過直接執行匯編代碼。
其實bytecodeInterpreter的邏輯和templateInterpreter的邏輯是大同小異的,因為templateInterpreter中都是匯編代碼,比較晦澀,所以看bytecodeInterpreter的實現會便于理解一點。但這里有個坑,在jdk8u之前,bytecodeInterpreter并沒有實現偏向鎖的邏輯。我之前看的JDK8-87ee5ee27509這個版本就沒有實現偏向鎖的邏輯,導致我看了很久都沒看懂。在這個commit中對bytecodeInterpreter加入了偏向鎖的支持,我大致了看了下和templateInterpreter對比除了棧結構不同外,其他邏輯大致相同,所以下文就按bytecodeInterpreter中的代碼對偏向鎖邏輯進行講解。templateInterpreter的匯編代碼講解可以看這篇文章,其實匯編源碼中都有英文注釋,了解了匯編幾個基本指令的作用再結合注釋理解起來也不是很難。
偏向鎖獲取流程
下面開始偏向鎖獲取流程分析,代碼在bytecodeInterpreter.cpp#1816。注意本文代碼都有所刪減。
CASE(_monitorenter): { // lockee 就是鎖對象oop lockee = STACK_OBJECT(-1); // derefing's lockee ought to provoke implicit null checkCHECK_NULL(lockee); // code 1:找到一個空閑的Lock RecordBasicObjectLock* limit = istate->monitor_base();BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();BasicObjectLock* entry = NULL; while (most_recent != limit ) { if (most_recent->obj() == NULL) entry = most_recent; else if (most_recent->obj() == lockee) break;most_recent++;} //entry不為null,代表還有空閑的Lock Recordif (entry != NULL) { // code 2:將Lock Record的obj指針指向鎖對象entry->set_obj(lockee); int success = false; uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place; // markoop即對象頭的mark wordmarkOop mark = lockee->mark(); intptr_t hash = (intptr_t) markOopDesc::no_hash; // code 3:如果鎖對象的mark word的狀態是偏向模式if (mark->has_bias_pattern()) { uintptr_t thread_ident; uintptr_t anticipated_bias_locking_value;thread_ident = (uintptr_t)istate->thread(); // code 4:這里有幾步操作,下文分析anticipated_bias_locking_value =(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &~((uintptr_t) markOopDesc::age_mask_in_place); // code 5:如果偏向的線程是自己且epoch等于class的epochif (anticipated_bias_locking_value == 0) { // already biased towards this thread, nothing to doif (PrintBiasedLockingStatistics) {(* BiasedLocking::biased_lock_entry_count_addr())++;}success = true;} // code 6:如果偏向模式關閉,則嘗試撤銷偏向鎖else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {markOop header = lockee->klass()->prototype_header(); if (hash != markOopDesc::no_hash) {header = header->copy_set_hash(hash);} // 利用CAS操作將mark word替換為class中的mark wordif (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) { if (PrintBiasedLockingStatistics)(*BiasedLocking::revoked_lock_entry_count_addr())++;}} // code 7:如果epoch不等于class中的epoch,則嘗試重偏向else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) { // 構造一個偏向當前線程的mark wordmarkOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident); if (hash != markOopDesc::no_hash) {new_header = new_header->copy_set_hash(hash);} // CAS替換對象頭的mark word ?if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) { if (PrintBiasedLockingStatistics)(* BiasedLocking::rebiased_lock_entry_count_addr())++;} else { // 重偏向失敗,代表存在多線程競爭,則調用monitorenter方法進行鎖升級CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);}success = true;} else { // 走到這里說明當前要么偏向別的線程,要么是匿名偏向(即沒有偏向任何線程)// code 8:下面構建一個匿名偏向的mark word,嘗試用CAS指令替換掉鎖對象的mark wordmarkOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place)); if (hash != markOopDesc::no_hash) {header = header->copy_set_hash(hash);}markOop new_header = (markOop) ((uintptr_t) header | thread_ident); // debugging hintDEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);) if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) { // CAS修改成功if (PrintBiasedLockingStatistics)(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;} else { // 如果修改失敗說明存在多線程競爭,所以進入monitorenter方法CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);}success = true;}} // 如果偏向線程不是當前線程或沒有開啟偏向模式等原因都會導致success==falseif (!success) { // 輕量級鎖的邏輯//code 9: 構造一個無鎖狀態的Displaced Mark Word,并將Lock Record的lock指向它markOop displaced = lockee->mark()->set_unlocked();entry->lock()->set_displaced_header(displaced); //如果指定了-XX:+UseHeavyMonitors,則call_vm=true,代表禁用偏向鎖和輕量級鎖bool call_vm = UseHeavyMonitors; // 利用CAS將對象頭的mark word替換為指向Lock Record的指針if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) { // 判斷是不是鎖重入if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) { //code 10: 如果是鎖重入,則直接將Displaced Mark Word設置為nullentry->lock()->set_displaced_header(NULL);} else { CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);}}} UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);} else { // lock record不夠,重新執行istate->set_msg(more_monitors); UPDATE_PC_AND_RETURN(0); // Re-execute} }再回顧下對象頭中mark word的格式:
JVM中的每個類也有一個類似mark word的prototype_header,用來標記該class的epoch和偏向開關等信息。上面的代碼中lockee->klass()->prototype_header()即獲取class的prototype_header。
code 1,從當前線程的棧中找到一個空閑的Lock Record(即代碼中的BasicObjectLock,下文都用Lock Record代指),判斷Lock Record是否空閑的依據是其obj字段 是否為null。注意這里是按內存地址從低往高找到最后一個可用的Lock Record,換而言之,就是找到內存地址最高的可用Lock Record。
code 2,獲取到Lock Record后,首先要做的就是為其obj字段賦值。
code 3,判斷鎖對象的mark word是否是偏向模式,即低3位是否為101。
code 4,這里有幾步位運算的操作?anticipated_bias_locking_value = (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ?~((uintptr_t) markOopDesc::age_mask_in_place);?這個位運算可以分為3個部分。
第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident)?將當前線程id和類的prototype_header相或,這樣得到的值為(當前線程id + prototype_header中的(epoch + 分代年齡 + 偏向鎖標志 + 鎖標志位)),注意prototype_header的分代年齡那4個字節為0
第二部分?^ (uintptr_t)mark?將上面計算得到的結果與鎖對象的markOop進行異或,相等的位全部被置為0,只剩下不相等的位。
第三部分?& ~((uintptr_t) markOopDesc::age_mask_in_place)?markOopDesc::age_mask_in_place為...0001111000,取反后,變成了...1110000111,除了分代年齡那4位,其他位全為1;將取反后的結果再與上面的結果相與,將上面異或得到的結果中分代年齡給忽略掉。
code 5,anticipated_bias_locking_value==0代表偏向的線程是當前線程且mark word的epoch等于class的epoch,這種情況下什么都不用做。
code 6,(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0代表class的prototype_header或對象的mark word中偏向模式是關閉的,又因為能走到這已經通過了mark->has_bias_pattern()判斷,即對象的mark word中偏向模式是開啟的,那也就是說class的prototype_header不是偏向模式。
然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤銷偏向鎖,我們知道CAS會有幾個參數,1是預期的原值,2是預期修改后的值 ,3是要修改的對象,與之對應,cmpxchg_ptr方法第一個參數是預期修改后的值,第2個參數是修改的對象,第3個參數是預期原值,方法返回實際原值,如果等于預期原值則說明修改成功。
code 7,如果epoch已過期,則需要重偏向,利用CAS指令將鎖對象的mark word替換為一個偏向當前線程且epoch為類的epoch的新的mark word。
code 8,CAS將偏向線程改為當前線程,如果當前是匿名偏向則能修改成功,否則進入鎖升級的邏輯。
code 9,這一步已經是輕量級鎖的邏輯了。從上圖的mark word的格式可以看到,輕量級鎖中mark word存的是指向Lock Record的指針。這里構造一個無鎖狀態的mark word,然后存儲到Lock Record(Lock Record的格式可以看第一篇文章)。設置mark word是無鎖狀態的原因是:輕量級鎖解鎖時是將對象頭的mark word設置為Lock Record中的Displaced Mark Word,所以創建時設置為無鎖狀態,解鎖時直接用CAS替換就好了。
code 10, 如果是鎖重入,則將Lock Record的Displaced Mark Word設置為null,起到一個鎖重入計數的作用。
以上是偏向鎖加鎖的流程(包括部分輕量級鎖的加鎖流程),如果當前鎖已偏向其他線程||epoch值過期||偏向模式關閉||獲取偏向鎖的過程中存在并發沖突,都會進入到InterpreterRuntime::monitorenter方法, 在該方法中會對偏向鎖撤銷和升級。
偏向鎖的撤銷
這里說的撤銷是指在獲取偏向鎖的過程因為不滿足條件導致要將鎖對象改為非偏向鎖狀態;釋放是指退出同步塊時的過程,釋放鎖的邏輯會在下一小節闡述。請讀者注意本文中撤銷與釋放的區別。
如果獲取偏向鎖失敗會進入到InterpreterRuntime::monitorenter方法
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))...Handle h_obj(thread, elem->obj()); assert(Universe::heap()->is_in_reserved_or_null(h_obj()),"must be NULL or an object"); if (UseBiasedLocking) { // Retry fast entry if bias is revoked to avoid unnecessary inflationObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);} else { ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);}... IRT_END可以看到如果開啟了JVM偏向鎖,那會進入到ObjectSynchronizer::fast_enter方法中。
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) { if (UseBiasedLocking) { if (!SafepointSynchronize::is_at_safepoint()) {BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { return;}} else { assert(!attempt_rebias, "can not rebias toward VM thread"); BiasedLocking::revoke_at_safepoint(obj);} assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");} slow_enter (obj, lock, THREAD) ; }如果是正常的Java線程,會走上面的邏輯進入到BiasedLocking::revoke_and_rebias方法,如果是VM線程則會走到下面的BiasedLocking::revoke_at_safepoint。我們主要看BiasedLocking::revoke_and_rebias方法。這個方法的主要作用像它的方法名:撤銷或者重偏向,第一個參數封裝了鎖對象和當前線程,第二個參數代表是否允許重偏向,這里是true。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) { assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");markOop mark = obj->mark(); if (mark->is_biased_anonymously() && !attempt_rebias) { //如果是匿名偏向且attempt_rebias==false會走到這里,如鎖對象的hashcode方法被調用會出現這種情況,需要撤銷偏向鎖。markOop biased_value = mark;markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark); if (res_mark == biased_value) { return BIAS_REVOKED;}} else if (mark->has_bias_pattern()) { // 鎖對象開啟了偏向模式會走到這里Klass* k = obj->klass();markOop prototype_header = k->prototype_header(); //code 1: 如果對應class關閉了偏向模式if (!prototype_header->has_bias_pattern()) {markOop biased_value = mark;markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark); assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked"); return BIAS_REVOKED; //code2: 如果epoch過期} else if (prototype_header->bias_epoch() != mark->bias_epoch()) { if (attempt_rebias) { assert(THREAD->is_Java_thread(), "");markOop biased_value = mark;markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark); if (res_mark == biased_value) { return BIAS_REVOKED_AND_REBIASED;}} else {markOop biased_value = mark;markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark); if (res_mark == biased_value) { return BIAS_REVOKED;}}}} //code 3:批量重偏向與批量撤銷的邏輯HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias); if (heuristics == HR_NOT_BIASED) { return NOT_BIASED;} else if (heuristics == HR_SINGLE_REVOKE) { //code 4:撤銷單個線程Klass *k = obj->klass();markOop prototype_header = k->prototype_header(); if (mark->biased_locker() == THREAD &&prototype_header->bias_epoch() == mark->bias_epoch()) { // 走到這里說明需要撤銷的是偏向當前線程的鎖,當調用Object#hashcode方法時會走到這一步// 因為只要遍歷當前線程的棧就好了,所以不需要等到safepoint再撤銷。ResourceMark rm; if (TraceBiasedLocking) {tty->print_cr("Revoking bias by walking my own stack:");}BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);((JavaThread*) THREAD)->set_cached_monitor_info(NULL); assert(cond == BIAS_REVOKED, "why not?"); return cond;} else { // 下面代碼最終會在VM線程中的safepoint調用revoke_bias方法VM_RevokeBias revoke(&obj, (JavaThread*) THREAD); VMThread::execute(&revoke); return revoke.status_code();}} assert((heuristics == HR_BULK_REVOKE) ||(heuristics == HR_BULK_REBIAS), "?"); //code5:批量撤銷、批量重偏向的邏輯VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,(heuristics == HR_BULK_REBIAS),attempt_rebias); VMThread::execute(&bulk_revoke); return bulk_revoke.status_code(); }會走到該方法的邏輯有很多,我們只分析最常見的情況:假設鎖已經偏向線程A,這時B線程嘗試獲得鎖。
上面的code 1,code 2B線程都不會走到,最終會走到code 4處,如果要撤銷的鎖偏向的是當前線程則直接調用revoke_bias撤銷偏向鎖,否則會將該操作push到VM Thread中等到safepoint的時候再執行。
關于VM Thread這里介紹下:在JVM中有個專門的VM Thread,該線程會源源不斷的從VMOperationQueue中取出請求,比如GC請求。對于需要safepoint的操作(VM_Operationevaluate_at_safepoint返回true)必須要等到所有的Java線程進入到safepoint才開始執行。 關于safepoint可以參考下這篇文章。
接下來我們著重分析下revoke_bias方法。第一個參數為鎖對象,第2、3個參數為都為false
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {markOop mark = obj->mark(); // 如果沒有開啟偏向模式,則直接返回NOT_BIASEDif (!mark->has_bias_pattern()) {... return BiasedLocking::NOT_BIASED;} uint age = mark->age(); // 構建兩個mark word,一個是匿名偏向模式(101),一個是無鎖模式(001)markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);...JavaThread* biased_thread = mark->biased_locker(); if (biased_thread == NULL) { // 匿名偏向。當調用鎖對象的hashcode()方法可能會導致走到這個邏輯// 如果不允許重偏向,則將對象的mark word設置為無鎖模式if (!allow_rebias) {obj->set_mark(unbiased_prototype);}... return BiasedLocking::BIAS_REVOKED;} // code 1:判斷偏向線程是否還存活bool thread_is_alive = false; // 如果當前線程就是偏向線程 if (requesting_thread == biased_thread) {thread_is_alive = true;} else { // 遍歷當前jvm的所有線程,如果能找到,則說明偏向的線程還存活for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) { if (cur_thread == biased_thread) {thread_is_alive = true; break;}}} // 如果偏向的線程已經不存活了if (!thread_is_alive) { // 允許重偏向則將對象mark word設置為匿名偏向狀態,否則設置為無鎖狀態if (allow_rebias) {obj->set_mark(biased_prototype);} else {obj->set_mark(unbiased_prototype);}... return BiasedLocking::BIAS_REVOKED;} // 線程還存活則遍歷線程棧中所有的Lock RecordGrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);BasicLock* highest_lock = NULL; for (int i = 0; i < cached_monitor_info->length(); i++) {MonitorInfo* mon_info = cached_monitor_info->at(i); // 如果能找到對應的Lock Record說明偏向的線程還在執行同步代碼塊中的代碼if (mon_info->owner() == obj) {... // 需要升級為輕量級鎖,直接修改偏向線程棧中的Lock Record。為了處理鎖重入的case,在這里將Lock Record的Displaced Mark Word設置為null,第一個Lock Record會在下面的代碼中再處理markOop mark = markOopDesc::encode((BasicLock*) NULL);highest_lock = mon_info->lock();highest_lock->set_displaced_header(mark);} else {...}} if (highest_lock != NULL) { // 修改第一個Lock Record為無鎖狀態,然后將obj的mark word設置為執行該Lock Record的指針highest_lock->set_displaced_header(unbiased_prototype);obj->release_set_mark(markOopDesc::encode(highest_lock));...} else { // 走到這里說明偏向線程已經不在同步塊中了... if (allow_rebias) { //設置為匿名偏向狀態obj->set_mark(biased_prototype);} else { // 將mark word設置為無鎖狀態obj->set_mark(unbiased_prototype);}} return BiasedLocking::BIAS_REVOKED; }需要注意下,當調用鎖對象的Object#hash或System.identityHashCode()方法會導致該對象的偏向鎖或輕量級鎖升級。這是因為在Java中一個對象的hashcode是在調用這兩個方法時才生成的,如果是無鎖狀態則存放在mark word中,如果是重量級鎖則存放在對應的monitor中,而偏向鎖是沒有地方能存放該信息的,所以必須升級。具體可以看這篇文章的hashcode()方法對偏向鎖的影響小節(注意:該文中對于偏向鎖的加鎖描述有些錯誤),另外我也向該文章作者請教過一些問題,他很熱心的回答了我,在此感謝一下!
言歸正傳,revoke_bias方法邏輯:
查看偏向的線程是否存活,如果已經不存活了,則直接撤銷偏向鎖。JVM維護了一個集合存放所有存活的線程,通過遍歷該集合判斷某個線程是否存活。
偏向的線程是否還在同步塊中,如果不在了,則撤銷偏向鎖。我們回顧一下偏向鎖的加鎖流程:每次進入同步塊(即執行monitorenter)的時候都會以從高往低的順序在棧中找到第一個可用的Lock Record,將其obj字段指向鎖對象。每次解鎖(即執行monitorexit)的時候都會將最低的一個相關Lock Record移除掉。所以可以通過遍歷線程棧中的Lock Record來判斷線程是否還在同步塊中。
將偏向線程所有相關Lock Record的Displaced Mark Word設置為null,然后將最高位的Lock Record的Displaced Mark Word?設置為無鎖狀態,最高位的Lock Record也就是第一次獲得鎖時的Lock Record(這里的第一次是指重入獲取鎖時的第一次),然后將對象頭指向最高位的Lock Record,這里不需要用CAS指令,因為是在safepoint。 執行完后,就升級成了輕量級鎖。原偏向線程的所有Lock Record都已經變成輕量級鎖的狀態。這里如果看不明白,請再回顧游戲啊上篇文章的輕量級鎖加鎖過程。
偏向鎖的釋放
偏向鎖的釋放入口在bytecodeInterpreter.cpp#1923
CASE(_monitorexit): {oop lockee = STACK_OBJECT(-1); CHECK_NULL(lockee); // derefing's lockee ought to provoke implicit null check// find our monitor slotBasicObjectLock* limit = istate->monitor_base();BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base(); // 從低往高遍歷棧的Lock Recordwhile (most_recent != limit ) { // 如果Lock Record關聯的是該鎖對象if ((most_recent)->obj() == lockee) {BasicLock* lock = most_recent->lock();markOop header = lock->displaced_header(); // 釋放Lock Recordmost_recent->set_obj(NULL); // 如果是偏向模式,僅僅釋放Lock Record就好了。否則要走輕量級鎖or重量級鎖的釋放流程if (!lockee->mark()->has_bias_pattern()) { bool call_vm = UseHeavyMonitors; // header!=NULL說明不是重入,則需要將Displaced Mark Word CAS到對象頭的Mark Wordif (header != NULL || call_vm) { if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) { // CAS失敗或者是重量級鎖則會走到這里,先將obj還原,然后調用monitorexit方法most_recent->set_obj(lockee); CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);}}} //執行下一條命令UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);} //處理下一條Lock Recordmost_recent++;} // Need to throw illegal monitor state exceptionCALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception); ShouldNotReachHere(); }上面的代碼結合注釋理解起來應該不難,偏向鎖的釋放很簡單,只要將對應Lock Record釋放就好了,而輕量級鎖則需要將Displaced Mark Word替換到對象頭的mark word中。如果CAS失敗或者是重量級鎖則進入到InterpreterRuntime::monitorexit方法中。該方法會在輕量級與重量級鎖的文章中講解。
批量重偏向和批量撤銷
批量重偏向和批量撤銷的背景可以看上篇文章,相關實現在BiasedLocking::revoke_and_rebias中:
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {... //code 1:重偏向的邏輯HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias); // 非重偏向的邏輯... assert((heuristics == HR_BULK_REVOKE) ||(heuristics == HR_BULK_REBIAS), "?"); //code 2:批量撤銷、批量重偏向的邏輯VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,(heuristics == HR_BULK_REBIAS),attempt_rebias); VMThread::execute(&bulk_revoke); return bulk_revoke.status_code(); }在每次撤銷偏向鎖的時候都通過update_heuristics方法記錄下來,以類為單位,當某個類的對象撤銷偏向次數達到一定閾值的時候JVM就認為該類不適合偏向模式或者需要重新偏向另一個對象,update_heuristics就會返回HR_BULK_REVOKE或HR_BULK_REBIAS。進行批量撤銷或批量重偏向。
先看update_heuristics方法。
static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {markOop mark = o->mark(); //如果不是偏向模式直接返回if (!mark->has_bias_pattern()) { return HR_NOT_BIASED;} // 鎖對象的類Klass* k = o->klass(); // 當前時間jlong cur_time = os::javaTimeMillis(); // 該類上一次批量撤銷的時間jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time(); // 該類偏向鎖撤銷的次數int revocation_count = k->biased_lock_revocation_count(); // BiasedLockingBulkRebiasThreshold是重偏向閾值(默認20),BiasedLockingBulkRevokeThreshold是批量撤銷閾值(默認40),BiasedLockingDecayTime是開啟一次新的批量重偏向距離上次批量重偏向的后的延遲時間,默認25000。也就是開啟批量重偏向后,經過了一段較長的時間(>=BiasedLockingDecayTime),撤銷計數器才超過閾值,那我們會重置計數器。if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&(revocation_count < BiasedLockingBulkRevokeThreshold) &&(last_bulk_revocation_time != 0) &&(cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) { // This is the first revocation we've seen in a while of an// object of this type since the last time we performed a bulk// rebiasing operation. The application is allocating objects in// bulk which are biased toward a thread and then handing them// off to another thread. We can cope with this allocation// pattern via the bulk rebiasing mechanism so we reset the// klass's revocation count rather than allow it to increase// monotonically. If we see the need to perform another bulk// rebias operation later, we will, and if subsequently we see// many more revocation operations in a short period of time we// will completely disable biasing for this type.k->set_biased_lock_revocation_count(0);revocation_count = 0;} // 自增撤銷計數器if (revocation_count <= BiasedLockingBulkRevokeThreshold) {revocation_count = k->atomic_incr_biased_lock_revocation_count();} // 如果達到批量撤銷閾值則返回HR_BULK_REVOKEif (revocation_count == BiasedLockingBulkRevokeThreshold) { return HR_BULK_REVOKE;} // 如果達到批量重偏向閾值則返回HR_BULK_REBIASif (revocation_count == BiasedLockingBulkRebiasThreshold) { return HR_BULK_REBIAS;} // 沒有達到閾值則撤銷單個對象的鎖return HR_SINGLE_REVOKE; }當達到閾值的時候就會通過VM 線程在safepoint調用bulk_revoke_or_rebias_at_safepoint, 參數bulk_rebias如果是true代表是批量重偏向否則為批量撤銷。attempt_rebias_of_object代表對操作的鎖對象o是否運行重偏向,這里是true。
static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o, bool bulk_rebias, bool attempt_rebias_of_object,JavaThread* requesting_thread) {...jlong cur_time = os::javaTimeMillis();o->klass()->set_last_biased_lock_bulk_revocation_time(cur_time);Klass* k_o = o->klass();Klass* klass = k_o; if (bulk_rebias) { // 批量重偏向的邏輯if (klass->prototype_header()->has_bias_pattern()) { // 自增前類中的的epochint prev_epoch = klass->prototype_header()->bias_epoch(); // code 1:類中的epoch自增klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch()); int cur_epoch = klass->prototype_header()->bias_epoch(); // code 2:遍歷所有線程的棧,更新類型為該klass的所有鎖實例的epochfor (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr); for (int i = 0; i < cached_monitor_info->length(); i++) {MonitorInfo* mon_info = cached_monitor_info->at(i);oop owner = mon_info->owner();markOop mark = owner->mark(); if ((owner->klass() == k_o) && mark->has_bias_pattern()) { // We might have encountered this object already in the case of recursive lockingassert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");owner->set_mark(mark->set_bias_epoch(cur_epoch));}}}} // 接下來對當前鎖對象進行重偏向revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);} else {... // code 3:批量撤銷的邏輯,將類中的偏向標記關閉,markOopDesc::prototype()返回的是一個關閉偏向模式的prototypeklass->set_prototype_header(markOopDesc::prototype()); // code 4:遍歷所有線程的棧,撤銷該類所有鎖的偏向for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr); for (int i = 0; i < cached_monitor_info->length(); i++) {MonitorInfo* mon_info = cached_monitor_info->at(i);oop owner = mon_info->owner();markOop mark = owner->mark(); if ((owner->klass() == k_o) && mark->has_bias_pattern()) { revoke_bias(owner, false, true, requesting_thread);}}} // 撤銷當前鎖對象的偏向模式revoke_bias(o, false, true, requesting_thread);}...BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED; if (attempt_rebias_of_object &&o->mark()->has_bias_pattern() &&klass->prototype_header()->has_bias_pattern()) { // 構造一個偏向請求線程的mark wordmarkOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),klass->prototype_header()->bias_epoch()); // 更新當前鎖對象的mark wordo->set_mark(new_mark);status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;...}... return status_code; }該方法分為兩個邏輯:批量重偏向和批量撤銷。
先看批量重偏向,分為兩步:
code 1?將類中的撤銷計數器自增1,之后當該類已存在的實例獲得鎖時,就會嘗試重偏向,相關邏輯在偏向鎖獲取流程小節中。
code 2?處理當前正在被使用的鎖對象,通過遍歷所有存活線程的棧,找到所有正在使用的偏向鎖對象,然后更新它們的epoch值。也就是說不會重偏向正在使用的鎖,否則會破壞鎖的線程安全性。
批量撤銷邏輯如下:
code 3將類的偏向標記關閉,之后當該類已存在的實例獲得鎖時,就會升級為輕量級鎖;該類新分配的對象的mark word則是無鎖模式。
code 4處理當前正在被使用的鎖對象,通過遍歷所有存活線程的棧,找到所有正在使用的偏向鎖對象,然后撤銷偏向鎖。
總結
以上是生活随笔為你收集整理的死磕Synchronized底层实现--偏向锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux内存映射mmap原理分析
- 下一篇: 面试常考:Synchronized 有几