pthread异步_探索 Flutter 异步消息的实现
本文作者:趙旭陽
字節跳動資深工程師
一、簡介
我們在進行 Android 開發的時候,會通過創建一個 Handler 并調用其 sendMessage ?或 Post 方法來進行異步消息調用,其背后涉及到了三個面試經常被問的類:Handler,Looper,MessageQueue,內部原理我想做過 Android 開發的基本都了解。
Flutter 使用 dart 開發,其也有類似的異步消息機制,具體參見這篇文章:https://dart.dev/articles/archive/event-loop,按這個文章的說法,在 Dart 應用中有一個事件循環(message loop)和兩個隊列(Event queue 和 Microtask queue)。
Event queue : 包含了所有的外部事件:I/O,鼠標點擊,繪制,定時器,Dart isolate 的消息等,其實這塊又根據消息的優先級細分成了兩個隊列,后面會有介紹。
Microtask queue :事件處理代碼有時需要在當前 event 之后,且在下一個 event 之前做一些任務。
兩種隊列的消息處理流程大致如圖所示:
dart 提供了 dart:async 庫來對這兩個隊列進行操作,主要是如下兩個API:
Future 類,創建一個定時器事件到 Event queue 尾部。
scheduleMicrotask() 方法,添加一個事件到 Microtask queue 尾部
下面將會分析這兩個 API 背后的實現,涉及的是 flutter engine 的源碼,可以參考官方的 wiki 來下載:
https://github.com/flutter/flutter/wiki/Setting-up-the-Engine-development-environment。
二、初始化
1. ?創建 MessageLoop
在啟動 Flutter 的時候,引擎會額外創建三個線程:UI Thread,IO Thread, GPU Thread,并為每個線程都創建一個 MessageLoop,之后各個線程就進入的消息循環的狀態,等待新的消息來處理,具體流程如下:
/src/flutter/shell/platform/android/android_shell_holder.cc
AndroidShellHolder::AndroidShellHolder(
????flutter::Settings?settings,
????fml::jni::JavaObjectWeakGlobalRef?java_object,
????bool?is_background_view)
????:?settings_(std::move(settings)),?java_object_(java_object)?{
......
??if?(is_background_view)?{
????thread_host_?=?{thread_label,?ThreadHost::Type::UI};
??}?else?{
????//?創建三個線程
????thread_host_?=?{thread_label,?ThreadHost::Type::UI?|?ThreadHost::Type::GPU?|
??????????????????????????????????????ThreadHost::Type::IO};
??}
......
}
進一步分析 ThreadHost 的創建,最后來到了創建 Thread,每個 Thread 都會創建一個 MessageLoop 并獲取其 TaskRunner,TaskRunner 是用來外部向 MessageLoop 中 Post Task 的。順帶說一下,在 Android 上 MessageLoop 的實現還是使用的系統自身的 Looper 機制,這里是通過 NDK 的 ALooper 相關接口來實現的。具體代碼如下:
/src/flutter/fml/thread.cc
??fml::AutoResetWaitableEvent?latch;
??fml::RefPtr<:taskrunner>?runner;
??thread_?=?std::make_unique<:thread>([&latch,?&runner,?name]()?->?void?{
????SetCurrentThreadName(name);//?創建?MessageLoop
????fml::MessageLoop::EnsureInitializedForCurrentThread();
????auto&?loop?=?MessageLoop::GetCurrent();//?獲取?TaskRunner
????runner?=?loop.GetTaskRunner();
????latch.Signal();
????loop.Run();
??});
??latch.Wait();
??task_runner_?=?runner;
}
MessageLoop 創建好后,我們就可以通過 TaskRunner 向其發送 Task 了,這里需要注意 MessageLoop 執行的 Task 僅是一個 無參的閉包 。類似這樣:
??FML_CHECK(pthread_setspecific(key,?reinterpret_cast<void*>(1))?==?0);
});
thread_host_.ui_thread->GetTaskRunner()->PostTask(jni_exit_task);
2. ?創建 Root Isolate
在 dart 語言中是沒有線程的,而是使用類似于線程但相互之間不共享堆內存的 isolate,代表一個獨立的 dart 程序執行環境。同樣 Flutter 的 dart 代碼是運行在一個叫 root isolate 的 isolate 中,下面簡要列下 root isolate 的創建過程。
a.啟動 dart vm
這個步驟的具體流程,大家可以順著 /engine-v1.5.4/src/flutter/runtime/dart_vm.cc 中的 DartVM 構造方法去跟進分析。在 dart vm 啟動過程中會創建 vm isolate 和 PortMap,這兩個的具體作用下面有介紹。
b.創建 root isolate
root isolate 是在 UI 線程中創建的,具體流程見 /src/flutter/runtime/dart_isolate.cc 的 CreateRootIsolate 方法。由于 isolate 是對當前線程執行環境的一個抽象表示,所以其內部存儲了很多信息,對于異步消息這塊有四個關鍵的信息是需要注意的。
下面三個字段是定義在 dart vm 層的 Isolate 類中,具體見 /src/third_party/dart/runtime/vm/isolate.h。
main_port :可以看做該 isolate 的標識,是一個整數;
message_handler :顧名思義,用來管理每個 isolate 的 Event queue,其內部根據 message 的優先級將消息分為了兩個隊列:普通優先級的 queue 和 OOB 優先級的 oob_queue。了解 TCP 協議的應該了解 TCP 中有帶外數據(即優先數據),isolate 的 OOB 優先級也是類似的意思,OOB 優先級的消息會被優先處理,目前看到有這么幾種 OOB 消息:
kPauseMsg?=?1,
kResumeMsg?=?2,
kPingMsg?=?3,
kKillMsg?=?4,
kAddExitMsg?=?5,
kDelExitMsg?=?6,
kAddErrorMsg?=?7,
kDelErrorMsg?=?8,
kErrorFatalMsg?=?9,
//?可以注意下?kLowMemoryMsg?,如果有大量?OOB?懷疑是內存不夠了
kInterruptMsg?=?10,?????//?Break?in?the?debugger.
kInternalKillMsg?=?11,??//?Like?kill,?but?does?not?run?exit?listeners,?etc.
kLowMemoryMsg?=?12,?????//?Run?compactor,?etc.
kDrainServiceExtensionsMsg?=?13,??//?Invoke?pending?service?extensions
message_notify_callback :message_handler 收到消息后會調用該變量指向的函數去處理;
Flutter 引擎層會對 dart vm 的 isolate 實例做一層包裝( DartIsolate 類),其內部定義了:
microtask_queue: 用來存儲 microtask 消息,可見在 Flutter 引擎中,Microtask queue 并不是由 dart vm 層來管理的
前面已經說過 isolate 之間是不能直接互相訪問,如圖:
可以看出 isolate 之間是共享 vm isolate 的堆內存區域的,有點類似于操作系統的內核空間,vm isolate 的堆內存存儲了 dart vm 內部的核心數據(內置類,內置對象)。除了 vm isolate,不同 isolate 之間的堆內存是不能直接訪問的,為此 dart vm 提供了 isolate 之間的通信機制,負責通信路由的大管家就是 PortMap,其內部實現就是一張 Hash 表,Key 為 isolate 的 main_port,Value 為 isolate 的 message_handler。
三、創建 Future
使用 dart 開發一定會用到 Future,當我們通過 Future 構造方法創建一個實例的時候,就會創建一個定時器消息到 Event queue,下面我們將分析這個流程。整體架構圖:
1. ?dart 層創建 Future
創建 Future 的時候,內部會通過 Timer 構造一個定時器,具體代碼如下:
/src/out/host_release/dart-sdk/lib/async/future.dart
factory?Future(FutureOr?computation())?{??_Future?result?=?new?_Future();
??Timer.run(()?{try?{
??????result._complete(computation());
????}?catch?(e,?s)?{
??????_completeWithErrorCallback(result,?e,?s);
????}
??});return?result;
}
跟進 Timer 的實現,具體代碼如下:
/src/out/host_release/dart-sdk/lib/async/timer.dart
static?void?run(void?callback())?{??new?Timer(Duration.zero,?callback);
}
factory?Timer(Duration?duration,?void?callback())?{
??if?(Zone.current?==?Zone.root)?{
????return?Zone.current.createTimer(duration,?callback);
??}
......
}
這里假定 duration == Duration.zero,Zone.current == Zone.root ,進而到 rootZone 的 createTimer 方法,里面又調用了 Timer 的 _createTimer 方法:
/src/out/host_release/dart-sdk/lib/async/zone.dart
class?_RootZone?extends?_Zone?{......
??Timer?createTimer(Duration?duration,?void?f())?{
????return?Timer._createTimer(duration,?f);
??}
......
}
/src/out/host_release/dart-sdk/lib/async/timer.dart
external?static?Timer?_createTimer(Duration?duration,?void?callback());可以看到 _createTimer 方法是個 external 的,按照 dart 語言的規范,external 方法的實現都在對應的 patch 文件中( timer_patch.dart),內部通過 _TimerFactory._factory 來創建 Timer,具體代碼如下:
/src/third_party/dart/runtime/lib/timer_patch.dart
@patchclass?Timer?{
......
??@patch
??static?Timer?_createTimer(Duration?duration,?void?callback())?{
????if?(_TimerFactory._factory?==?null)?{
??????_TimerFactory._factory?=?VMLibraryHooks.timerFactory;
????}
......
????int?milliseconds?=?duration.inMilliseconds;
????if?(milliseconds?0)?milliseconds?=?0;
???//?注意此處將外部的?callback?又包了一層
????return?_TimerFactory._factory(milliseconds,?(_)?{
??????callback();
????},?false);
??}
......
}
通過上面的代碼,我們知道 _TimerFactory._factory = VMLibraryHooks.timerFactory, VMLibraryHooks.timerFactory 又是在 root isolate 初始化時通過調用 _setupHooks 方法設置的,具體代碼如下:
/src/third_party/dart/runtime/lib/timer_impl.dart
@pragma("vm:entry-point",?"call")_setupHooks()?{
??VMLibraryHooks.timerFactory?=?_Timer._factory;
}
//?VMLibraryHooks.timerFactory?指向的該方法
//?我們假設創建的是非?repeating?消息,并且?milliSeconds?為?0
static?Timer?_factory(
??????int?milliSeconds,?void?callback(Timer?timer),?bool?repeating)?{
......
????return?new?_Timer(milliSeconds,?callback);
??}
}
factory?_Timer(int?milliSeconds,?void?callback(Timer?timer))?{
??return?_createTimer(callback,?milliSeconds,?false);
}
//?創建一個?Timer?實例并調用?_enqueue?將其加入到隊列
static?Timer?_createTimer(
????void?callback(Timer?timer),?int?milliSeconds,?bool?repeating)?{
......
??_Timer?timer?=
??????new?_Timer._internal(callback,?wakeupTime,?milliSeconds,?repeating);
??timer._enqueue();
??return?timer;
}
_Timer._internal(
????this._callback,?this._wakeupTime,?this._milliSeconds,?this._repeating)
????:?_id?=?_nextId();
//?這里?_milliSeconds?==?0,會向?ZeroTimer?隊列插入消息,然后調用?_notifyZeroHandler
void?_enqueue()?{
??if?(_milliSeconds?==?0)?{
????if?(_firstZeroTimer?==?null)?{
??????_lastZeroTimer?=?this;
??????_firstZeroTimer?=?this;
????}?else?{
??????_lastZeroTimer._indexOrNext?=?this;
??????_lastZeroTimer?=?this;
????}
????//?Every?zero?timer?gets?its?own?event.
????_notifyZeroHandler();
??}?else?{
???......
????//?延遲消息這里先不分析
??}
}
折騰了一大圈,最后只是構造了一個 _Timer 實例并把其加入到 ZeroTimer 隊列中,如果是延遲消息則會加入到 TimeoutTimerHeap 中,最后調用 _notifyZeroHandler 方法, 其主要做如下操作:
創建 RawReceivePort 并設置一個叫 _handleMessage 方法做為引擎層的回調方法
向引擎層 Event queue 發送一個普通優先級的 ?_ZERO_EVENT ,引擎層處理該消息的時候會最終回調到上面設置的 _handleMessage 方法。
具體代碼如下:
/src/third_party/dart/runtime/lib/timer_impl.dart
static?void?_notifyZeroHandler()?{??if?(_sendPort?==?null)?{
????_createTimerHandler();
??}
//?底層會調到?PortMap?的?PostMessage?方法,進而喚醒消息處理,后面會分析這個流程
??_sendPort.send(_ZERO_EVENT);
}
//?創建和引擎層通信的?RawReceivePort,并設置引擎層的回調方法?_handleMessage
static?void?_createTimerHandler()?{
??assert(_receivePort?==?null);
??assert(_sendPort?==?null);
??_receivePort?=?new?RawReceivePort(_handleMessage);
??_sendPort?=?_receivePort.sendPort;
??_scheduledWakeupTime?=?null;
}
/src/third_party/dart/runtime/lib/isolate_patch.dart
@patchclass?RawReceivePort?{
??@patch
??factory?RawReceivePort([Function?handler])?{
????_RawReceivePortImpl?result?=?new?_RawReceivePortImpl();
????result.handler?=?handler;
????return?result;
??}
}
//?最終將回調設置到?_RawReceivePortImpl?的?_handlerMap?中,引擎層會從這個?map?尋找消息的?handler
@pragma("vm:entry-point")
class?_RawReceivePortImpl?implements?RawReceivePort?{
????void?set?handler(Function?value)?{
??????_handlerMap[this._get_id()]?=?value;
????}
}
_handleMessage 回調方法會收集 Timer 并執行,具體代碼實現如下:
/src/third_party/dart/runtime/lib/timer_impl.dart
static?void?_handleMessage(msg)?{??var?pendingTimers;
??if?(msg?==?_ZERO_EVENT)?{
????//?找到所有的待處理?Timers
????pendingTimers?=?_queueFromZeroEvent();
????assert(pendingTimers.length?>?0);
??}?else?{
????......
????//?延時消息這里不分析
??}
//?處理Timer,即調用設置的?callback
??_runTimers(pendingTimers);
......
}
2. ?向 Event Queue 發送消息
前面說到 RawReceiverPort 會向引擎層 Event queue 發送一個 _ZERO_EVENT ?,其內部是通過調用 PortMap 的 PostMessage 方法將消息發送到 Event queue,該方法首先會根據接收方的 port id 找到對應的 message_handler,然后將消息根據優先級保存到相應的 queue 中,最后喚醒 message_notify_callback 回調函數 ,具體代碼如下:
/src/third_party/dart/runtime/vm/port.cc
bool?PortMap::PostMessage(Message*?message,?bool?before_events)?{??......
??intptr_t?index?=?FindPort(message->dest_port());
??......
??MessageHandler*?handler?=?map_[index].handler;
?......
??handler->PostMessage(message,?before_events);
??return?true;
}
/src/third_party/dart/runtime/vm/message_handler.cc
void?MessageHandler::PostMessage(Message*?message,?bool?before_events)?{??Message::Priority?saved_priority;
??bool?task_running?=?true;
?......
??//?根據消息優先級進入不同的隊列
????if?(message->IsOOB())?{
??????oob_queue_->Enqueue(message,?before_events);
????}?else?{
??????queue_->Enqueue(message,?before_events);
????}
???......
//喚醒并處理消息
??MessageNotify(saved_priority);
}
/src/third_party/dart/runtime/vm/isolate.cc
void?IsolateMessageHandler::MessageNotify(Message::Priority?priority)?{??if?(priority?>=?Message::kOOBPriority)?{
????I->ScheduleInterrupts(Thread::kMessageInterrupt);
??}
//?最后調用的?message_notify_callback?所指向的函數
??Dart_MessageNotifyCallback?callback?=?I->message_notify_callback();
??if?(callback)?{
????(*callback)(Api::CastIsolate(I));
??}
}
3. ? Event Queue 消息處理
前面消息已經發送成功并調用了消息處理喚醒的操作,下面我們需要知道 message_notify_callback 所指向的函數的實現, root isolate 在初始化時會設置該變量,具體代碼如下:
/src/flutter/runtime/dart_isolate.cc
bool?DartIsolate::Initialize(Dart_Isolate?dart_isolate,?bool?is_root_isolate)?{??......
??//?設置?message?handler?的?task?runner?為?UI?Task?Runner
??SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
???????????????????????????????is_root_isolate);
??......
??return?true;
}
void?DartIsolate::SetMessageHandlingTaskRunner(
????fml::RefPtr<:taskrunner>?runner,bool?is_root_isolate)?{
?......
??message_handler().Initialize(
??????[runner](std::function<void()>?task)?{?runner->PostTask(task);?});
}
進一步跟進分析發現通過 Dart_SetMessageNotifyCallback 將 ?root isolate 的 message_notify_callback 設置為 MessageNotifyCallback 方法,具體代碼如下:
/src/third_party/tonic/dart_message_handler.cc
void?DartMessageHandler::Initialize(TaskDispatcher?dispatcher)?{??TONIC_CHECK(!task_dispatcher_?&&?dispatcher);
??task_dispatcher_?=?dispatcher;
??Dart_SetMessageNotifyCallback(MessageNotifyCallback);
}
MessageNotifyCallback 會在 Event queue 收到消息后執行,但其執行過程中并沒有拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一個 Task 閉包,這個 Task 閉包會通過調用 Dart_HandleMessage 來處理 Event queue 中的消息,具體代碼流程如下:
/src/third_party/tonic/dart_message_handler.cc
void?DartMessageHandler::MessageNotifyCallback(Dart_Isolate?dest_isolate)?{??auto?dart_state?=?DartState::From(dest_isolate);
??TONIC_CHECK(dart_state);
??dart_state->message_handler().OnMessage(dart_state);
}
void?DartMessageHandler::OnMessage(DartState*?dart_state)?{
??auto?task_dispatcher_?=?dart_state->message_handler().task_dispatcher_;
??//?往?ui?線程?MessageLoop?Post?了一個?Task
??auto?weak_dart_state?=?dart_state->GetWeakPtr();
??task_dispatcher_([weak_dart_state]()?{
????if?(auto?dart_state?=?weak_dart_state.lock())?{
??????dart_state->message_handler().OnHandleMessage(dart_state.get());
????}
??});
}
void?DartMessageHandler::OnHandleMessage(DartState*?dart_state)?{
??......
??if?(Dart_IsPausedOnStart())?{
????......
??}?else?if?(Dart_IsPausedOnExit())?{
???......
??}?else?{
?????//?調用?Dart_HandleMessage?方法處理消息
????result?=?Dart_HandleMessage();
???......
??}
?......
}
Dart_HandleMessage 的實現很簡單,只是調用 message_handler 的 HandleNextMessage 方法,具體代碼實現如下:
/src/third_party/dart/runtime/vm/dart_api_impl.cc
DART_EXPORT?Dart_Handle?Dart_HandleMessage()?{......
??if?(I->message_handler()->HandleNextMessage()?!=?MessageHandler::kOK)?{
????return?Api::NewHandle(T,?T->StealStickyError());
??}
??return?Api::Success();
}
我們進一步跟進 ?HandleNextMessage 方法的實現,最終來到如下代碼:
/src/third_party/dart/runtime/vm/message_handler.cc
//?依次遍歷?message_handler?的消息隊列,對每個消息進程處理MessageHandler::MessageStatus?MessageHandler::HandleMessages(
????MonitorLocker*?ml,
????bool?allow_normal_messages,
????bool?allow_multiple_normal_messages)?{
......
??Message*?message?=?DequeueMessage(min_priority);
??while?(message?!=?NULL)?{
????......
????MessageStatus?status?=?HandleMessage(message);
???......
????message?=?DequeueMessage(min_priority);
??}
??return?max_status;
}
//?取消息的時候會優先處理?OOB?Message
Message*?MessageHandler::DequeueMessage(Message::Priority?min_priority)?{
??Message*?message?=?oob_queue_->Dequeue();
??if?((message?==?NULL)?&&?(min_priority?????message?=?queue_->Dequeue();
??}
??return?message;
}
每個消息的處理都是在 HandleMessage 方法中,該方法會根據不同的消息優先級做相應的處理,具體代碼如下:
/src/third_party/dart/runtime/vm/isolate.cc
MessageHandler::MessageStatus?IsolateMessageHandler::HandleMessage(????Message*?message)?{
......
??Object&?msg_handler?=?Object::Handle(zone);
//?非?OOB?消息,需要獲取?dart?層的??handler?函數
??if?(!message->IsOOB()?&&?(message->dest_port()?!=?Message::kIllegalPort))?{
????msg_handler?=?DartLibraryCalls::LookupHandler(message->dest_port());
????......
??}
......
??MessageStatus?status?=?kOK;
??if?(message->IsOOB())?{
???//?處理?OOB?消息,詳細實現可自己看代碼,這里不分析OOB消息
???......
??}?else?if?(message->dest_port()?==?Message::kIllegalPort)?{
????......
??}?else?{
????......
????//?調用前面找到的?msg_handler?來處理普通消息
????const?Object&?result?=
????????Object::Handle(zone,?DartLibraryCalls::HandleMessage(msg_handler,?msg));
???......
??}
??delete?message;
??return?status;
}
這里我們主要看普通消息的處理邏輯,首先會通過調用 DartLibraryCalls::LookupHandler 方法來從 dart 層尋找相應的 handler 函數,然后通過 DartLibraryCalls::HandleMessage 執行相應的處理函數,具體實現代碼如下:
/src/third_party/dart/runtime/vm/dart_entry.cc
RawObject*?DartLibraryCalls::LookupHandler(Dart_Port?port_id)?{??Thread*?thread?=?Thread::Current();
??Zone*?zone?=?thread->zone();
??Function&?function?=?Function::Handle(
??????zone,?thread->isolate()->object_store()->lookup_port_handler());
??const?int?kTypeArgsLen?=?0;
??const?int?kNumArguments?=?1;
//?如果沒有消息處理方法,則進行查找,最終找到的是 RawReceivePortImpl 的?_lookupHandler 方法。
??if?(function.IsNull())?{
????Library&?isolate_lib?=?Library::Handle(zone,?Library::IsolateLibrary());
????ASSERT(!isolate_lib.IsNull());
????const?String&?class_name?=?String::Handle(
????????zone,?isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
????const?String&?function_name?=?String::Handle(
????????zone,?isolate_lib.PrivateName(Symbols::_lookupHandler()));
????function?=?Resolver::ResolveStatic(isolate_lib,?class_name,?function_name,
???????????????????????????????????????kTypeArgsLen,?kNumArguments,
???????????????????????????????????????Object::empty_array());
????ASSERT(!function.IsNull());
????thread->isolate()->object_store()->set_lookup_port_handler(function);
??}
//?執行消息處理函數
??const?Array&?args?=?Array::Handle(zone,?Array::New(kNumArguments));
??args.SetAt(0,?Integer::Handle(zone,?Integer::New(port_id)));
??const?Object&?result?=
??????Object::Handle(zone,?DartEntry::InvokeFunction(function,?args));
??return?result.raw();
}
最終執行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在創建 Future 的時候我們已經設置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法會從 _handlerMap 中找到設置的回調方法,最后執行回調方法。具體代碼如下:
/src/third_party/dart/runtime/lib/isolate_patch.dart
@pragma("vm:entry-point")class?_RawReceivePortImpl?implements?RawReceivePort?{
?......
??//?Called?from?the?VM?to?retrieve?the?handler?for?a?message.
??@pragma("vm:entry-point",?"call")
??static?_lookupHandler(int?id)?{
????var?result?=?_handlerMap[id];
????return?result;
??}
......
}
Future 的創建到這就分析完了,整個過程涉及到了 EventQueue 的消息收發。
上面主要分析了非延遲消息的處理,如果是延遲的 Timer 會怎么處理呢?這里簡單提一下,在 dart vm 內部還有個 EventHandler 線程,如果是延遲消息則會通過管道向這個線程寫入延遲數據,這個線程會負責延遲計數,到時間了就往引擎層 Post 喚醒消息,具體代碼可參見/src/third_party/dart/runtime/bin/eventhandler.cc,這里就不再贅述,感興趣的可以自行分析。
至此,通過分析 Future 我們已經把 Event Queue 的消息處理流程了解了。
四、Microtask
1. ?向 ?Microtask queue 發送消息
假設 Zone. current 為 rootZone, 直接看 scheduleMicrotask 方法的實現:
/src/third_party/dart/sdk/lib/async/schedule_microtask.dart
void?scheduleMicrotask(void?callback())?{??_Zone?currentZone?=?Zone.current;
??if?(identical(_rootZone,?currentZone))?{
????_rootScheduleMicrotask(null,?null,?_rootZone,?callback);
????return;
??}
......
}
跟進 _rootScheduleMicrotask 方法的實現,最終來到 _scheduleAsyncCallback 方法,該方法做了兩件事情:
將傳入的 callback 加入 callback 隊列
將 _startMicrotaskLoop 作為閉包參數調用 ?_AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中會依次執行 callback 隊列保存的回調。
具體代碼如下:
/src/third_party/dart/sdk/lib/async/schedule_microtask.dart
void?_scheduleAsyncCallback(_AsyncCallback?callback)?{??_AsyncCallbackEntry?newEntry?=?new?_AsyncCallbackEntry(callback);
??if?(_nextCallback?==?null)?{
????_nextCallback?=?_lastCallback?=?newEntry;
????if?(!_isInCallbackLoop)?{
??????_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
????}
??}?else?{
????_lastCallback.next?=?newEntry;
????_lastCallback?=?newEntry;
??}
}
//?該方法被作為回調設置到引擎,會在處理所有的?Microtask?的時候執行?
void?_startMicrotaskLoop()?{
??_isInCallbackLoop?=?true;
??try?{
????_microtaskLoop();
??}?finally?{
????_lastPriorityCallback?=?null;
????_isInCallbackLoop?=?false;
????if?(_nextCallback?!=?null)?{
??????_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
????}
??}
}
class?_AsyncRun?{
??external?static?void?_scheduleImmediate(void?callback());
}
根據前面的經驗,會在對應的 patch 文件中找到 _AsyncRun._scheduleImmediate 的實現,其內部調用了 _ScheduleImmediate._closure 指向的方法。
具體代碼如下:
/src/third_party/dart/runtime/lib/schedule_microtask_patch.dart
@patchclass?_AsyncRun?{
??@patch
??static?void?_scheduleImmediate(void?callback())?{
????if?(_ScheduleImmediate._closure?==?null)?{
??????throw?new?UnsupportedError("Microtasks?are?not?supported");
????}
????_ScheduleImmediate._closure(callback);
??}
}
//?通過該方法設置?_ScheduleImmediate._closure
@pragma("vm:entry-point",?"call")
void?_setScheduleImmediateClosure(_ScheduleImmediateClosure?closure)?{
??_ScheduleImmediate._closure?=?closure;
}
那么 _ScheduleImmediate._closure 指向的是什么呢?我們需要找到 _setScheduleImmediateClosure 的調用方。root isolate 初始化時會執行一系列的 vm hook 調用,我們從中找到了 _setScheduleImmediateClosure 的調用,具體代碼如下:
/src/flutter/lib/ui/dart_runtime_hooks.cc
static?void?InitDartAsync(Dart_Handle?builtin_library,?bool?is_ui_isolate)?{??Dart_Handle?schedule_microtask;
??if?(is_ui_isolate)?{
//?這里的?builtin_library?是?Flutter?擴展的?ui?library
????schedule_microtask?=
????????GetFunction(builtin_library,?"_getScheduleMicrotaskClosure");
??}?else?{
????......
??}
??Dart_Handle?async_library?=?Dart_LookupLibrary(ToDart("dart:async"));
??Dart_Handle?set_schedule_microtask?=?ToDart("_setScheduleImmediateClosure");
??Dart_Handle?result?=?Dart_Invoke(async_library,?set_schedule_microtask,?1,
???????????????????????????????????&schedule_microtask);
??PropagateIfError(result);
}
進一步跟進,最終找到了 _ScheduleImmediate._closure 指向的方法,是一個 native 實現的函數,具體代碼如下:
/src/flutter/lib/ui/natives.dart
Function?_getScheduleMicrotaskClosure()?=>?_scheduleMicrotask;?void?_scheduleMicrotask(void?callback())?native?'ScheduleMicrotask';
跟進 _scheduleMicrotask 的 native 實現,發現其會把傳入的 _startMicrotaskLoop 方法加入到底層的Microtask queue,具體代碼如下:
/src/flutter/lib/ui/dart_runtime_hooks.cc
void?ScheduleMicrotask(Dart_NativeArguments?args)?{??Dart_Handle?closure?=?Dart_GetNativeArgument(args,?0);
??UIDartState::Current()->ScheduleMicrotask(closure);
}
/src/flutter/lib/ui/ui_dart_state.cc
void?UIDartState::ScheduleMicrotask(Dart_Handle?closure)?{??if?(tonic::LogIfError(closure)?||?!Dart_IsClosure(closure))?{
????return;
??}
??microtask_queue_.ScheduleMicrotask(closure);
}
2. ?Microtask queue 消息處理
前面已經將 _startMicrotaskLoop 方法加入到了 Microtask queue ,那么 Microtask queue 內的方法何時執行呢?我們通過跟進 Microtask queue ?的 RunMicrotasks 方法的調用方,最終找到 Microtask queue 內方法的執行時機 FlushMicrotasksNow,具體代碼如下:
/src/flutter/lib/ui/ui_dart_state.cc
void?UIDartState::FlushMicrotasksNow()?{??microtask_queue_.RunMicrotasks();
}
再跟進 FlushMicrotasksNow 方法的調用方,發現有兩處調用:
這里是在每一幀開始的時候去執行 Microtask
/src/flutter/lib/ui/window/window.cc
void?Window::BeginFrame(fml::TimePoint?frameTime)?{......
??UIDartState::Current()->FlushMicrotasksNow();
......
}
另外一處調用是通過 TaskObserve 的形式,具體代碼如下:
/src/flutter/lib/ui/ui_dart_state.cc
void?UIDartState::AddOrRemoveTaskObserver(bool?add)?{......
??if?(add)?{
//?這個 add_callback_?是啥呢?
????add_callback_(reinterpret_cast<intptr_t>(this),
??????????????????[this]()?{?this->FlushMicrotasksNow();?});
??}?else?{
????remove_callback_(reinterpret_cast<intptr_t>(this));
??}
}
跟進 add_callback_ 的賦值,這里是android的實現
/src/flutter/shell/platform/android/flutter_main.cc
void?FlutterMain::Init(JNIEnv*?env,???????????????????????jclass?clazz,
???????????????????????jobject?context,
???????????????????????jobjectArray?jargs,
???????????????????????jstring?bundlePath,
???????????????????????jstring?appStoragePath,
???????????????????????jstring?engineCachesPath)?{
?......
??settings.task_observer_add?=?[](intptr_t?key,?fml::closure?callback)?{
????fml::MessageLoop::GetCurrent().AddTaskObserver(key,?std::move(callback));
??};
......
}
FlushMicrotasksNow() 是作為 MessageLoop 的 TaskObserver 來執行的, TaskObserver 會在處理完task之后把該 Task 創建的 MicroTask 全部執行,也就是說在下一個 Task 運行前執行。代碼如下:
/src/flutter/fml/message_loop_impl.cc
void?MessageLoopImpl::FlushTasks(FlushType?type)?{......
??for?(const?auto&?invocation?:?invocations)?{
????invocation();
????for?(const?auto&?observer?:?task_observers_)?{
??????observer.second();
????}
??}
}
五、結束
通過前面的分析,Flutter 的異步消息處理流程還是挺復雜的,主要是代碼寫的比較亂,跳轉層次太多,可以通過對整個流程的掌控來尋找UI 線程的優化及監控點,進而降低 UI 線程的處理時間,希望本篇文章讓大家對 Flutter 的異步消息的整體處理流程有更深的理解。
推薦閱讀
你們吹捧的鴻蒙,只是另一個Fuchsia
Android仿微信QQ圖片裁剪
互聯網 HR 黑話大全,太真實了!
編程·思維·職場
歡迎掃碼關注
總結
以上是生活随笔為你收集整理的pthread异步_探索 Flutter 异步消息的实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redmine备份_Redmine 数据
- 下一篇: cvs update 用法_WinCVS