动手实现Kotlin协程同步切换线程,以及Kotlin协程是如何实现线程切换的
前言
突發(fā)奇想想搞一個同步切換線程的Kotlin協(xié)程,而不用各種withContext(){},可以減少嵌套且邏輯更清晰,想實(shí)現(xiàn)的結(jié)果如下圖:
分析
實(shí)現(xiàn)我們想要的結(jié)果,首先需要知道協(xié)程為什么可以控制線程的切換以及在掛起函數(shù)恢復(fù)的時候回到原來設(shè)定的線程中
ps:掛起函數(shù)比普通函數(shù)多出了兩個操作:掛起和恢復(fù),具體參考:Kotlin協(xié)程在項(xiàng)目中的實(shí)際應(yīng)用_lt的博客-CSDN博客_kotlin協(xié)程使用
其實(shí)控制線程切換是協(xié)程庫內(nèi)內(nèi)置的一個攔截器類:ContinuationInterceptor
攔截器是一個協(xié)程上下文元素(ContinuationInterceptor實(shí)現(xiàn)了Element接口? ? ?,Element實(shí)現(xiàn)了CoroutineContext接口)
攔截器的作用是將協(xié)程體包裝一層后,攔截其恢復(fù)功能(resumeWith),這樣就可以在協(xié)程恢復(fù)的時候?qū)b在其內(nèi)部的協(xié)程體在相應(yīng)的線程中恢復(fù)執(zhí)行(執(zhí)行resumeWith方法)
比如協(xié)程自帶的Dispatchers.Main,Dispatchers.IO等都是協(xié)程攔截器,下面簡單分析下Dispatchers.IO攔截器
我們點(diǎn)進(jìn)去IO的定義,兜兜轉(zhuǎn)轉(zhuǎn)的找到其實(shí)現(xiàn)類LimitingDispatcher,其繼承了ExecutorCoroutineDispatcher? ,ExecutorCoroutineDispatcher繼承了CoroutineDispatcher? ,CoroutineDispatcher實(shí)現(xiàn)了ContinuationInterceptor接口,也就是其最終實(shí)現(xiàn)了協(xié)程攔截器的接口
CoroutineDispatcher重寫了攔截器的interceptContinuation方法,該方法就是用來包裝并攔截的
然后我們在看看DispatchedContinuation的resumeWith方法(也就是如何攔截并將包裝的協(xié)程體運(yùn)行在子線程的)
ps:其實(shí)走的是resumeCancellableWith方法,因?yàn)閰f(xié)程內(nèi)部做了一個判斷,IO的攔截器是繼承了DispatchedContinuation的
第一個紅框IO那是寫死的true,所以只會走第一個流程,而第二個紅框你可以簡單的理解為將后續(xù)任務(wù)(下面的代碼邏輯)放在這個dispatcher的線程池中運(yùn)行(就相當(dāng)于將協(xié)程中的代碼的線程放到了IO子線程中運(yùn)行了)
通過上面的分析,其實(shí)IO的攔截器可以簡單理解為如下代碼:
實(shí)現(xiàn)
那我們是不是可以將攔截器從協(xié)程上下文中移除呢?我試了下并不行,發(fā)現(xiàn)是在launch的時候會自動判斷,如果沒有攔截器則默認(rèn)附加Dispatchers.Default攔截器用于將操作置于子線程中
ps:可以通過遍歷來查看當(dāng)前協(xié)程上下文中都有哪些協(xié)程元素(下面是反射的實(shí)現(xiàn),可以使用系統(tǒng)提供的fold來遍歷):
/*** 通過反射遍歷協(xié)程上下文中的元素*/ fun CoroutineContext.forEach(action: (CoroutineContext.Element) -> Unit) {val modeClass = Class.forName("kotlin.coroutines.CombinedContext")val elementField = modeClass.getDeclaredField("element")elementField.isAccessible = trueval leftField = modeClass.getDeclaredField("left")leftField.isAccessible = truevar context: CoroutineContext? = thiswhile (context?.javaClass == modeClass) {(elementField.get(context) as? CoroutineContext.Element)?.let(action)context = leftField.get(context) as? CoroutineContext}(context as? CoroutineContext.Element)?.let(action) }coroutineContext.forEach {it.toString().e() }pps:協(xié)程上下文其實(shí)是以鏈表形式來存儲的,CombinedContext就相當(dāng)于鏈表的Node節(jié)點(diǎn),其element相當(dāng)于數(shù)據(jù),其left相當(dāng)于下一個節(jié)點(diǎn),而存儲的最后一個節(jié)點(diǎn)是未經(jīng)過CombinedContext包裝的協(xié)程上下文元素(可能是節(jié)省空間);而不管協(xié)程上下文,或者CombinedContext的element和left,都是val的,這樣上下文都是只讀的,避免了并發(fā)修改的危險.
那現(xiàn)在看來其實(shí)我們只要自己寫一個攔截器,然后在運(yùn)行協(xié)程的時候附加上去,線程切換就可以由我們來控制了,那實(shí)現(xiàn)起來其實(shí)也很簡單,代碼如下:
/*** 同步切換線程的協(xié)程元素,需要注意所有掛起函數(shù)都有可能影響到后面的線程,所以需要注意:最好內(nèi)部使用不會切換線程的掛起函數(shù)(或者你清楚使用的后果)*/ fun CoroutineScope.launchSyncSwitchThread(block: suspend SyncSwitchThreadCoroutineScope.() -> Unit): Job =launch(SyncSwitchThreadContinuationInterceptor) { SyncSwitchThreadCoroutineScope(this@launch).block() }//攔截器 object SyncSwitchThreadContinuationInterceptor : ContinuationInterceptor {override val key: CoroutineContext.Key<*> = ContinuationInterceptoroverride fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = SyncSwitchThreadContinuation(continuation)//協(xié)程體包裝類class SyncSwitchThreadContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> {override val context: CoroutineContext = continuation.context//這里我們不進(jìn)行攔截恢復(fù)方法,直接使用恢復(fù)者(誰調(diào)用了resume)的線程override fun resumeWith(result: Result<T>): Unit = continuation.resumeWith(result)}//協(xié)程作用域包裝類,用于控制toMain等方法不會被別的地方使用class SyncSwitchThreadCoroutineScope(coroutineScope: CoroutineScope) : CoroutineScope by coroutineScope {private suspend inline fun suspendToThread(crossinline threadFunction: (()->Unit) -> Unit) =suspendCoroutine<Unit> {threadFunction {if (it.context.isActive)it.resume(Unit)}}//切換到主線程,[force]是否強(qiáng)制post到主線程(false:如果當(dāng)前是主線程則不會post)suspend fun toMain(force: Boolean = false) {if (!force && isMainThread) returnsuspendToThread(HandlerPool::postEmptyListener)//Handler#post方法}//切換到單例子線程suspend fun toSingle(): Unit = suspendToThread(ThreadPool::submitToSingleThreadPool)//提交到單線程線程池suspend fun toIO(): Unit = suspendToThread(ThreadPool::submitToCacheThreadPool)//提交到子線程池suspend fun toCPU(): Unit = suspendToThread(ThreadPool::submitToCPUThreadPool)//提交到CPU密集型子線程池//或者為了理解起來簡單減少封裝寫成如下方式suspend fun toCPU(): Unit = suspendCoroutine { ThreadPool.submitToCacheThreadPool { if(it.context.isActive)it.resume(Unit)}}} }ps:第一次運(yùn)行的線程是啟動它的線程
pps:這里我們包裝了一下協(xié)程作用域CoroutineScope,可以防止toMain這些方法用在別的地方造成歧義且無用
然后我們就可以像開頭那樣使用了
或者通過協(xié)程上下文的plus(+)方法來替換掉默認(rèn)的攔截器:
最后在封裝一下:
fun CoroutineScope.launchSyncSwitchThread(block: suspend SyncSwitchThreadCoroutineScope.() -> Unit): Job =launch(SyncSwitchThreadContinuationInterceptor) { SyncSwitchThreadCoroutineScope(this@launch).block() }使用方式就和最開始的圖一樣了
結(jié)語
這樣就ok了,其實(shí)任何事情只要了解原理,就可以很快的想到方案,如果不清楚原理,就會對一些事情一頭霧水
ps:其實(shí)使用Dispatchers.Unconfined也可以實(shí)現(xiàn)相同的效果2333,但是了解原理還是更重要
pps:如果想限制launchSyncSwitchThread的lambda范圍內(nèi)只能使用自己定義的幾個掛起函數(shù),只需要給SyncSwitchThreadCoroutineScope類加上@RestrictsSuspension注解即可,這樣在SyncSwitchThreadCoroutineScope的作用域內(nèi),只能使用他內(nèi)部定義的掛起函數(shù),可以有效減少別的掛起函數(shù)意外切換掉線程的情況
end
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的动手实现Kotlin协程同步切换线程,以及Kotlin协程是如何实现线程切换的的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hook安卓项目内的字符串获取,用服务器
- 下一篇: 分析Kotlin协程只挂起不恢复会怎样(