Node.js 代码阅读笔记系列(0)Timer 的实现
setImmediate
先來看看當我們使用 setImmediate 的時候經歷了那些過程
我們先這樣用
setImmediate(fn, arg)復制代碼可以看到 setImmediate 接收到了 callback, arg1等幾個參數
exports.setImmediate = function(callback, arg1, arg2, arg3) {if (typeof callback !== 'function') {throw new TypeError('"callback" argument must be a function');}var i, args;// 判斷傳入參數數量switch (arguments.length) {// 如果只有 callback 不帶其他參數的話,立即退出這里的switch// fast casescase 1:break;case 2:// 只有一個參數的話,設置 `args` 為有包含一個參數的數組args = [arg1];break;case 3:args = [arg1, arg2];break;default:// 參數長度超過 4 的話,遍歷之后的參數填入 `args`args = [arg1, arg2, arg3];for (i = 4; i < arguments.length; i++)// 這里也有提到在 Node 6.0.0 之后使用 `apply` 會比目前這種動態擴展數組快很多// extend array dynamically, makes .apply run much faster in v6.0.0args[i - 1] = arguments[i];break;}// 前面主要工作是參數的判斷和包裝,在這里開始創建 `Immediate`return createImmediate(args, callback); };復制代碼前面主要工作是參數的判斷和包裝,在這里開始創建 Immediate
createImmediate
function createImmediate(args, callback) {// 這里注釋提到,在使用 `const immediate` 在 6.0.0 中不能被優化// 創建 `Immediate` 節點,并給節點賦參數, 值得注意的是 `_callback` 和 `_onImmediate` 同樣都是賦 `callback`var immediate = new Immediate();immediate._callback = callback;immediate._argv = args;immediate._onImmediate = callback;// 設置 `process._needImmediateCallback` 標記,并給 `processImmediate ` 賦值到 `process._immediateCallback` ,用于原生模塊調用if (!process._needImmediateCallback) {process._needImmediateCallback = true;process._immediateCallback = processImmediate;}// `immediateQueue` 隊列鏈表中加入 immediate 節點immediateQueue.append(immediate);return immediate; }復制代碼這里的 createImmediate 根據接收的參數創建 immediate ,并把它加入到 immediateQueue 的隊列,在線程中設置需要執行Immediate回調的標記。
Immediate 隊列節點
這里用到的 Immediate 任務隊列節點的構造函數。這里 ImmediateQueue 采用的的是一個無序鏈表。
function Immediate() {// 直接注冊 callback 會導致優化不穩定(node v6.0.0, v8 5.0.71.35 老鐵不穩啊)// 所以先就聲明,有個疑問,這里是 hidden class 的問題嗎?this._idleNext = null;this._idlePrev = null;this._callback = null;this._argv = null;this._onImmediate = null;// 設置為當前線程的域this.domain = process.domain; }復制代碼processImmediate
function processImmediate() {// 取隊列的頭尾,申明 `domain` 也就是域var immediate = immediateQueue.head;var tail = immediateQueue.tail;var domain;// 清空隊列頭尾immediateQueue.head = immediateQueue.tail = null;while (immediate) {// immediate 任務的域domain = immediate.domain;// 如果沒有回調就下一個if (!immediate._onImmediate) {immediate = immediate._idleNext;continue;}if (domain)domain.enter();// 不是很明白這里,之前不是給它倆都賦值了 `callback` 么 ?immediate._callback = immediate._onImmediate;// 先暫存一個下一個節點,避免 `clearImmediate(immediate)` 被調用時被清理。 var next = immediate._idleNext;tryOnImmediate(immediate, tail);if (domain)domain.exit();// 如果有調用 `clearImmediate(immediate)` 的話就使用之前暫存的next,沒有的話,那就調用 `immediate._idleNext`if (immediate._idleNext)immediate = immediate._idleNext;elseimmediate = next;}// 判斷 immediate 隊列為空的話設置 `_needImmediateCallback ` 標志為false// 需要提到的是這里的邏輯 C++ 模塊中有實現if (!immediateQueue.head) {process._needImmediateCallback = false;} }復制代碼上面實現了 processImmediate 主要的作用是遍歷 immediateQueue 中的節點,并調用 tryOnImmediate 嘗試執行任務。
可以看到它被設置在 process 的 _immediateCallback 。那么有一個疑問,他是在什么時候被調用執行的?
可以看到這里在env全局環境變量上設置 _immediateCallback 的的代理符號
// src/env.hV(immediate_callback_string, "_immediateCallback") static inline Environment* from_immediate_check_handle(uv_check_t* handle);static inline Environment* from_destroy_ids_idle_handle(uv_idle_t* handle);inline uv_check_t* immediate_check_handle();inline uv_idle_t* immediate_idle_handle();inline uv_idle_t* destroy_ids_idle_handle();復制代碼// src/node.ccstatic void CheckImmediate(uv_check_t* handle) {Environment* env = Environment::from_immediate_check_handle(handle);HandleScope scope(env->isolate());Context::Scope context_scope(env->context());MakeCallback(env, env->process_object(), env->immediate_callback_string()); }復制代碼看到這里 CheckImmediate 感覺已經快接近答案了。
tryOnImmediate
我們繼續回到 JS
function tryOnImmediate(immediate, oldTail) {var threw = true;try {// 這里是因為之前的 v8 會放棄優化帶有`try/finally`的function,所以這里把執行函數再外置到一個小函數,small function 會得到v8優化runCallback(immediate);threw = false;} finally {// 如果執行成功并且有下一個節點if (threw && immediate._idleNext) {// 處理正常的話,繼續下一個const curHead = immediateQueue.head;const next = immediate._idleNext;if (curHead) {curHead._idlePrev = oldTail;oldTail._idleNext = curHead;next._idlePrev = null;immediateQueue.head = next;} else {immediateQueue.head = next;immediateQueue.tail = oldTail;}// 下一個事件循環中繼續處理 Immediate 任務隊列process.nextTick(processImmediate);}} }復制代碼前面提到為了獲得v8優化的 tryOnImmediate 在 try/finally 中將執行節點的callback放在了 runCallback 這個 small function 中。
runCallback
function runCallback(timer) {const argv = timer._argv;const argc = argv ? argv.length : 0;switch (argc) {// 這里可以回頭看看上面開始的創建時的參數處理case 0:return timer._callback();case 1:return timer._callback(argv[0]);case 2:return timer._callback(argv[0], argv[1]);case 3:return timer._callback(argv[0], argv[1], argv[2]);// more than 3 arguments run slower with .applydefault:return timer._callback.apply(timer, argv);} }復制代碼好像終于把 setImmediate 的創建處理部分 ?看完了
setTimeout
這里的參數處理和之前 setImmediate 參數處理很像
exports.setTimeout = function(callback, after, arg1, arg2, arg3) {if (typeof callback !== 'function') {throw new TypeError('"callback" argument must be a function');}var len = arguments.length;var args;if (len === 3) {args = [arg1];} else if (len === 4) {args = [arg1, arg2];} else if (len > 4) {args = [arg1, arg2, arg3];for (var i = 5; i < len; i++)args[i - 2] = arguments[i];}return createSingleTimeout(callback, after, args); };復制代碼createSingleTimeout
這里開始有點不一樣了,繼續看代碼
function createSingleTimeout(callback, after, args) {// 嘗試轉換為 Number 或者 NaNafter *= 1;// 如果 after 小于 1 或者 after > TIMEOUT_MAX// after = 1if (!(after >= 1 && after <= TIMEOUT_MAX))after = 1;// 根據參數創建新的 Timeout 隊列節點var timer = new Timeout(after, callback, args);if (process.domain)timer.domain = process.domain;// 加入到Timeout 隊列active(timer);return timer; }復制代碼const TIMEOUT_MAX = 2147483647; // 2^31-1
補充一下, TIMEOUT_MAX 的值為 2^31-1,也就是我們最多可以通過 setTimeout 延遲執行大約 2147483647 ms,也就是 24 天左右。
Timeout 節點的構造函數
function Timeout(after, callback, args) {this._called = false;this._idleTimeout = after;this._idlePrev = this;this._idleNext = this;this._idleStart = null;this._onTimeout = callback;this._timerArgs = args;// 這里會和setInterval聯系起來this._repeat = null; }復制代碼將 timeout 計時器插入計時器列表
這里的叫做 時間輪算法,這里給相同 ms 級的 timeout 任務共用了一個 timeWrap,相同時間的任務分配在同一個鏈表,使計時任務的調度和新增的復雜度都是 O(1), 也達到高效復用了同一個 timeWrap。
const active = exports.active = function(item) {insert(item, false); };// 計時器的調度或者重新調度的底層邏輯 // 將會添加計時器到已存在的計時器列表的末尾,或者創建新的列表function insert(item, unrefed) {const msecs = item._idleTimeout;if (msecs < 0 || msecs === undefined) return;// TimerWrap 是原生模塊 timer_wrapitem._idleStart = TimerWrap.now();const lists = unrefed === true ? unrefedLists : refedLists;// 創建或者使用已存在的隊列var list = lists[msecs];if (!list) {debug('no %d list was found in insert, creating a new one', msecs);lists[msecs] = list = createTimersList(msecs, unrefed);}L.append(list, item);assert(!L.isEmpty(list)); // list is not empty }復制代碼創建 timeout 計時器列表
function createTimersList (msecs, unrefed) {// 創建一個新的鏈表并創建一個 TimerWrap 實例來對鏈表進行調度const list = new TimersList(msecs, unrefed);L.init(list);list._timer._list = list;if (unrefed === true) list._timer.unref();list._timer.start(msecs);list._timer[kOnTimeout] = listOnTimeout;return list; }復制代碼TimersList
這里的鏈表節點和之前的 Immediate 不同的地方是 this._timer = new TimerWrap(), 這里創建了一個新的 TimerWrap 實例。
function TimersList (msecs, unrefed) {this._idleNext = null; // Create the list with the linkedlist properties tothis._idlePrev = null; // prevent any unnecessary hidden class changes.this._timer = new TimerWrap();this._unrefed = unrefed;this.msecs = msecs;this.nextTick = false; }復制代碼TimerWrap
TimerWrap 是 Nodejs中的一個類,實現在 /src/timer_wrap.cc, 是一個 uv_timer_t 的封裝,是連接 JavaScript 和 libuv 的一個 brige。
我們先通過這個例子來看看 TimerWrap 能實現什么功能。
const TimerWrap = process.binding('timer_wrap').Timer const kOnTimeout = TimerWrap.kOnTimeout | 0let timer = new TimerWrap(); timer.start(2333);console.log('started');timer[kOnTimeout] = function () {console.log('2333!'); };輸出: started2333 // 2.333s之后復制代碼在 libuv 的 uv_timer_t 實現中使用的是 最小堆 的數據結構,節點的最小判斷依據就是它的 timeout, 如果是相同 timeout 的話,則判斷兩個節點的 start_id, start_id 是一個遞增的節點計數,這樣也就保證了調用時序。
// deps/uv/src/unix/timer.cstatic int timer_less_than(const struct heap_node* ha,const struct heap_node* hb) {const uv_timer_t* a;const uv_timer_t* b;a = container_of(ha, uv_timer_t, heap_node);b = container_of(hb, uv_timer_t, heap_node);if (a->timeout < b->timeout)return 1;if (b->timeout < a->timeout)return 0;/* Compare start_id when both have the same timeout. start_id is* allocated with loop->timer_counter in uv_timer_start().*/if (a->start_id < b->start_id)return 1;if (b->start_id < a->start_id)return 0;return 0; }復制代碼TimerWrap 源碼
TimerWrap 作為一個連接 libuv 的 birge,所以我們容易看到在 Start 方法中調用了uv_timer_start,傳遞了自己的指針,第二個參數為回調,第三個參數便是 timeout。
我們繼續看看 OnTimeout, 它的主要工作就是調用 key 為 kOnTimeout 的回調,也就觸發了我們 JavaScript 層的回調函數了。
// src/timer_wrap.cc class TimerWrap : public HandleWrap { ... private:static void Start(const FunctionCallbackInfo<Value>& args) {TimerWrap* wrap = Unwrap<TimerWrap>(args.Holder());CHECK(HandleWrap::IsAlive(wrap));int64_t timeout = args[0]->IntegerValue();int err = uv_timer_start(&wrap->handle_, OnTimeout, timeout, 0);args.GetReturnValue().Set(err);}static void OnTimeout(uv_timer_t* handle) {TimerWrap* wrap = static_cast<TimerWrap*>(handle->data);Environment* env = wrap->env();HandleScope handle_scope(env->isolate());Context::Scope context_scope(env->context());wrap->MakeCallback(kOnTimeout, 0, nullptr);}復制代碼我們先回到 createTimersList, 剛才簡單介紹的 TimerWrap ,現在,我們就能繼續愉快往下看了。
function createTimersList (msecs, unrefed) {// 創建一個新的鏈表并創建一個 TimerWrap 實例來對鏈表進行調度const list = new TimersList(msecs, unrefed);L.init(list);list._timer._list = list;if (unrefed === true) list._timer.unref();// 這里設置延時list._timer.start(msecs);// 這里設置延時的回調函數, 下一步,繼續看? listOnTimeoutlist._timer[kOnTimeout] = listOnTimeout;return list; }復制代碼listOnTimeout
這里的套路到是和 processImmediate 類似
function listOnTimeout() {var list = this._list;var msecs = list.msecs;// 如果 list.nextTick 為 true, 在下一個事件循環調用 listOnTimeoutNT 立即執行if (list.nextTick) {list.nextTick = false;process.nextTick(listOnTimeoutNT, list);return;}debug('timeout callback %d', msecs);// 獲取當前運行時間var now = TimerWrap.now();debug('now: %d', now);var diff, timer;while (timer = L.peek(list)) {diff = now - timer._idleStart;// 判斷這里的循環是否被過早調用if (diff < msecs) {var timeRemaining = msecs - (TimerWrap.now() - timer._idleStart);if (timeRemaining < 0) {timeRemaining = 0;}this.start(timeRemaining);debug('%d list wait because diff is %d', msecs, diff);return;}// 開始進入 timeout 邏輯 // 從鏈表中刪除當前計時器節點 L.remove(timer);// 檢測是否從鏈表中移除assert(timer !== L.peek(list));// 沒有回調函數的情況,跳到下一次循環if (!timer._onTimeout) continue;var domain = timer.domain;if (domain) {// 如果計數器回調拋出錯誤, domain 和 uncaughtException 都忽略異常,其他計時器正常執行// https://github.com/nodejs/node-v0.x-archive/issues/2631if (domain._disposed)continue;domain.enter();}tryOnTimeout(timer, list);if (domain)domain.exit();}// 計時器已經全部被調用,鏈表也已經清空,調用 TimerWrap 的 close 進行清理處理debug('%d list empty', msecs);assert(L.isEmpty(list));this.close();// Either refedLists[msecs] or unrefedLists[msecs] may have been removed and// recreated since the reference to `list` was created. Make sure they're// the same instance of the list before destroying.// 清理if (list._unrefed === true && list === unrefedLists[msecs]) {delete unrefedLists[msecs];} else if (list === refedLists[msecs]) {delete refedLists[msecs];} }復制代碼tryOnTimeout
tryOnTimeout 和之前的 tryOnImmediate的處理方式大體還是一樣
// 這里和 tryOnImmediate一樣 也考慮到 v8 的優化,所以使用 small function 來執行 timerfunction tryOnTimeout(timer, list) {timer._called = true;var threw = true;try {ontimeout(timer);threw = false;} finally {// 如果沒拋出錯誤,直接結束if (!threw) return;// 拋出錯誤未正常執行情況下// 為了保證執行順序,推遲列表中所有事件到下一周期。const lists = list._unrefed === true ? unrefedLists : refedLists;for (var key in lists) {if (key > list.msecs) {lists[key].nextTick = true;}}// We need to continue processing after domain error handling// is complete, but not by using whatever domain was left over// when the timeout threw its exception.const domain = process.domain;process.domain = null;// 如果拋出錯誤,在 nextTick 中執行接下來的計數器回調process.nextTick(listOnTimeoutNT, list);process.domain = domain;} }復制代碼ontimeout
function ontimeout(timer) {var args = timer._timerArgs;var callback = timer._onTimeout;if (!args)callback.call(timer);else {switch (args.length) {case 1:callback.call(timer, args[0]);break;case 2:callback.call(timer, args[0], args[1]);break;case 3:callback.call(timer, args[0], args[1], args[2]);break;default:callback.apply(timer, args);}}// 這里就是 setInterval 的實現了,之后再細看if (timer._repeat)rearm(timer); }復制代碼setInterval
這里的實現和 setTimeout , setImmediate 幾乎一樣。
exports.setInterval = function(callback, repeat, arg1, arg2, arg3) {if (typeof callback !== 'function') {throw new TypeError('"callback" argument must be a function');}var len = arguments.length;var args;if (len === 3) {args = [arg1];} else if (len === 4) {args = [arg1, arg2];} else if (len > 4) {args = [arg1, arg2, arg3];for (var i = 5; i < len; i++)// extend array dynamically, makes .apply run much faster in v6.0.0args[i - 2] = arguments[i];}return createRepeatTimeout(callback, repeat, args); };復制代碼interval === repeat timeout ?
setInterval 的實現和 setTimeout 不同在于 timer._repeat = repeat
function createRepeatTimeout(callback, repeat, args) {repeat *= 1; // coalesce to number or NaNif (!(repeat >= 1 && repeat <= TIMEOUT_MAX))repeat = 1; // schedule on next tick, follows browser behaviourvar timer = new Timeout(repeat, callback, args);timer._repeat = repeat;if (process.domain)timer.domain = process.domain;active(timer);return timer; }復制代碼clear
之前看了創建 3 種時間調度的方法,在看看清理的 timer 的代碼。
clearImmediate
exports.clearImmediate = function(immediate) {if (!immediate) return;immediate._onImmediate = null;immediateQueue.remove(immediate);if (!immediateQueue.head) {process._needImmediateCallback = false;} };復制代碼clearTimeout
const clearTimeout = exports.clearTimeout = function(timer) {if (timer && (timer[kOnTimeout] || timer._onTimeout)) {timer[kOnTimeout] = timer._onTimeout = null;if (timer instanceof Timeout) {timer.close(); // for after === 0} else {unenroll(timer);}} };復制代碼Timeout.unref
這里的 timer 提供了 close,unref,ref 3 個方法,其中 ref 和 unref 通過 TimerWrap 調用底層的 uv_ref() 和 uv_unref()。
在 Nodejs 官方文檔提到
When called, the active Timeout object will not require the Node.js event loop to remain active. If there is no other activity keeping the event loop running, the process may exit before the Timeout object's callback is invoked.
主動調用 unref(),如果沒有其他活躍的對象,可能會使 Nodejs 的事件循環提前退出
Timeout.prototype.unref = function() {if (this._handle) {this._handle.unref();} else if (typeof this._onTimeout === 'function') {var now = TimerWrap.now();if (!this._idleStart) this._idleStart = now;var delay = this._idleStart + this._idleTimeout - now;if (delay < 0) delay = 0;// 防止在調用 `unref()`之后 再次運行回調if (this._called && !this._repeat) {unenroll(this);return;}var handle = reuse(this);this._handle = handle || new TimerWrap();this._handle.owner = this;this._handle[kOnTimeout] = unrefdHandle;this._handle.start(delay);this._handle.domain = this.domain;this._handle.unref();}return this; };復制代碼Timeout.ref
Timeout.prototype.ref = function() {if (this._handle)this._handle.ref();return this; };復制代碼Timeout.close
Timeout.prototype.close = function() {this._onTimeout = null;if (this._handle) {this._idleTimeout = -1;this._handle[kOnTimeout] = null;this._handle.close();} else {unenroll(this);}return this; };// 移除計時器,取消延時以及重置有關的計時器屬性 const unenroll = exports.unenroll = function(item) {var handle = reuse(item);if (handle) {debug('unenroll: list empty');handle.close();}// 確保之后不會被繼續插入隊列item._idleTimeout = -1; };// 為了復用 TimerWrap 的一簡單的轉換函數 // // This mostly exists to fix https://github.com/nodejs/node/issues/1264. // Handles in libuv take at least one `uv_run` to be registered as unreferenced. // Re-using an existing handle allows us to skip that, so that a second `uv_run` // will return no active handles, even when running `setTimeout(fn).unref()`.function reuse(item) {L.remove(item);var list = refedLists[item._idleTimeout];// if empty - reuse the watcherif (list && L.isEmpty(list)) {debug('reuse hit');list._timer.stop();delete refedLists[item._idleTimeout];return list._timer;}return null; }復制代碼clearInterval
exports.clearInterval = function(timer) {if (timer && timer._repeat) {timer._repeat = null;clearTimeout(timer);} };復制代碼結尾 ?
先上圖
┌───────────────────────┐ ┌─>│ timers │ │ └──────────┬────────────┘ │ ┌──────────┴────────────┐ │ │ I/O callbacks │ │ └──────────┬────────────┘ │ ┌──────────┴────────────┐ │ │ idle, prepare │ │ └──────────┬────────────┘ ┌───────────────┐ │ ┌──────────┴────────────┐ │ incoming: │ │ │ poll │<─────┤ connections, │ │ └──────────┬────────────┘ │ data, etc. │ │ ┌──────────┴────────────┐ └───────────────┘ │ │ check │ │ └──────────┬────────────┘ │ ┌──────────┴────────────┐ └──┤ close callbacks │└───────────────────────┘復制代碼setImmediate 一般在 check 階段執行,也有可能在 poll 階段執行
setTimeout setInterval 在 timer 階段執行
來一個問題:setTimeout(fn, 0) setImmediate(fn) 誰會先執行?
setTimeout(console.log, 0, 1); setImmediate(console.log, 2);// event loop 每個階段都比較空閑的話,一次 event loop 小于 1ms 時: 2 1// 超過 1ms 時也可能是1 2復制代碼如果在一個I/O循環內調用,immediate 始終會比 setTimeout 先執行。因為immediate 會在 event loop 中 poll 完成之后立即執行,setTimeout 則是到下一個 timers 階段。
var fs = require('fs')fs.readFile(__filename, () => {setTimeout(console.log, 0, 1);setImmediate(console.log, 2); })// 輸出: 2 1復制代碼再來一個
我們在 Nodejs 中這樣寫, 會怎么輸出?
var a = setTimeout(console.log, 50, 2333); a._repeat = true;復制代碼這樣呢?
var a = setTimeout(console.log, 1000, 2333); a.close()復制代碼這樣呢?
var a = setTimeout(console.log, 1000, 2333); a.unref()復制代碼參考資料:
node/lib/timers.js
node/lib/internal/linkedlist.js
node/src/timer_wrap.cc
event-loop-timers-and-nexttick
Optimizing _unrefActive
總結
以上是生活随笔為你收集整理的Node.js 代码阅读笔记系列(0)Timer 的实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: go实例之函数
- 下一篇: Windows Server 2008