深入理解 Kotlin coroutine (二)
原文鏈接:https://github.com/enbandari/Kotlin-Tutorials
上周我們把 Kotlin Coroutine 的基本 API 挨個(gè)講了一下,也給出了一些簡(jiǎn)單的封裝。
真是不要太給臉,就在前幾天發(fā)布的 1.1 Beta 2 當(dāng)中,所有協(xié)程的 API 包名后面都加了一個(gè) experimental,這意味著 Kotlin 官方在 1.1 當(dāng)中還是傾向于將 Coroutine 作為一個(gè)實(shí)驗(yàn)性質(zhì)的特性的,不過(guò),這也沒關(guān)系,我們學(xué)習(xí)的心不以外界的變化而變化不是?
這一篇我們基于前面的基礎(chǔ)來(lái)了解一下 Kotlinx.coroutines 這個(gè)庫(kù)的使用,如果大家對(duì)它的實(shí)現(xiàn)原理有興趣,可以再讀一讀上一篇文章,我們也可以在后面繼續(xù)寫一些文章來(lái)給深入地大家介紹。
1. 準(zhǔn)備工作
就像前面我們說(shuō)到的,1.1 Beta 2 當(dāng)中協(xié)程相關(guān)的基礎(chǔ)庫(kù)的包名都增加了 experimental,所以我們?cè)谶x擇 kotlinx.coroutines 的版本的時(shí)候也一定要對(duì)應(yīng)好編譯器的版本,不然…你自己想哈哈。
我們強(qiáng)調(diào)一下,kotlin 的版本選擇 1.1.0-beta-38,kotlinx.coroutines 的版本選擇 0.6-beta,如果你恰好使用 gradle,那么告訴你一個(gè)好消息,我會(huì)直接告訴你怎么配置:
buildscript { ext.kotlin_version = '1.1.0-beta-38' repositories { jcenter() maven { url "http://dl.bintray.com/kotlin/kotlin-eap-1.1" } } ... } repositories { jcenter() maven { url "http://dl.bintray.com/kotlin/kotlin-eap-1.1" } } kotlin { experimental { coroutines 'enable' } } dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.6-beta' }2. 一個(gè)基本的協(xié)程的例子
這個(gè)例子是 kotlinx.coroutines 的第一個(gè)小例子。
fun main(args: Array<String>) { launch(CommonPool) { // create new coroutine in common thread pool delay(1000L) // non-blocking delay for 1 second (default time unit is ms) println("World!") // print after delay } println("Hello,") // main function continues while coroutine is delayed Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive }這個(gè)例子的運(yùn)行結(jié)果是:
Hello, World!其實(shí)有了上一篇文章的基礎(chǔ)我們很容易知道,launch 方法啟動(dòng)了一個(gè)協(xié)程,CommonPool 是一個(gè)有線程池的上下文,它可以負(fù)責(zé)把協(xié)程的執(zhí)行分配到合適的線程上。所以從線程的角度來(lái)看,打印的這兩句是在不同的線程上的。
20170206-063015.015 [main] Hello, 20170206-063016.016 [ForkJoinPool.commonPool-worker-1] World!這段代碼的執(zhí)行效果與線程的版本看上去是一樣的:
thread(name = "MyThread") { Thread.sleep(1000L) log("World!") } log("Hello,") Thread.sleep(2000L)3. 主線程上的協(xié)程
我們剛才通過(guò) launch 創(chuàng)建的協(xié)程是在 CommonPool 的線程池上面的,所以協(xié)程的運(yùn)行并不在主線程。如果我們希望直接在主線程上面創(chuàng)建協(xié)程,那怎么辦?
fun main(args: Array<String>) = runBlocking<Unit> { launch(CommonPool) { delay(1000L) println("World!") } println("Hello,") delay(2000L) }這個(gè)還是 kotlinx.coroutines 的例子,我們來(lái)分析一下。runBlocking 實(shí)際上也跟 launch 一樣,啟動(dòng)一個(gè)協(xié)程,只不過(guò)它傳入的 context 不會(huì)進(jìn)行線程切換,也就是說(shuō),由它創(chuàng)建的協(xié)程會(huì)直接運(yùn)行在當(dāng)前線程上。
在 runBlocking 當(dāng)中通過(guò) launch 再創(chuàng)建一個(gè)協(xié)程,顯然,這段代碼的運(yùn)行結(jié)果與上一個(gè)例子是完全一樣的。需要注意的是,盡管我們可以在協(xié)程中通過(guò) launch 這樣的方法創(chuàng)建協(xié)程,但不要再協(xié)程當(dāng)中通過(guò) runBlocking 再來(lái)創(chuàng)建協(xié)程,因?yàn)檫@樣做雖然一般來(lái)說(shuō)不會(huì)導(dǎo)致程序異常,不過(guò),這樣的程序也沒有多大意義:
fun main(args: Array<String>) = runBlocking<Unit> { runBlocking { delay(1000L) println("World!") } println("Hello,") }運(yùn)行結(jié)果:
World! Hello,大家看到了,嵌套的 runBlocking 實(shí)際上仍然只是一段順序代碼而已。
那么,讓我們?cè)僮屑?xì)看看前面的例子,不知道大家有沒有問(wèn)題:如果我在 launch 創(chuàng)建的協(xié)程當(dāng)中多磨嘰一會(huì)兒,主線程上的協(xié)程 delay(2000L) 好像也沒多大用啊。有沒有什么方法保證協(xié)程執(zhí)行完?
4. 外部控制協(xié)程
我們?cè)谏弦黄恼庐?dāng)中只是對(duì)內(nèi)置的基礎(chǔ) API 進(jìn)行了簡(jiǎn)單的封裝,而 kotlinx.coroutines 卻為我們做了非常多的事情。比如,每一個(gè)協(xié)程都看做一個(gè) Job,我們?cè)谝粋€(gè)協(xié)程的外部也可以控制它的運(yùn)行。
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch(CommonPool) { delay(1000L) println("World!") } println("Hello,") job.join() }job.join 其實(shí)就是要求當(dāng)前協(xié)程等待 job 執(zhí)行完成之后再繼續(xù)執(zhí)行。
其實(shí),我們還可以取消協(xié)程,讓他直接停止執(zhí)行:
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch(CommonPool) { delay(1000L) println("World!") } println("Hello,") job.cancel() }job.cancel 會(huì)直接終止 job 的執(zhí)行。如果 job 已經(jīng)執(zhí)行完畢,那么 job.cancel 的執(zhí)行時(shí)沒有意義的。我們也可以根據(jù) cancel 的返回值來(lái)判斷是否取消成功。
另外,cancel 還可以提供原因:
job.cancel(IllegalAccessException("World!"))如果我們提供了這個(gè)原因,那么被取消的協(xié)程會(huì)將它打印出來(lái)。
Hello, Exception in thread "main" java.lang.IllegalAccessException: World! at example13.Example_13Kt$main$1.doResume(example-13.kt:14) at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:53) at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:57)其實(shí),如果你自己做過(guò)對(duì)線程任務(wù)的取消,你大概會(huì)知道除非被取消的線程自己去檢查取消的標(biāo)志位,或者被 interrupt,否則取消是無(wú)法實(shí)現(xiàn)的,這有點(diǎn)兒像一個(gè)人執(zhí)意要做一件事兒,另一個(gè)人說(shuō)你別做啦,結(jié)果人家壓根兒沒聽見,你說(shuō)他能停下來(lái)嗎?那么我們前面的取消到底是誰(shuí)去監(jiān)聽了這個(gè) cancel 操作呢?
當(dāng)然是 delay 這個(gè)操作了。其實(shí)所有 kotlinx.coroutines 當(dāng)中定義的操作都可以做到這一點(diǎn),我們對(duì)代碼稍加改動(dòng),你就會(huì)發(fā)現(xiàn)異常來(lái)自何處了:
val job = launch(CommonPool) { try { delay(1000L) println("World!") } catch(e: Exception) { e.printStackTrace() }finally { println("finally....") } } println("Hello,") job.cancel(IllegalAccessException("World!"))是的,你沒看錯(cuò),我們居然可以在協(xié)程里面對(duì) cancel 進(jìn)行捕獲,如果你愿意的話,你甚至可以繼續(xù)在這個(gè)協(xié)程里面運(yùn)行代碼,但請(qǐng)不要這樣做,下面的示例破壞了 cancel 的設(shè)計(jì)本意,所以請(qǐng)勿模仿:
val job = launch(CommonPool) { try { ... }finally { println("finally....") } println("I'm an EVIL!!! Hahahaha") }說(shuō)這個(gè)是什么意思呢?在協(xié)程被 cancel 掉的時(shí)候,我們應(yīng)該做的其實(shí)是把戰(zhàn)場(chǎng)打掃干凈,比如:
val job = launch(CommonPool) { val inputStream = ...try {...}finally {inputStream.close()} }我們?cè)賮?lái)考慮下面的情形:
fun main(args: Array<String>) = runBlocking<Unit> { val job = launch(CommonPool) { var nextPrintTime = 0L var i = 0 while (true) { // computation loop val currentTime = System.currentTimeMillis() if (currentTime >= nextPrintTime) { println("I'm sleeping ${i++} ...") nextPrintTime = currentTime + 500L } } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancel() // cancels the job delay(1300L) // delay a bit to see if it was cancelled.... println("main: Now I can quit.") }不得不說(shuō),kotlinx.coroutines 在幾天前剛剛更新的文檔和示例非常的棒。我們看到這個(gè)例子,while(true) 會(huì)讓這個(gè)協(xié)程不斷運(yùn)行來(lái)模擬耗時(shí)計(jì)算,盡管外部調(diào)用了 job.cancel(),但由于內(nèi)部并沒有 care 自己是否被 cancel,所以這個(gè) cancel 顯然有點(diǎn)兒失敗。如果你想要在類似這種耗時(shí)計(jì)算當(dāng)中檢測(cè)當(dāng)前協(xié)程是否被取消的話,你可以這么寫:
... while (isActive) { // computation loop ... } ...isActive 會(huì)在 cancel 之后被置為 false。
其實(shí),通過(guò)這幾個(gè)示例大家就會(huì)發(fā)現(xiàn)協(xié)程的取消,與我們通常取消線程操作的思路非常類似,只不過(guò)人家封裝的比較好,而我們呢,每次還得自己搞一個(gè) CancelableTask 來(lái)實(shí)現(xiàn) Runnable 接口去承載自己的異步操作,想想也是夠原始呢。
5. 輕量級(jí)線程
協(xié)程時(shí)輕量級(jí)的,它擁有自己的運(yùn)行狀態(tài),但它對(duì)資源的消耗卻非常的小。其實(shí)能做到這一點(diǎn)的本質(zhì)原因,我們已經(jīng)在上一篇文章當(dāng)中提到過(guò),一臺(tái)服務(wù)器開 1k 線程和 1k 協(xié)程來(lái)響應(yīng)服務(wù),前者對(duì)資源的消耗必然很大,而后者可能只是基于很少的幾個(gè)或幾十個(gè)線程來(lái)工作的,隨著請(qǐng)求數(shù)量的增加,協(xié)程的優(yōu)勢(shì)可能會(huì)體現(xiàn)的更加明顯。
我們來(lái)看個(gè)比較簡(jiǎn)單的例子:
fun main(args: Array<String>) = runBlocking<Unit> { val jobs = List(100_000) { launch(CommonPool) { delay(1000L) print(".") } } jobs.forEach { it.join() } //這里不能用 jobs.forEach(Job::join),因?yàn)?Job.join 是 suspend 方法 }通過(guò) List 這個(gè)方法,我們可以瞬間創(chuàng)建出很多對(duì)象放入返回的 List,注意到這里的 jobs 其實(shí)就是協(xié)程的一個(gè) List。
運(yùn)行上面的代碼,我們發(fā)現(xiàn) CommonPool 當(dāng)中的線程池的線程數(shù)量基本上維持在三四個(gè)就足夠了,如果我們用線程來(lái)寫上面的代碼會(huì)是什么感覺?
fun main(args: Array<String>) = runBlocking<Unit> { val jobs = List(100_000) { thread { Thread.sleep(1000L) log(".") } } jobs.forEach(Thread::join) // Thread::join 說(shuō)起來(lái)也是 1.1 的新特性呢! }運(yùn)行時(shí),在創(chuàng)建了 1k 多個(gè)線程之后,就拋出了異常:
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method)嗯,又多了一個(gè)用協(xié)程的理由,對(duì)不對(duì)?
6. 攜帶值的 Job
我們前面說(shuō)了,通過(guò)攜程返回的 Job,我們可以控制攜程的運(yùn)行。可有時(shí)候我們更關(guān)注協(xié)程運(yùn)行的結(jié)果,比如從網(wǎng)絡(luò)加載一張圖片:
suspend fun loadImage(url: String): Bitmap { ... return ... }沒錯(cuò),我們更關(guān)注它的結(jié)果,這種情況我們?cè)撛趺崔k呢?如果 loadImage 不是 suspend 方法,那么我們?cè)诜?UI 線程當(dāng)中直接獲取他們:
val imageA = loadImage(urlA) val imageB = loadImage(urlB) onImageGet(imageA, imageB)這樣的操作有什么問(wèn)題?順序獲取兩張圖片,耗時(shí),不經(jīng)濟(jì)。所以傳統(tǒng)的做法就是開兩個(gè)線程做這件事兒,這意味著你會(huì)看到兩個(gè)回調(diào),并且還要同步這兩個(gè)回調(diào),想想都頭疼。
不過(guò)我們現(xiàn)在有更好的辦法:
val imageA = defer(CommonPool) { loadImage(urlA) } val imageB = defer(CommonPool) { loadImage(urlB) } onImageGet(imageA.await(),imageB.await())代碼量幾乎沒有增加,不過(guò)我們卻做到了兩張圖片異步獲取,并同時(shí)傳給 onImageGet 以便繼續(xù)后面的操作。
defer 到底是個(gè)什么東西?其實(shí)大家大可不必看到新詞就感到恐慌,這東西用法幾乎跟 launch 一樣,只不過(guò)它返回的 Deferred 功能比 Job 多了一樣:攜帶返回值。我們前面看到的 imageA 其實(shí)就是一個(gè) Deferred 實(shí)例,而它的 await 方法返回的則是 Bitmap 類型,也即 loadImage(urlA) 的返回值。
所以如果你對(duì)協(xié)程運(yùn)行的結(jié)果感興趣,直接使用 defer 來(lái)替換你的 launch 就可以了。需要注意的是,即便你不調(diào)用 await,defer 啟動(dòng)的協(xié)程也會(huì)立即運(yùn)行,如果你希望你的協(xié)程能夠按需啟動(dòng),例如只有你調(diào)用 await 之后再啟動(dòng),那么你可以用 lazyDefer:
val imageA = lazyDefer(CommonPool) { loadImage(urlA) } val imageB = lazyDefer(CommonPool) { loadImage(urlB) } onImageGet(imageA.await(),imageB.await()) //這時(shí)候才開始真正去加載圖片7. 生成器
不知道大家對(duì) python 的生成器有沒有了解,這個(gè)感覺就好似延遲計(jì)算一樣。
假設(shè)我們要計(jì)算 fibonacci 數(shù)列,這個(gè)大家都知道,也非常容易寫,你可能分分鐘寫出一個(gè)遞歸的函數(shù)來(lái)求得這個(gè)序列,不過(guò)你應(yīng)該知道遞歸的層級(jí)越多,stackOverflow 的可能性越大吧?另外,如果我們只是用到其中的幾個(gè),那么遞歸的函數(shù)一下子都給求出來(lái),而且每次調(diào)用也沒有記憶性導(dǎo)致同一個(gè)值計(jì)算多次,非常不經(jīng)濟(jì)。大家看一個(gè) python 的例子:
def fibonacci(): yield 1 # 直接返回 1, 并且在此處暫停 first = 1 second = 1 while True: yield first first, second = first + second, first a = fibonacci() for x in a: print x if x > 100: break前面給出的這種計(jì)算方法,fibonacci 函數(shù)返回一個(gè)可迭代的對(duì)象,這個(gè)對(duì)象其實(shí)就是生成器,只有我們?cè)诘臅r(shí)候,它才會(huì)去真正執(zhí)行計(jì)算,只要遇到 yield,那么這一次迭代到的值就是 yield 后面的值,比如,我們第一次調(diào)用 fibonacci 這個(gè)函數(shù)的時(shí)候,得到的值就是 1,后面依次類推。
Kotlin 在添加了協(xié)程這個(gè)功能之后,也可以這么搞了:
val fibonacci = buildSequence { yield(1) // first Fibonacci number var cur = 1 var next = 1 while (true) { yield(next) // next Fibonacci number val tmp = cur + next cur = next next = tmp } } ... for (i in fibonacci){ println(i) if(i > 100) break //大于100就停止循環(huán) }可以這么說(shuō),這段代碼與前面的 python 版本功能是完全相同的,在 yield 方法調(diào)用時(shí),傳入的值就是本次迭代的值。
fibonacci 這個(gè)變量的類型如下:
public interface Sequence<out T> { public operator fun iterator(): Iterator<T> }既然有 iterator 方法,那么我們可以直接對(duì) fibonacci 進(jìn)行迭代也就沒什么大驚小怪的了。這個(gè) iterator 保證每次迭代的時(shí)候去執(zhí)行 buildSequence 后面的 Lambda 的代碼,從上一個(gè) yield 之后開始到下一個(gè) yield 結(jié)束,yield 傳入的值就是 iterator 的 next 的返回值。
有了這個(gè)特性,我們就可以構(gòu)造許多“懶”序列,只有在用到的時(shí)候才去真正計(jì)算每一個(gè)元素的值,而且運(yùn)算狀態(tài)可以保存,每次計(jì)算的結(jié)果都不會(huì)浪費(fèi)。
注:這個(gè)特性是被 Kotlin 標(biāo)準(zhǔn)庫(kù)收錄了的,并不存在于 kotlinx.coroutines 當(dāng)中,不過(guò)這也沒關(guān)系啦,kotlinx.coroutines 的 API 會(huì)不會(huì)在不久的將來(lái)也作為 Kotlin 標(biāo)準(zhǔn)庫(kù)的內(nèi)容出現(xiàn)呢?
8. 小結(jié)
這一篇的內(nèi)容其實(shí)相對(duì)上一篇要簡(jiǎn)單一些,面對(duì) kotlinx.coroutines 這樣的框架,我們直接通過(guò)分析案例,將 coroutine 這么理論化的東西投入實(shí)際場(chǎng)景,讓大家從感性上對(duì)其有個(gè)更加深入的認(rèn)識(shí)。
當(dāng)然,我們并沒有深入其中了解其原理,原因就是上一篇我們?yōu)榇俗隽俗銐虻臏?zhǔn)備 —— kotlinx.coroutines 作為官方的框架,自然要實(shí)現(xiàn)得完善一些,但也是萬(wàn)變不離其宗。
寫到這里,我想,我們還是需要有一篇文章再來(lái)介紹一些協(xié)程使用的一些注意事項(xiàng),那么我們下一篇再見吧。
總結(jié)
以上是生活随笔為你收集整理的深入理解 Kotlin coroutine (二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 深入理解 Kotlin Coroutin
- 下一篇: 细说 Lambda 表达式