nodejs源码_格物致知记一次nodejs源码分析的经历
昨天分析http模塊相關的代碼時,遇到了一個晦澀的邏輯,看了想,想了看還是沒看懂。百度、谷歌了很多帖子也沒看到合適的答案。突然看到一個題目有點相識的搜索結果,點進去是Stack Overflow上的帖子,但是已經404,最后還是通過快照功能成功看到內容。這個帖子[1]和我的疑惑不相關,但是突然給了我一些靈感。沿著這個靈感去看了代碼,最后下載nodejs源碼,加了一些log,編譯了一夜(太久了,等不及編譯完成,得睡覺了)。上午起來驗證,終于揭開了疑惑。這個問題源于下面這段代碼。
function connectionListenerInternal(server, socket) { socket.server = server; // 分配一個http解析器 const parser = parsers.alloc(); // 解析請求報文 parser.initialize( HTTPParser.REQUEST, new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket), server.maxHeaderSize || 0, server.insecureHTTPParser === undefined ? isLenient() : server.insecureHTTPParser, ); parser.socket = socket; // 開始解析頭部的開始時間 parser.parsingHeadersStart = nowDate(); socket.parser = parser; const state = { onData: null, onEnd: null, onClose: null, onDrain: null, // 同一tcp連接上,請求和響應的的隊列 outgoing: [], incoming: [], outgoingData: 0, keepAliveTimeoutSet: false }; state.onData = socketOnData.bind(undefined, server, socket, parser, state); socket.on('data', state.onData); if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) { parser._consumed = true; socket._handle._consumed = true; parser.consume(socket._handle); } parser[kOnExecute] = onParserExecute.bind(undefined, server, socket, parser, state); socket._paused = false;}這段代碼看起來很多,這是啟動http服務器后,有新的tcp連接建立時執行的回調。問題在于tcp上有數據到來時,是怎么處理的,上面代碼中nodejs監聽了socket的data事件,同時注冊了鉤子kOnExecute。data事件我們都知道是流上有數據到來時觸發的事件。我們看一下socketOnData做了什么事情。
function socketOnData(server, socket, parser, state, d) { // 交給http解析器處理,返回已經解析的字節數 const ret = parser.execute(d); onParserExecuteCommon(server, socket, parser, state, ret, d);}這看起來沒有問題,socket上有數據,然后交給http解析器處理。幾乎所有http模塊源碼解析的文章也是這樣分析的,我第一反應也覺得這個沒問題,那kOnExecute是做什么的呢?kOnExecute鉤子函數的值是onParserExecute,這個看起來也是解析tcp上的數據的,看起來和onSocketData是一樣的作用,難道tcp上的數據有兩個消費者?我們看一下kOnExecute什么時候被回調的。
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { Local ret = Execute(buf.base, nread); Local cb = object()->Get(env()->context(), kOnExecute).ToLocalChecked(); MakeCallback(cb.As(), 1, &ret); }在node_http_parser.cc中的OnStreamRead中被回調,那么OnStreamRead又是什么時候被回調的呢?OnStreamRead是nodejs中c++層流操作的通用函數,當流有數據的時候就會執行該回調。而且OnStreamRead中也會把數據交給http解析器解析。這看起來真的有兩個消費者?這就很奇怪,為什么一份數據會交給http解析器處理兩次?這時候我的想法就是這兩個地方肯定是互斥的。但是我一直沒有找到是哪里做了處理。最后在connectionListenerInternal的一段代碼中找到了答案。
if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) { parser._consumed = true; socket._handle._consumed = true; parser.consume(socket._handle); }因為tcp流是繼承StreamBase類的,所以if成立(后面會具體分析)。我們看一下consume的實現。
static void Consume(const FunctionCallbackInfo& args) { Parser* parser; ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); CHECK(args[0]->IsObject()); StreamBase* stream = StreamBase::FromObjject(args[0].As()); CHECK_NOT_NULL(stream); stream->PushStreamListener(parser); }http解析器把自己注冊為tcp stream的一個listener。這里涉及到了c++層對流的設計。我們從頭開始??匆幌翽ushStreamListener做了什么事情。c++層中,流的操作由類StreamResource進行了封裝。
class StreamResource { public: virtual ~StreamResource(); virtual int ReadStart() = 0; virtual int ReadStop() = 0; virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); virtual int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; void PushStreamListener(StreamListener* listener); void RemoveStreamListener(StreamListener* listener); protected: uv_buf_t EmitAlloc(size_t suggested_size); void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0; uint64_t bytes_written_ = 0; friend class StreamListener;};我們看到StreamResource是一個基類,定義了操作流的公共方法。其中有一個成員是StreamListener類的實例。我們看看StreamListener的實現。
class StreamListener { public: virtual ~StreamListener(); virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; virtual void OnStreamRead(ssize_t nread, const uv_buf_t& buf) = 0; virtual void OnStreamDestroy() {} inline StreamResource* stream() { return stream_; } protected: void PassReadErrorToPreviousListener(ssize_t nread); StreamResource* stream_ = nullptr; StreamListener* previous_listener_ = nullptr; friend class StreamResource;};StreamListener是一個負責消費流數據的類。StreamListener 和StreamResource類的關系如下。我們看到一個流可以注冊多個listener,多個listener形成一個鏈表。接著我們看一下創建一個c++層的tcp對象是怎樣的。下面是TCPWrap的繼承關系。
class TCPWrap : public ConnectionWrap{}class ConnectionWrap : public LibuvStreamWrap{}class LibuvStreamWrap : public HandleWrap, public StreamBase{}class StreamBase : public StreamResource {}我們看到tcp流是繼承于StreamResource的。新建一個tcp的c++的對象時(tcp_wrap.cc),會不斷往上調用父類的構造函數,其中在StreamBase中有一個關鍵的操作。
inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_);}EmitToJSStreamListener default_listener_;StreamBase會默認給流注冊一個listener。我們看下EmitToJSStreamListener 具體的定義。
class ReportWritesToJSStreamListener : public StreamListener { public: void OnStreamAfterWrite(WriteWrap* w, int status) override; void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; private: void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);};class EmitToJSStreamListener : public ReportWritesToJSStreamListener { public: uv_buf_t OnStreamAlloc(size_t suggested_size) override; void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;};EmitToJSStreamListener繼承StreamListener ,定義了分配內存和讀取接收數據的函數。接著我們看一下PushStreamListener做了什么事情。
inline void StreamResource::PushStreamListener(StreamListener* listener) { // 頭插法 listener->previous_listener_ = listener_; listener->stream_ = this; listener_ = listener;}PushStreamListener就是構造出上圖的結構。對應到創建一個c++層的tcp對象中,如下圖。然后我們看一下對于流來說,讀取數據的整個鏈路。首先是js層調用readStart
function tryReadStart(socket) { socket._handle.reading = true; const err = socket._handle.readStart(); if (err) socket.destroy(errnoException(err, 'read'));}// 注冊等待讀事件Socket.prototype._read = function(n) { tryReadStart(this);};我們看看readStart
int LibuvStreamWrap::ReadStart() { return uv_read_start(stream(), [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static_cast(handle->data)->OnUvAlloc(suggested_size, buf); }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { static_cast(stream->data)->OnUvRead(nread, buf); });}ReadStart調用libuv的uv_read_start注冊等待可讀事件,并且注冊了兩個回調函數OnUvAlloc和OnUvRead。
void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { EmitRead(nread, *buf);}inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { // bytes_read_表示已讀的字節數 if (nread > 0) bytes_read_ += static_cast(nread); listener_->OnStreamRead(nread, buf);}通過層層調用最后會調用listener_的OnStreamRead。我們看看tcp的OnStreamRead
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); AllocatedBuffer buf(env, buf_); stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());}繼續回調CallJSOnreadMethod
MaybeLocal StreamBase::CallJSOnreadMethod(ssize_t nread, Local ab, size_t offset, StreamBaseJSChecks checks) { Environment* env = env_; // ... AsyncWrap* wrap = GetAsyncWrap(); CHECK_NOT_NULL(wrap); Local onread = wrap->object()->GetInternalField(kOnReadFunctionField); CHECK(onread->IsFunction()); return wrap->MakeCallback(onread.As(), arraysize(argv), argv);}CallJSOnreadMethod會回調js層的onread回調函數。onread會把數據push到流中,然后觸發data事件。這是tcp里默認的數據讀取過程。而文章開頭講到的parser.consume打破了這個默認行為。stream->PushStreamListener(parser);修改了tcp流的listener鏈,http parser把自己作為數據的接收者。所以這時候tcp流上的數據是直接由node_http_parser.cc的OnStreamRead消費的。而不是觸發socket的data事件,最后通過在nodejs源碼中加log,重新編譯驗證的確如文中所述。最后提一個這個過程中還有一個關鍵的地方是調用consume函數的前提是socket._handle.isStreamBase為true。isStreamBase是在StreamBase::AddMethods中定義為true的,而tcp對象創建的過程中,調用了這個方法,所以tcp的isStreamBase是true,才會執行consume,才會執行kOnExecute回調。
References
[1]?帖子:?http://cache.baiducontent.com/c?m=rZy2XovtTdJJuXWLM-s8wgpaz8NFubewtolyiC19iAKFJrbGdx2EFnArzlAIDisNP70zWWsCPv-4jwMHTGNcLaUsMVr-lvLqYmmHD-w_fUYz6a5K6OQRC9kZmLYN5RXsb34OdINb8xHIJsdyClaEWOtCGKMQ2saYK7ed7OG8v0E1pRKR4K46phl0rCBrw6amXE3QpPo62dMhvu_VASYYqq&p=cb77c64ad49111a05bee9e264d5693&newp=882a9646dc9712a05ab7cc374f0ccc231615d70e3ad3d501298ffe0cc4241a1a1a3aecbf2d29170ed6c27f630bae4856ecf630723d0834f1f689df08d2ecce7e7b&s=cfcd208495d565ef&user=baidu&fm=sc&query=onParserExecute&qid=869f73bc002e44f5&p1=11
總結
以上是生活随笔為你收集整理的nodejs源码_格物致知记一次nodejs源码分析的经历的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 图像缩放算法_opencv缩放算法
- 下一篇: html页面源码_整合SpringMVC