threadlocal使用_多方位点评ThreadLocal,细看各大开源软件实现
ThreadLocal類可以幫助創建只能由同一個線程訪問和寫入的變量。因此,即使是兩個線程在執行相同的代碼,代碼引用相同的ThreadLocal變量,兩個線程也只能看到自己線程的ThreadLocal變量。ThreadLocal類提供了一個簡單地方法維護線程安全。
ThreadLocal源碼
ThreadLocal類可以創建同一個線程訪問和寫入的變量,變量存儲在ThreadLocal.ThreadLocalMap類中,ThreadLocalMap使用數組存儲ThreadLocal變量和變量值。因為變量是和線程綁定的,所以ThreadLocalMap的引用在Thread類中,每個線程只能訪問它私有的ThreadLocalMap變量,從而保證ThreadLocal變量和線程的綁定關系。
public?class?Thread?implements?Runnable?{????/*?ThreadLocal?values?pertaining?to?this?thread.?This?map?is?maintained
?????*?by?the?ThreadLocal?class.?*/
????ThreadLocal.ThreadLocalMap?threadLocals?=?null;
????/*
?????*?InheritableThreadLocal?values?pertaining?to?this?thread.?This?map?is
?????*?maintained?by?the?InheritableThreadLocal?class.
?????*/
????ThreadLocal.ThreadLocalMap?inheritableThreadLocals?=?null;
????public?static?native?Thread?currentThread();
}
ThreadLocalMap的維護是由ThreadLocal類實現的,ThreadLocal類負責向ThreadLocalMap中寫入讀取和移除變量。
public?class?ThreadLocal<T>?{????public?T?get()?{
????????Thread?t?=?Thread.currentThread();
????????ThreadLocalMap?map?=?getMap(t);
????????if?(map?!=?null)?{
????????????ThreadLocalMap.Entry?e?=?map.getEntry(this);
????????????if?(e?!=?null)?{
????????????????@SuppressWarnings("unchecked")
????????????????T?result?=?(T)e.value;
????????????????return?result;
????????????}
????????}
????????return?setInitialValue();
????}
????private?T?setInitialValue()?{
????????T?value?=?initialValue();
????????Thread?t?=?Thread.currentThread();
????????ThreadLocalMap?map?=?getMap(t);
????????if?(map?!=?null)
????????????map.set(this,?value);
????????else
????????????createMap(t,?value);
????????return?value;
????}
????protected?T?initialValue()?{
????????return?null;
????}
????public?void?set(T?value)?{
????????Thread?t?=?Thread.currentThread();
????????ThreadLocalMap?map?=?getMap(t);
????????if?(map?!=?null)
????????????map.set(this,?value);
????????else
????????????createMap(t,?value);
????}
????public?void?remove()?{
????????ThreadLocalMap?m?=?getMap(Thread.currentThread());
????????if?(m?!=?null)
????????????m.remove(this);
????}
????ThreadLocalMap?getMap(Thread?t)?{
????????return?t.threadLocals;
????}
????void?createMap(Thread?t,?T?firstValue)?{
????????t.threadLocals?=?new?ThreadLocalMap(this,?firstValue);
????}
????static?ThreadLocalMap?createInheritedMap(ThreadLocalMap?parentMap)?{
????????return?new?ThreadLocalMap(parentMap);
????}
}
可以看到ThreadLocal在讀取寫入或移除變量時需要先獲取ThreadLocalMap,步驟是:
Thread?t?=?Thread.currentThread();ThreadLocalMap?map?=?t.threadLocals;
繼而通過ThreadLocalMap實現讀取寫入或移除變量的操作。
ThreadLocalMap是一個hash map的實現,基于數組,解決hash碰撞的方式是開放地址法而不是HashMap的拉鏈法,這里沒有使用JDK集合框架的相關的Map實現。數組中存儲實體是一個Entry的內部類,它繼承了WeakReference接口,使用了弱引用的特性,確保不會因為Entry的原因導致ThreadLocal對象無法被回收。但是注意table[]數組對于Entry的引用是強引用,Entry對象對于value對象的引用也是強引用。
ThreadLocal的潛在內存泄漏問題
ThreadLocal如果使用不當會出現內存泄漏的問題。因為ThreadLocal保證了變量是和線程綁定的,所以官方建議在使用ThreadLocal的時候將ThreadLocal實例設置為static:
?{@code?ThreadLocal}?instances?are?typically?private?static?fields?in?classes?that?wish?to?associate?state?with?a?thread?(e.g.,?a?user?ID?or?Transaction?ID).?
當ThreadLocal實例設置為靜態變量后,實例就不會隨著對象的gc而一起被gc,導致ThreadLocal內的對象一致被引用從而無法釋放。
內存泄漏的含義表示Value對象無法回收,而不是ThreadLocal對象無法回收。
這里的內存泄漏有三個條件:
下圖是棧和堆中實例和引用關系。從圖中可知如果線程常駐會導致ThreadLocalMap對象無法被回收,從而內部的table: Entry[]變量無法被回收。
圖中有兩個引用是指向堆中的ThreadLocal實例,一個是棧上的ThreadLocal引用,即代碼中創建的ThreadLocal對象,另一個是ThreadLocalMap持有的Entry對ThreadLocal對象的弱引用。
在ThreadLocalMap的內部類Entry中,使用弱應用持有ThreadLocal實例的引用,當垃圾回收壓力大時會回收ThreadLocal實例,但是如果代碼持有的ThreadLocal引用被聲明為靜態的,那么就不會隨著任務的執行結束而自動釋放,即會一直有個棧上的ThreadLocal引用存在,從而導致ThreadLocal無法被回收,而Entry對象也一直持有ThreadLocal的引用,導致無法觸發stale機制,從而Value對象無法回收。
static?class?Entry?extends?WeakReference<ThreadLocal>>?{????/**?The?value?associated?with?this?ThreadLocal.?*/
????Object?value;
????
????Entry(ThreadLocal>?k,?Object?v)?{
????????super(k);
????????value?=?v;
????}
}
stale機制是Entry使用ThreadLocal對象作為key,如果ThreadLocal對象被回收,那么Entry對象的key即為null,Entry對象不能再被外界訪問,從而可以被清除釋放。因為Entry對象持有的是弱引用,所以當棧上釋放引用的時候ThreadLocal就有了被回收的可能性,當某段時間后ThreadLocal對象被垃圾回收,Entry對象對ThreadLocal引用就變成了null,從而ThreadLocalMap就知道這個Entry對象永遠無法在被訪問,即將Entry對象清除。
解決辦法就是代碼在執行完畢后手動調用#remove方法。當調用完畢后棧上依然持有ThreadLocal的引用,但是Entry對象會移除對ThreadLocal的引用,并主動設置value為null,然后將table數組內對Entry的引用設置為null,這樣就保證value對象和Entry對象都會被回收。
這里還有個問題:即stale對象會在ThreadLocalMap的存活時間。如果棧上釋放ThreadLocal對象后,ThreadLocal對象的垃圾回收需要遵從弱引用的規則,其次當ThreadLocal對象已經被回收后ThreadLocalMap對象清除Entry對象也不是立馬能夠完成的。只有在ThreadLocalMap在進行數組擴容時才會檢測所有的對象并清除其中的stale對象,其余時候只在添加新的Entry實例時掃描幾輪table數組清除部分stale對象。
ExecutorService中使用ThreadLocal
如果打算同時使用線程池,且提交給線程池的任務代碼中使用了ThreadLocal,如果使用ThreadLocal不當便會出現難以排查的并發問題。
線程池線程運行時傳值
ThreadLocal類保證了變量只有同一個線程才能訪問和寫入,那么如果執行任務的線程創建了一個線程或者將任務提交到線程池,那么任務的執行就會涉及到多個線程,ThreadLocal變量需要在涉及的多個線程間共享即支持訪問和寫入。
對于新創建線程的問題,JDK提供了InheritableThreadLocal類。InheritableThreadLocal支持在任務線程創建新的線程時將任務線程的ThreadLocal變量通過淺拷貝共享給子線程,這樣子線程在執行任務的時候可以訪問寫入任務線程的ThreadLocal變量。
但是對于線程池中的線程,因為工作線程可能會常駐,任務提交到線程池時被放入工作隊列,工作線程從隊列中彈出任務進行執行,導致InheritableThreadLocal無法生效。
解決方案是在創建任務時獲取ThreadLocal的變量,保存在任務對象中,當任務執行時將變量設置到執行線程的ThreadLocal變量中。
public?class?CustomAttachmentDemo?{????private?static?ThreadLocal?holder?=?new?ThreadLocal<>();public?static?void?main(String[]?args)?throws?Exception?{
????????holder.set("value-set-in-parent");
????????Runnable?task?=?()?->?System.out.println("[child?thread]?get?"?+?holder.get());
????????Thread?thread?=?new?Thread(new?AttachmentdRunnable(task));
????????thread.start();
????????thread.join();
????????System.out.println("[parent?thread]?get?"?+?holder.get());
????}private?static?class?AttachmentdRunnable?implements?Runnable?{private?String?attachments;private?Runnable?task;public?AttachmentdRunnable(Runnable?task)?{this.task?=?task;this.attachments?=?holder.get();
????????}@Overridepublic?void?run()?{
????????????holder.set(attachments);
????????????task.run();
????????}
????}
}
工作線程的多任務間的ThreadLocal變量
因為線程池復用線程,線程池內的線程可能會常駐不會銷毀,任務中從ThreadLocal中訪問的變量可能是之前的任務寫入的,并非當前任務寫入的變量,從而導致不同的任務的ThreadLocal變量可以被多個任務讀取和寫入導致錯誤。
解決方案是在每個任務執行完畢后可以使用try/finally調用#remove()方法清除線程持有的線程本地變量。
@Overridepublic?void?run()?{
????try?{
????????holder.set(attachments);
????????task.run();
????}?finally?{
????????holder.remove();
????}
}
恢復工作線程ThreadLocal變量
工作線程可能在執行任務前有自己的ThreadLocal變量,在任務執行完畢后需要恢復工作線程的ThreadLocal變量以執行下一個任務,從而保證每個任務在執行的時候上下文環境一致。
transmittalbe-thread-local解決方案
阿里開源的ttl提供了任務提交給線程池時的ThreadLocal值傳遞到任務執行時。ttl不僅實現工作線程執行時傳值到ThreadLocal,還包括任務執行完畢后恢復工作線程池狀態的功能。使用ttl有三種方式:
其中修飾線程池的方式就是在調用execute,invoke等方法時調用TtlRunnable#get(Runnable)或TtlCallable#get(Callable)方法修飾Runnable和Callable。而Agent修飾線程池就是將對線程池的顯式修飾改為Agent增強。所以ttl的核心邏輯是對于Runnable和Callable的增強。
ttl有多種使用場景,在github上有一個使用案例文章匯總。
修飾Runnable或Callable
ttl提供的Runnable的修飾類TtlRunnable如下,這里精簡了很多的無關代碼,只顯示了最核心的代碼。
public?final?class?TtlRunnable?implements?Runnable?{????private?final?AtomicReference?capturedRef;private?final?Runnable?runnable;private?final?boolean?releaseTtlValueReferenceAfterRun;private?TtlRunnable(@NonNull?Runnable?runnable,?boolean?releaseTtlValueReferenceAfterRun)?{this.capturedRef?=?new?AtomicReference(capture());this.runnable?=?runnable;this.releaseTtlValueReferenceAfterRun?=?releaseTtlValueReferenceAfterRun;
????}/**
?????*?wrap?method?{@link?Runnable#run()}.
?????*/@Overridepublic?void?run()?{final?Object?captured?=?capturedRef.get();if?(captured?==?null?||?releaseTtlValueReferenceAfterRun?&&?!capturedRef.compareAndSet(captured,?null))?{throw?new?IllegalStateException("TTL?value?reference?is?released?after?run!");
????????}final?Object?backup?=?replay(captured);try?{
????????????runnable.run();
????????}?finally?{
????????????restore(backup);
????????}
????}
}
可以看到處理邏輯分為三步:
而完成這三步的方法capture(), replay(Object), restore(Object)方法都是TransmittableThreadLocal.Transmitter提供的方法。
在TransmittableThreadLocal類中使用靜態變量private static final InheritableThreadLocal, ?>> holder持有invoke線程的ThreadLocal變量,類似一個ThreadLocal注冊表。只所以使用WeakHashMap的考慮和ThreadLocalMap中Entry使用WeakReference的道理一致,是為了防止業務代碼沒有顯式調用ThreadLocal#remove()方法時能夠回收Entry對象。
public?class?TransmittableThreadLocal<T>?extends?InheritableThreadLocal<T>?implements?TtlCopier<T>?{????private?static?final?Logger?logger?=?Logger.getLogger(TransmittableThreadLocal.class.getName());
????private?final?boolean?disableIgnoreNullValueSemantics;
????public?TransmittableThreadLocal()?{
????????this(false);
????}
????public?TransmittableThreadLocal(boolean?disableIgnoreNullValueSemantics)?{
????????this.disableIgnoreNullValueSemantics?=?disableIgnoreNullValueSemantics;
????}
????public?T?copy(T?parentValue)?{
????????return?parentValue;
????}
????public?final?T?get()?{
????????T?value?=?super.get();
????????if?(disableIgnoreNullValueSemantics?||?null?!=?value)?addThisToHolder();
????????return?value;
????}
????public?final?void?set(T?value)?{
????????if?(!disableIgnoreNullValueSemantics?&&?null?==?value)?{
????????????//?may?set?null?to?remove?value
????????????remove();
????????}?else?{
????????????super.set(value);
????????????addThisToHolder();
????????}
????}
????public?final?void?remove()?{
????????removeThisFromHolder();
????????super.remove();
????}
????private?void?superRemove()?{
????????super.remove();
????}
????//?Note?about?the?holder:
????//?1.?holder?self?is?a?InheritableThreadLocal(a?*ThreadLocal*).
????//?2.?The?type?of?value?in?the?holder?is?WeakHashMap,??>.
????//????2.1?but?the?WeakHashMap?is?used?as?a?*Set*:
????//????????the?value?of?WeakHashMap?is?*always*?null,?and?never?used.
????//????2.2?WeakHashMap?support?*null*?value.
????private?static?final?InheritableThreadLocal,??>>?holder?=new?InheritableThreadLocal,??>>()?{@Overrideprotected?WeakHashMap,??>?initialValue()?{return?new?WeakHashMap,?Object>();
????????????????}@Overrideprotected?WeakHashMap,??>?childValue(WeakHashMap,??>?parentValue)?{return?new?WeakHashMap,?Object>(parentValue);
????????????????}
????????????};private?void?addThisToHolder()?{if?(!holder.get().containsKey(this))?{
????????????holder.get().put((TransmittableThreadLocal)?this,?null);?//?WeakHashMap?supports?null?value.
????????}
????}private?void?removeThisFromHolder()?{
????????holder.get().remove(this);
????}
}
可以看到TransmittableThreadLocal類的get(), set(), remove()方法都增加了對ThreadLocal注冊表的操作,確保創建移除TransmittableThreadLocal的時候能夠自動將TransmittableThreadLocal實例注冊到注冊表中。
TransmittableThreadLocal.Transmitter的源碼如下,里面省略了很多的代碼,其中Transmitter不僅支持TransmittableThreadLocal的捕獲重放和回放,還提供了對ThreadLocal和InheritableThreadLocal的增強功能,通過對ThreadLocal和InheritableThreadLocal的注冊,實現和TransmittableThreadLocal類似的功能。其中ThreadLocal和InheritableThreadLocal會被存儲到private static volatile WeakHashMap, TtlCopier> threadLocalHolder變量中,Transmitter對它的處理邏輯和TransmittableThreadLocal#holder的處理邏輯一致,其中Transmitter一般TtlValues針對的是holder注冊表的數據,ThreadLocalValues針對的是Transmitter注冊的數據。
public?static?class?Transmitter?{????public?static?Object?capture()?{
????????return?new?Snapshot(captureTtlValues(),?captureThreadLocalValues());
????}
????private?static?HashMap,?Object>?captureTtlValues()?{
????????HashMap,?Object>?ttl2Value?=?new?HashMap,?Object>();for?(TransmittableThreadLocal?threadLocal?:?holder.get().keySet())?{
????????????ttl2Value.put(threadLocal,?threadLocal.copyValue());
????????}return?ttl2Value;
????}public?static?Object?replay(Object?captured)?{final?Snapshot?capturedSnapshot?=?(Snapshot)?captured;return?new?Snapshot(replayTtlValues(capturedSnapshot.ttl2Value),?replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
????}private?static?HashMap,?Object>?replayTtlValues(HashMap,?Object>?captured)?{
????????HashMap,?Object>?backup?=?new?HashMap,?Object>();for?(final?Iterator>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{
????????????TransmittableThreadLocal?threadLocal?=?iterator.next();//?backup
????????????backup.put(threadLocal,?threadLocal.get());//?clear?the?TTL?values?that?is?not?in?captured//?avoid?the?extra?TTL?values?after?replay?when?run?taskif?(!captured.containsKey(threadLocal))?{
????????????????iterator.remove();
????????????????threadLocal.superRemove();
????????????}
????????}//?set?TTL?values?to?captured
????????setTtlValuesTo(captured);//?call?beforeExecute?callback
????????doExecuteCallback(true);return?backup;
????}private?static?void?setTtlValuesTo(@NonNull?HashMap,?Object>?ttlValues)?{for?(Map.Entry,?Object>?entry?:?ttlValues.entrySet())?{
????????????TransmittableThreadLocal?threadLocal?=?entry.getKey();
????????????threadLocal.set(entry.getValue());
????????}
????}public?static?void?restore(@NonNull?Object?backup)?{final?Snapshot?backupSnapshot?=?(Snapshot)?backup;
????????restoreTtlValues(backupSnapshot.ttl2Value);
????????restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
????}private?static?void?restoreTtlValues(@NonNull?HashMap,?Object>?backup)?{//?call?afterExecute?callback
????????doExecuteCallback(false);for?(final?Iterator>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{
????????????TransmittableThreadLocal?threadLocal?=?iterator.next();//?clear?the?TTL?values?that?is?not?in?backup//?avoid?the?extra?TTL?values?after?restoreif?(!backup.containsKey(threadLocal))?{
????????????????iterator.remove();
????????????????threadLocal.superRemove();
????????????}
????????}//?restore?TTL?values
????????setTtlValuesTo(backup);
????}private?static?class?Snapshot?{final?HashMap,?Object>?ttl2Value;final?HashMap,?Object>?threadLocal2Value;private?Snapshot(HashMap,?Object>?ttl2Value,?HashMap,?Object>?threadLocal2Value)?{this.ttl2Value?=?ttl2Value;this.threadLocal2Value?=?threadLocal2Value;
????????}
????}private?static?volatile?WeakHashMap,?TtlCopier>?threadLocalHolder?=?new?WeakHashMap,?TtlCopier>();private?Transmitter()?{throw?new?InstantiationError("Must?not?instantiate?this?class");
????}
}
仔細觀察TtlRunnable的實現,可以看出#capture()方法是在構造器調用的,處理的線程是invoke線程,結合holder變量的類型是InheritableThreadLocal,這里捕獲的都是invoke線程的ThreadLocal變量,轉換成一個Map。
#replay(Object), #restore(Object)方法是在#run()方法內,此時已經是工作線程在處理了??梢钥闯鰎eplay方法和restore方法是互為鏡像的方法,replay首先記錄子線程的數據進行備份稍后作為方法的返回結果,然后將入參傳入的需要回放的數據設置到工作線程的ThreadLocal中。restore方法省略了備份,將入參傳入的需要恢復的數據設置到工作線程的ThreadLocal中。
Netty關于ThreadLocal的優化FastThreadLocal
因為ThreadLocalMap其實是基于尋址法實現的hash map,在最差的情況下O(1)的時間復雜度會退化為O(N)的線形查找。Netty針對這點進行了優化,提供了FastThreadLocal實現,主要改動是對于InternalThreadLocalMap的實現。它的存儲也是基于數組實現,但是數組的索引是無法復用的,每個新創建的FastThreadLocal都會新增加一個唯一的索引,對于InternalThreadLocalMap中數組的一個下標。這個索引是只增不減的,從而保證了每次查詢都是O(1)的時間復雜度,但是當FastThreadLocal被回收后,InternalThreadLocalMap中底層數組索引對應的下標也不會在使用,這是一種典型的空間換時間的做法。這個優化點也被dubbo移植到項目中。
dubbo關于RpcContext的實現
?上下文信息是一次 RPC 調用過程中附帶的環境信息,如方法名、參數類型、真實參數、本端/對端地址等。這些數據僅屬于一次調用,作用于 Consumer 到 Provider 調用的整個流程。
?RpcContext的底層實現借助了ThreadLocal。其中ThreadLocal的實現移植了Netty項目中FastThreadLocal的實現。
下面是RpcContext的部分變量,它內部持有了很多RPC調用過程中的環境信息,在網絡傳輸時RpcContext中的信息作為Message的一部分序列化傳輸到另一端。在另一端經過反序列化后經過路由分配到相應地Invoker實現時被Filter從Message中讀取重新設置到RpcContext中,但是這個時候任務已經開始執行了,開始執行真正地業務邏輯而不是框架代碼。
/**?*?use?internal?thread?local?to?improve?performance
?*/
private?static?final?InternalThreadLocal?LOCAL?=?new?InternalThreadLocal()?{@Overrideprotected?RpcContext?initialValue()?{return?new?RpcContext();
????}
};/**
?*?用于服務端傳遞
?*/private?static?final?InternalThreadLocal?SERVER_LOCAL?=?new?InternalThreadLocal()?{@Overrideprotected?RpcContext?initialValue()?{return?new?RpcContext();
????}
};private?final?Map?attachments?=?new?HashMap();private?URL?url;private?String?methodName;private?Class>[]?parameterTypes;private?Object[]?arguments;private?InetSocketAddress?localAddress;private?InetSocketAddress?remoteAddress;
dubbo規避了任務提交到線程池時的ThreadLocal的傳值問題,如果在業務代碼中需要提交線程池等操作就需要開發者自己維護ThreadLocal的傳值問題。
dubbo的Filter使用了SPI機制,通過@Activate注解可知消費者端接收Response Message,消費者端接收Request Message,對應的RpcContext支持如下:
@Activate(group?=?Constants.CONSUMER,?order?=?-10000)public?class?ConsumerContextFilter?implements?Filter?{
????@Override
????public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
????????RpcContext.getContext()
????????????????.setInvoker(invoker)
????????????????.setInvocation(invocation)
????????????????.setLocalAddress(NetUtils.getLocalHost(),?0)
????????????????.setRemoteAddress(invoker.getUrl().getHost(),
????????????????????????invoker.getUrl().getPort());
????????if?(invocation?instanceof?RpcInvocation)?{
????????????((RpcInvocation)?invocation).setInvoker(invoker);
????????}
????????try?{
????????????RpcResult?result?=?(RpcResult)?invoker.invoke(invocation);
????????????RpcContext.getServerContext().setAttachments(result.getAttachments());
????????????return?result;
????????}?finally?{
????????????RpcContext.getContext().clearAttachments();
????????}
????}
}
@Activate(group?=?Constants.PROVIDER,?order?=?-10000)
public?class?ContextFilter?implements?Filter?{
????@Override
????public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
????????Map?attachments?=?invocation.getAttachments();if?(attachments?!=?null)?{
????????????attachments?=?new?HashMap(attachments);
????????????attachments.remove(Constants.PATH_KEY);
????????????attachments.remove(Constants.GROUP_KEY);
????????????attachments.remove(Constants.VERSION_KEY);
????????????attachments.remove(Constants.DUBBO_VERSION_KEY);
????????????attachments.remove(Constants.TOKEN_KEY);
????????????attachments.remove(Constants.TIMEOUT_KEY);
????????????attachments.remove(Constants.ASYNC_KEY);//?Remove?async?property?to?avoid?being?passed?to?the?following?invoke?chain.
????????}
????????RpcContext.getContext()
????????????????.setInvoker(invoker)
????????????????.setInvocation(invocation)//????????????????.setAttachments(attachments)??//?merged?from?dubbox
????????????????.setLocalAddress(invoker.getUrl().getHost(),
????????????????????????invoker.getUrl().getPort());//?mreged?from?dubbox//?we?may?already?added?some?attachments?into?RpcContext?before?this?filter?(e.g.?in?rest?protocol)if?(attachments?!=?null)?{if?(RpcContext.getContext().getAttachments()?!=?null)?{
????????????????RpcContext.getContext().getAttachments().putAll(attachments);
????????????}?else?{
????????????????RpcContext.getContext().setAttachments(attachments);
????????????}
????????}if?(invocation?instanceof?RpcInvocation)?{
????????????((RpcInvocation)?invocation).setInvoker(invoker);
????????}try?{
????????????RpcResult?result?=?(RpcResult)?invoker.invoke(invocation);//?pass?attachments?to?result
????????????result.addAttachments(RpcContext.getServerContext().getAttachments());return?result;
????????}?finally?{
????????????RpcContext.removeContext();
????????????RpcContext.getServerContext().clearAttachments();
????????}
????}
}
lucene和elasticsearch解決方案
因為ThreadLocalMap的value的移除如果不是顯式地調用ThreadLocal#remove()方法,那么就需要弱引用的垃圾回收機制和stale對象的移除機制,所以可能ThreadLocal中value的移除可能會花費非常多地時間,然而這不是一個內存泄漏,因為它最后能夠被回收。但是當弱引用被GC機制回收之后,如果沒有ThreadLocalMap#set()方法調用,那么永遠不會觸發stale的Entry對象回收,那么Entry對象和value對象也都不能被回收,極端情況下可能造成OOM問題。那么只需要顯示地調用一次ThreadLocal#remove()方法即可解決這個問題。
Lucene的CloseableThreadLocal
lucene提供了CloseableThreadLocal方法繼承了Closeable方法,在#close()方法內執行了#remove()方法。
elasticsearch的ThreadContext
ThreadContext提供了和ttl類似的功能。elasticsearch因為任務主要是transport任務,在請求和響應的處理中需要透明處理headers的傳值,當收到網絡請求時,所有的headers通過反序列化后通過ThreadContext#readHeaders(StreamInput)方法直接添加進ThreadContext,當請求處理完畢后同樣會恢復之前的context以處理下一個請求。
ThreadContext同樣借助ThreadLocal實現功能,它使用的是Lucene優化后的CloseableThreadLocal類。elasticsearch的ContextThreadLocal指定了ThreadLocal存儲的變量類型為ThreadContextStruct,這是個headers的bean包裝類。
private?static?class?ContextThreadLocal?extends?CloseableThreadLocal<ThreadContextStruct>?{????
????private?final?AtomicBoolean?closed?=?new?AtomicBoolean(false);
????@Override
????public?void?set(ThreadContextStruct?object)?{
????????try?{
????????????if?(object?==?DEFAULT_CONTEXT)?{
????????????????super.set(null);
????????????}?else?{
????????????????super.set(object);
????????????}
????????}?catch?(NullPointerException?ex)?{
????????????/*?This?is?odd?but?CloseableThreadLocal?throws?a?NPE?if?it?was?closed?but?still?accessed.
???????????????to?get?a?real?exception?we?call?ensureOpen()?to?tell?the?user?we?are?already?closed.*/
????????????ensureOpen();
????????????throw?ex;
????????}
????}
????@Override
????public?ThreadContextStruct?get()?{
????????try?{
????????????ThreadContextStruct?threadContextStruct?=?super.get();
????????????if?(threadContextStruct?!=?null)?{
????????????????return?threadContextStruct;
????????????}
????????????return?DEFAULT_CONTEXT;
????????}?catch?(NullPointerException?ex)?{
????????????/*?This?is?odd?but?CloseableThreadLocal?throws?a?NPE?if?it?was?closed?but?still?accessed.
???????????????to?get?a?real?exception?we?call?ensureOpen()?to?tell?the?user?we?are?already?closed.*/
????????????ensureOpen();
????????????throw?ex;
????????}
????}
????private?void?ensureOpen()?{
????????if?(closed.get())?{
????????????throw?new?IllegalStateException("threadcontext?is?already?closed");
????????}
????}
????public?void?close()?{
????????if?(closed.compareAndSet(false,?true))?{
????????????super.close();
????????}
????}
}
ThreadContextStruct提供了從StreamInput讀取headers的構造函數,以及一系列添加request/response headers的方法,ThreadContextStruct是headers讀取寫入的一個管理工具類。
private?static?final?class?ThreadContextStruct?{????private?final?Map?requestHeaders;private?final?Map?transientHeaders;private?final?Map>?responseHeaders;private?ThreadContextStruct(StreamInput?in)?throws?IOException?{//...
????}
}
elasticsearch捕獲恢復ThreadLocal的方式很巧妙,具體的實現類是ThreadContext.StoredContext。它是一個函數式接口,#restore()方法默認是調用#close()方法。
@FunctionalInterfacepublic?interface?StoredContext?extends?AutoCloseable?{
????@Override
????void?close();
????default?void?restore()?{
????????close();
????}
}
ThreadContext提供了#newStoredContext()和#stashContext()方法分別用于捕獲回放和清除恢復線程,二者的實現類似,但是注意到它們的執行線程是不一致的。#newStoredContext()方法是在調用線程中運行的,而#stashContext()是在工作線程中運行的。
public?StoredContext?newStoredContext(boolean?preserveResponseHeaders)?{????final?ThreadContextStruct?context?=?threadLocal.get();
????return?()??->?{
????????if?(preserveResponseHeaders?&&?threadLocal.get()?!=?context)?{
????????????threadLocal.set(context.putResponseHeaders(threadLocal.get().responseHeaders));
????????}?else?{
????????????threadLocal.set(context);
????????}
????};
}
public?StoredContext?stashContext()?{
????final?ThreadContextStruct?context?=?threadLocal.get();
????threadLocal.set(null);
????return?()?->?threadLocal.set(context);
}
而elasticsearch提供了兩種對Runnable的修飾類,分別是ContextPreservingRunnable和ContextPreservingAbstractRunnable,二者都在構造器中都捕獲了調用線程的ThreadContext中threadLocal變量中持有的ThreadContextStruct對象并封裝為ThreadContext.StoredContext,然后在執行run方法前先備份工作線程的ThreadLocal變量,在執行業務邏輯后恢復工作線程的環境,只不過ContextPreservingRunnable使用的是try/with方式,而ContextPreservingAbstractRunnable是在#onAfter回調中。
private?class?ContextPreservingRunnable?implements?WrappedRunnable?{????private?final?Runnable?in;
????private?final?ThreadContext.StoredContext?ctx;
????private?ContextPreservingRunnable(Runnable?in)?{
????????ctx?=?newStoredContext(false);
????????this.in?=?in;
????}
????@Override
????public?void?run()?{
????????boolean?whileRunning?=?false;
????????try?(ThreadContext.StoredContext?ignore?=?stashContext()){
????????????ctx.restore();
????????????in.run();
????????}?
????}
}
private?class?ContextPreservingAbstractRunnable?extends?AbstractRunnable?implements?WrappedRunnable?{
????private?final?AbstractRunnable?in;
????private?final?ThreadContext.StoredContext?creatorsContext;
????private?ThreadContext.StoredContext?threadsOriginalContext?=?null;
????private?ContextPreservingAbstractRunnable(AbstractRunnable?in)?{
????????creatorsContext?=?newStoredContext(false);
????????this.in?=?in;
????}
????@Override
????public?void?onAfter()?{
????????try?{
????????????in.onAfter();
????????}?finally?{
????????????if?(threadsOriginalContext?!=?null)?{
????????????????threadsOriginalContext.restore();
????????????}
????????}
????}
????@Override
????protected?void?doRun()?throws?Exception?{
????????threadsOriginalContext?=?stashContext();
????????creatorsContext.restore();
????????in.doRun();
????}
}
elasticsearch的線程池實現提供了開箱即用的ThreadContext支持,它會對每個執行的任務進行修飾,關鍵代碼如下:
public?class?EsThreadPoolExecutor?extends?ThreadPoolExecutor?{????private?final?ThreadContext?contextHolder;
???
????@Override
????public?void?execute(Runnable?command)?{
????????command?=?wrapRunnable(command);
????????try?{
????????????super.execute(command);
????????}?catch?(EsRejectedExecutionException?ex)?{
????????????if?(command?instanceof?AbstractRunnable)?{
????????????????//?If?we?are?an?abstract?runnable?we?can?handle?the?rejection
????????????????//?directly?and?don't?need?to?rethrow?it.
????????????????try?{
????????????????????((AbstractRunnable)?command).onRejection(ex);
????????????????}?finally?{
????????????????????((AbstractRunnable)?command).onAfter();
????????????????}
????????????}?else?{
????????????????throw?ex;
????????????}
????????}
????}
????protected?Runnable?wrapRunnable(Runnable?command)?{
????????return?contextHolder.preserveContext(command);
????}
}
//?ThreadContext#preserveContext(Runnable)
public?Runnable?preserveContext(Runnable?command)?{
????if?(command?instanceof?ContextPreservingAbstractRunnable)?{
????????return?command;
????}
????if?(command?instanceof?ContextPreservingRunnable)?{
????????return?command;
????}
????if?(command?instanceof?AbstractRunnable)?{
????????return?new?ContextPreservingAbstractRunnable((AbstractRunnable)?command);
????}
????return?new?ContextPreservingRunnable(command);
}
總結
以上是生活随笔為你收集整理的threadlocal使用_多方位点评ThreadLocal,细看各大开源软件实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ubuntu卸载nvidia驱动_解决U
- 下一篇: 生成对抗网络gan原理_生成对抗网络(G