线程中task取消_Rust Async: async-task源码分析
async-std是rust異步生態(tài)中的基礎(chǔ)運行時庫之一,核心理念是合理的性能 + 用戶友好的api體驗。經(jīng)過幾個月密集的開發(fā),前些天已經(jīng)發(fā)布1.0穩(wěn)定版本。因此是時候來一次深入的底層源碼分析。async-std的核心是一個帶工作竊取的多線程Executor,而其本身的實現(xiàn)又依賴于async-task這個關(guān)鍵庫,因此本文主要對async-task的源碼進(jìn)行分析。
當(dāng)Future提交給Executor執(zhí)行時,Executor需要在堆上為這個Future分配空間,同時需要給它分配一些狀態(tài)信息,比如Future是否可以執(zhí)行(poll),是否在等待被喚醒,是否已經(jīng)執(zhí)行完成等等。我們一般把提交給Executor執(zhí)行的Future和其連帶的狀態(tài)稱為 task。async-task這個庫就是對task進(jìn)行抽象封裝,以便于Executor的實現(xiàn),其有幾個創(chuàng)新的特性:
使用方式
async-task只對外暴露了一個函數(shù)接口以及對應(yīng)了兩個返回值類型:
pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>) whereF: Future<Output = R> + Send + 'static,R: Send + 'static,S: Fn(Task<T>) + Send + Sync + 'static,T: Send + Sync + 'static,其中,參數(shù)future表示要執(zhí)行的Future,schedule是一個閉包,當(dāng)task變?yōu)榭蓤?zhí)行狀態(tài)時會調(diào)用這個函數(shù)以調(diào)度該task重新執(zhí)行,tag是附帶在該task上的額外上下文信息,比如task的名字,id等。 返回值Task就是構(gòu)造好的task對象,JoinHandle實現(xiàn)了Future,用于接收最終執(zhí)行的結(jié)果。
值得注意的是spawn這個函數(shù)并不會做類似在后臺進(jìn)行計算的操作,而僅僅是分配內(nèi)存,創(chuàng)建一個task出來,因此其實叫create_task反而更為恰當(dāng)且好理解。
Task提供了如下幾個方法:
// 對該task進(jìn)行調(diào)度pub fn schedule(self);// poll一次內(nèi)部的Future,如果Future完成了,則會通知JoinHandle取結(jié)果。否則task進(jìn)// 入等待,直到被被下一次喚醒進(jìn)行重新調(diào)度執(zhí)行。pub fn run(self);// 取消task的執(zhí)行pub fn cancel(&self);// 返回創(chuàng)建時傳入的tag信息pub fn tag(&self) -> &T;JoinHandle實現(xiàn)了Future trait,同時也提供了如下幾個方法:
// 取消task的執(zhí)行pub fn cancel(&self);// 返回創(chuàng)建時傳入的tag信息pub fn tag(&self) -> &T;同時,Task和JoinHandle都實現(xiàn)了Send+Sync,所以他們可以出現(xiàn)在不同的線程,并通過tag方法可以同時持有 &T,因此spawn函數(shù)對T有Sync的約束。
借助于async_task的抽象,下面的幾十行代碼就實現(xiàn)了一個共享全局任務(wù)隊列的多線程Executor:
use std::future::Future; use std::thread;use crossbeam::channel::{unbounded, Sender}; use futures::executor; use once_cell::sync::Lazy;static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {let (sender, receiver) = unbounded::<async_task::Task<()>>();for _ in 0..4 {let recv = receiver.clone();thread::spawn(|| {for task in recv {task.run(); }});}sender });fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> whereF: Future<Output = R> + Send + 'static,R: Send + 'static, {let schedule = |task| QUEUE.send(task).unwrap();let (task, handle) = async_task::spawn(future, schedule, ());task.schedule();handle }fn main() {let handles: Vec<_> = (0..10).map(|i| {spawn(async move {println!("Hello from task {}", i);})}).collect();// Wait for the tasks to finish.for handle in handles {executor::block_on(handle);} }Task的結(jié)構(gòu)圖
通常rust里的并發(fā)數(shù)據(jù)結(jié)構(gòu)會包含底層的實現(xiàn),一般叫Inner或者RawXXX,包含大量裸指針等unsafe操作,然后再其基礎(chǔ)上進(jìn)行類型安全包裝,提供上層語義。比如channel,上層暴露出 Sender和 Receiver,其行為不一樣,但內(nèi)部表示是完全一樣的。async-task也類似,JoinHandle, Task以及調(diào)用Future::poll時傳遞的Waker類型內(nèi)部都共享同一個RawTask結(jié)構(gòu)。由于JoinHandle本身是一個Future,整個并發(fā)結(jié)構(gòu)還有第四個角色-在JoinHandle上調(diào)用poll的task傳遞的Waker,為避免引起混淆就稱它為Awaiter吧。整個的結(jié)構(gòu)圖大致如下:
整個task在堆上一次分配,內(nèi)存布局按Header,Tag, Schedule,Future/Output排列。由于Future和Output不同時存在,因此他們共用同一塊內(nèi)存。
- JoinHandle:只有一個,不訪問Future,可以訪問Output,一旦銷毀就不再生成;
- Task:主要訪問Future,銷毀后可以繼續(xù)生成,不過同一時間最多只有一個,這樣可以避免潛在的多個Task對Future進(jìn)行并發(fā)訪問的bug;
- Waker:可以存在多份,主要訪問schedule數(shù)據(jù),由于spawn函數(shù)的參數(shù)要求schedule必須是Send+Sync,因此多個waker并發(fā)調(diào)用是安全的。
- Header:本身包含三個部分,state是一個原子變量,包含引用計數(shù),task的執(zhí)行狀態(tài),awaiter鎖等信息;awaiter保存的是JoinHandle所在的task執(zhí)行時傳遞的Waker,用于當(dāng)Output生成后通知JoinHandle來取;vtable是一個指向靜態(tài)變量的虛表指針。
task中的狀態(tài)
所有的并發(fā)操作都是通過Header中的state這個原子變量來進(jìn)行同步協(xié)調(diào)的。主要有以下幾種flag:
JoinHandle的實現(xiàn)分析
JoinHandle::cancel
為避免并發(fā)問題,JoinHandle不接觸Future數(shù)據(jù),而由于取消task的執(zhí)行需要析構(gòu)Future數(shù)據(jù),因此cancel操作通過重新schedule一次,把操作傳遞給Task執(zhí)行。
impl<R, T> JoinHandle<R, T> {pub fn cancel(&self) {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;unsafe {let mut state = (*header).state.load(Ordering::Acquire);loop {// 如果task已經(jīng)結(jié)束或者closed,什么也不做。if state & (COMPLETED | CLOSED) != 0 {break;}let new = if state & (SCHEDULED | RUNNING) == 0 {// 如果不處于scheduled或running狀態(tài),那么下面就需要調(diào)用schedule// 函數(shù)通知Task,因此要加上SCHEDULED 和增加引用計數(shù)(state | SCHEDULED | CLOSED) + REFERENCE} else {// 否則要么task已經(jīng)schedue過了,過段時間會重新執(zhí)行,要么當(dāng)前正在// 運行,因此只需要設(shè)置closed狀態(tài),task執(zhí)行完后會收到close狀態(tài)并// 進(jìn)行處理。state | CLOSED};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 重新schedule以便executor將Future銷毀if state & (SCHEDULED | RUNNING) == 0 {((*header).vtable.schedule)(ptr);}// 如果有awaiter的話,通知相應(yīng)的的task。if state & AWAITER != 0 {(*header).notify();}break;}Err(s) => state = s,// 失敗重試}}}} }JoinHandle::drop
由于整個task的所有權(quán)是由JoinHandle,Task和Waker共享的,因此都需要手動實現(xiàn)drop。Output只會由JoinHandle訪問,因此如果有的話也要一同銷毀。
impl<R, T> Drop for JoinHandle<R, T> {fn drop(&mut self) {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;let mut output = None;unsafe {// 由于很多時候JoinHandle不用,會在剛創(chuàng)建的時候直接drop掉,因此針對這種情// 況作一個特殊化處理。這樣一個原子操作就完成了。if let Err(mut state) = (*header).state.compare_exchange_weak(SCHEDULED | HANDLE | REFERENCE,SCHEDULED | REFERENCE,Ordering::AcqRel,Ordering::Acquire,) {loop {// 如果task完成了,但是還沒有close掉,說明output還沒有被取走,需// 要在這里取出來進(jìn)行析構(gòu)。if state & COMPLETED != 0 && state & CLOSED == 0 {// 標(biāo)記為closed,這樣就可以安全地讀取output的數(shù)據(jù)。match (*header).state.compare_exchange_weak(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => { output =Some((((*header).vtable.get_output)(ptr) as *mut R).read());// 更新狀態(tài)重新循環(huán)state |= CLOSED;}Err(s) => state = s,}} else {// 進(jìn)到這里說明task要么沒完成,要么已經(jīng)closed了。let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {// Task和Waker都已經(jīng)沒了,并且沒closed,根據(jù)進(jìn)else的條// 件可知task沒完成,Future還在,重新schedule一次,讓// executor把Future析構(gòu)掉。SCHEDULED | CLOSED | REFERENCE} else {// 移除HANDLE flagstate & !HANDLE};match (*header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果這是最后一個引用if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 {//并且沒closed,根據(jù)進(jìn)else的條件可知task沒// 完成,重新schedule一次,析構(gòu)Future((*header).vtable.schedule)(ptr);} else {// task已經(jīng)完成了,output也已經(jīng)在上面讀出// 來了,同時也是最后一個引用,需要把task自// 身析構(gòu)掉。((*header).vtable.destroy)(ptr);}}// 還有其他引用在,資源的釋放由他們負(fù)責(zé)。break;}Err(s) => state = s,}}}}}// 析構(gòu)讀取出來的outputdrop(output);} }JoinHandle::poll
檢查Output是否已經(jīng)可以拿,沒有的話注冊cx.waker()等通知。
impl<R, T> Future for JoinHandle<R, T> {type Output = Option<R>;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {let ptr = self.raw_task.as_ptr();let header = ptr as *const Header;unsafe {let mut state = (*header).state.load(Ordering::Acquire);loop {// task已經(jīng)closed了,沒output可拿。if state & CLOSED != 0 {// 大部分可情況下,header里的awaiter就是cx.waker,也有例外,因// 此一并進(jìn)行通知。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// 如果task還沒完成if state & COMPLETED == 0 {// 那么注冊當(dāng)前的cx.waker到Header::awaiter里,這樣完成了可以收// 到通知。abort_on_panic(|| {(*header).swap_awaiter(Some(cx.waker().clone()));});// 要是在上面注冊前正好task完成了,那么就收不到通知了,因此注冊后// 需要重新讀取下狀態(tài)看看。state = (*header).state.load(Ordering::Acquire);// task已經(jīng)closed了,沒output可拿,返回None。if state & CLOSED != 0 {// 這里我分析下來是不需要再通知了,提了個pr等作者回應(yīng)。(*header).notify_unless(cx.waker());return Poll::Ready(None);}// task還沒完成,上面已經(jīng)注冊了waker,可以直接返回Pending。if state & COMPLETED == 0 {return Poll::Pending;}}// 到這里說明task已經(jīng)完成了。把它設(shè)置為closed狀態(tài),就可以拿output了。match (*header).state.compare_exchange(state,state | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 設(shè)置closed成功,通知其他的awaiter。由于上面是原子的swap操// 作,且一旦設(shè)置為closed,awaiter就不會再變更了,因此可以// 用AWAITER這個flag進(jìn)行快速判斷。if state & AWAITER != 0 {(*header).notify_unless(cx.waker());}// 讀取出Output并返回。let output = ((*header).vtable.get_output)(ptr) as *mut R;return Poll::Ready(Some(output.read()));}Err(s) => state = s,}}}} }Task的實現(xiàn)分析
Task::schedule
這個函數(shù)先通過Task內(nèi)部保存的指針指向Header,并從Header的vtable字段中拿到schedule函數(shù)指針,這個函數(shù)最終調(diào)用的是用戶調(diào)用spawn時傳入的schedule閉包。因此本身很直接。
Task::run
這個函數(shù)先通過Task內(nèi)部保存的指針指向Header,并從Header的vtable字段中拿到run函數(shù)指針,其指向RawTask::run,實現(xiàn)如下:
首先根據(jù)指針參數(shù)強(qiáng)轉(zhuǎn)為RawTask,并根據(jù)Header的vtable拿到RawWakerVTable,構(gòu)造好Waker和Context,為調(diào)用Future::poll做準(zhǔn)備。
unsafe fn run(ptr: *const ()) {let raw = Self::from_ptr(ptr);let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr,&(*raw.header).vtable.raw_waker,)));let cx = &mut Context::from_waker(&waker);//... }然后獲取當(dāng)前的state,循環(huán)直到更新state的RUNING成功為止。
let mut state = (*raw.header).state.load(Ordering::Acquire);loop {// 如果task已經(jīng)closed,那么Future可以直接析構(gòu)掉,并返回。if state & CLOSED != 0 {if state & AWAITER != 0 {(*raw.header).notify();}Self::drop_future(ptr);// 扣掉當(dāng)前task的引用計數(shù),因為run函數(shù)的參數(shù)是self。Self::decrement(ptr);return;}// 移除SCHEDULED狀態(tài),并標(biāo)記RUNINGmatch (*raw.header).state.compare_exchange_weak(state,(state & !SCHEDULED) | RUNNING,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 更新state到新的狀態(tài),后面的代碼還要復(fù)用state。state = (state & !SCHEDULED) | RUNNING;break;}Err(s) => state = s,}}標(biāo)記為RUNING狀態(tài)后,就可以開始正式調(diào)用Future::poll了,不過在調(diào)用前設(shè)置Guard,以便poll函數(shù)panic時,可以調(diào)用Guard的drop函數(shù)保證狀態(tài)一致。
let guard = Guard(raw);let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);mem::forget(guard); // 沒panic,移除掉guard.drop的調(diào)用。match poll {Poll::Ready(out) => {/// ... }Poll::Pending => {// ... }}如果Future完成了,那么先把Future析構(gòu)掉,騰出內(nèi)存把output寫進(jìn)去。并循環(huán)嘗試將RUNING狀態(tài)去掉。
match poll {Poll::Ready(out) => {Self::drop_future(ptr);raw.output.write(out);let mut output = None;loop {// JoinHandle已經(jīng)沒了,那么output沒人取,我們需要析構(gòu)掉output,并設(shè)置為// closed狀態(tài)。let new = if state & HANDLE == 0 {(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED} else {(state & !RUNNING & !SCHEDULED) | COMPLETED};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(_) => {// 如果handle沒了,或者跑的時候closed了,那么需要把output再讀取// 出來析構(gòu)掉。if state & HANDLE == 0 || state & CLOSED != 0 {output = Some(raw.output.read());}// 通知JoinHandle來取數(shù)據(jù)。if state & AWAITER != 0 {(*raw.header).notify();}Self::decrement(ptr);break;}Err(s) => state = s,}}drop(output);}Poll::Pending => {// ...}如果沒完成的話,循環(huán)嘗試移除RUNING,同時在poll的時候其他線程不能調(diào)用shedule函數(shù),而是設(shè)置SCHEDULED,所以需要檢查這個flag,如果設(shè)置了,則需要代勞。
match poll {Poll::Ready(out) => {/// handle ready case ... }Poll::Pending => {loop {// poll的時候closed了,這里為啥要移除SCHEDULED狀態(tài),暫時不清楚,需要問問// 作者。let new = if state & CLOSED != 0 {state & !RUNNING & !SCHEDULED} else {state & !RUNNING};match (*raw.header).state.compare_exchange_weak(state,new,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) => {if state & CLOSED != 0 {// 設(shè)置closed狀態(tài)的那個線程是不能碰Future的,否則和當(dāng)前線程// 產(chǎn)生內(nèi)存并發(fā)訪問沖突。因此代勞析構(gòu)操作。Self::drop_future(ptr);Self::decrement(ptr);} else if state & SCHEDULED != 0 {// poll的時候其他線程想schedule這個task,但是不能調(diào)用,因此// 當(dāng)前線程代勞。 chedule函數(shù)接收self,類似move語義,因此這里// 不需要decrement。Self::schedule(ptr);} else {Self::decrement(ptr);}break;}Err(s) => state = s,}}} }在poll時如果發(fā)生panic,則Guard負(fù)責(zé)收拾殘局。
fn drop(&mut self) {let raw = self.0;let ptr = raw.header as *const ();unsafe {let mut state = (*raw.header).state.load(Ordering::Acquire);loop {// poll的時候被其他線程closed了,if state & CLOSED != 0 {// 看代碼state一旦處于CLOSED后,schedule不會再運行。這里為啥要移除// SCHEDULED狀態(tài),暫時不清楚,需要問問作者。(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);// 析構(gòu)FutureRawTask::<F, R, S, T>::drop_future(ptr);RawTask::<F, R, S, T>::decrement(ptr);break;}match (*raw.header).state.compare_exchange_weak(state,(state & !RUNNING & !SCHEDULED) | CLOSED,Ordering::AcqRel,Ordering::Acquire,) {Ok(state) => {// 析構(gòu)FutureRawTask::<F, R, S, T>::drop_future(ptr);// 通知awaitertask已經(jīng)close了.if state & AWAITER != 0 {(*raw.header).notify();}RawTask::<F, R, S, T>::decrement(ptr);break;}Err(s) => state = s,}}} }Waker相關(guān)函數(shù)的實現(xiàn)
wake函數(shù)
wake函數(shù)主要功能是設(shè)置SCHEDULE狀態(tài),并嘗試調(diào)用schedule函數(shù),有兩個重要的細(xì)節(jié)需要注意:
wake_by_ref
這個函數(shù)的功能和wake類似,唯一的區(qū)別就是wake的參數(shù)是self,有move語義,wakebyref是&self。實現(xiàn)差異不大,就不做具體分析了。
clone_waker
waker的clone實現(xiàn)也比較簡單,直接將Header里的state的引用計數(shù)加一即可。
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {let raw = Self::from_ptr(ptr);let raw_waker = &(*raw.header).vtable.raw_waker;let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);if state > isize::max_value() as usize {std::process::abort();}RawWaker::new(ptr, raw_waker) }總結(jié)
整個task的設(shè)計非常精細(xì),api也非常直觀,難怪一發(fā)布就直接上1.0版本。
總結(jié)
以上是生活随笔為你收集整理的线程中task取消_Rust Async: async-task源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 中的doit(n)_CoreJ
- 下一篇: JAVA入门级教学之(构造方法)