Node.js Stream 模塊 Pipe 方法使用與實現原理分析
通過流我們可以將一大塊數據拆分為一小部分一點一點的流動起來,而無需一次性全部讀入,在 Linux 下我們可以通過 | 符號實現,類似的在 Nodejs 的 Stream 模塊中同樣也為我們提供了 pipe() 方法來實現。
1. Stream pipe 基本示例
選擇 Koa 來實現這個簡單的 Demo,因為之前有人在 “Nodejs技術棧” 交流群問過一個問題,怎么在 Koa 中返回一個 Stream,順便在下文借此機會提下。
1.1 未使用 Stream pipe 情況
在 Nodejs 中 I/O 操作都是異步的,先用 util 模塊的 promisify 方法將 fs.readFile 的 callback 形式轉為 Promise 形式,這塊代碼看似沒問題,但是它的體驗不是很好,因為它是將數據一次性讀入內存再進行的返回,當數據文件很大的時候也是對內存的一種消耗,類似內存泄漏這種問題也很容易出現,因此不推薦它。
- const Koa = require('koa');
- const fs = require('fs');
- const app = new Koa();
- const { promisify } = require('util');
- const { resolve } = require('path');
- const readFile = promisify(fs.readFile);
- app.use(async ctx => {
- try {
- ctx.body = await readFile(resolve(__dirname, 'test.json'));
- } catch(err) { ctx.body = err };
- });
- app.listen(3000);
1.2 使用 Stream pipe 情況
下面,再看看怎么通過 Stream 的方式在 Koa 框架中響應數據
- ...
- app.use(async ctx => {
- try {
- const readable = fs.createReadStream(resolve(__dirname, 'test.json'));
- ctx.body = readable;
- } catch(err) { ctx.body = err };
- });
以上在 Koa 中直接創建一個可讀流賦值給 ctx.body 就可以了,你可能疑惑了為什么沒有 pipe 方法,因為框架給你封裝好了,不要被表象所迷惑了,看下相關源碼:
- // https://github.com/koajs/koa/blob/master/lib/application.js#L256
- function respond(ctx) {
- ...
- let body = ctx.body;
- if (body instanceof Stream) return body.pipe(res);
- ...
- }
沒有神奇之處,框架在返回的時候做了層判斷,因為 res 是一個可寫流對象,如果 body 也是一個 Stream 對象(此時的 Body 是一個可讀流),則使用 body.pipe(res) 以流的方式進行響應。
1.3 使用 Stream VS 不使用 Stream
看到一個圖片,不得不說畫的實在太萌了,來源 https://www.cnblogs.com/vajoy/p/6349817.html
2. pipe 的調用過程與實現原理
以上最后以流的方式響應數據最核心的實現就是使用 pipe 方法來實現的輸入、輸出,本節的重點也是研究 pipe 的實現,最好的打開方式通過閱讀源碼一起來看看吧。
2.1 順藤摸瓜
在應用層我們調用了 fs.createReadStream() 這個方法,順藤摸瓜找到這個方法創建的可讀流對象的 pipe 方法實現,以下僅列舉核心代碼實現,基于 Nodejs v12.x 源碼。
2.1.1 /lib/fs.js
導出一個 createReadStream 方法,在這個方法里面創建了一個 ReadStream 可讀流對象,且 ReadStream 來自 internal/fs/streams 文件,繼續向下找。
- // https://github.com/nodejs/node/blob/v12.x/lib/fs.js
- // 懶加載,主要在用到的時候用來實例化 ReadStream、WriteStream ... 等對象
- function lazyLoadStreams() {
- if (!ReadStream) {
- ({ ReadStream, WriteStream } = require('internal/fs/streams'));
- [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
- }
- }
- function createReadStream(path, options) {
- lazyLoadStreams();
- return new ReadStream(path, options); // 創建一個可讀流
- }
- module.exports = fs = {
- createReadStream, // 導出 createReadStream 方法
- ...
- }
2.1.2 /lib/internal/fs/streams.js
這個方法里定義了構造函數 ReadStream,且在原型上定義了 open、_read、_destroy 等方法,并沒有我們要找的 pipe 方法。
但是呢通過 ObjectSetPrototypeOf 方法實現了繼承,ReadStream 繼承了 Readable 在原型中定義的函數,接下來繼續查找 Readable 的實現。
- // https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
- const { Readable, Writable } = require('stream');
- function ReadStream(path, options) {
- if (!(this instanceof ReadStream))
- return new ReadStream(path, options);
- ...
- Readable.call(this, options);
- ...
- }
- ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
- ObjectSetPrototypeOf(ReadStream, Readable);
- ReadStream.prototype.open = function() { ... };
- ReadStream.prototype._read = function(n) { ... };;
- ReadStream.prototype._destroy = function(err, cb) { ... };
- ...
- module.exports = {
- ReadStream,
- WriteStream
- };
2.1.3 /lib/stream.js
在 stream.js 的實現中,有條注釋:在 Readable/Writable/Duplex/... 之前導入 Stream,原因是為了避免 cross-reference(require),為什么會這樣?
第一步 stream.js 這里將 require('internal/streams/legacy') 導出復制給了 Stream。
在之后的 _stream_readable、Writable、Duplex ... 模塊也會反過來引用 stream.js 文件,具體實現下面會看到。
Stream 導入了 internal/streams/legacy
上面 /lib/internal/fs/streams.js 文件從 stream 模塊獲取了一個 Readable 對象,就是下面的 Stream.Readable 的定義。
- // https://github.com/nodejs/node/blob/v12.x/lib/stream.js
- // Note: export Stream before Readable/Writable/Duplex/...
- // to avoid a cross-reference(require) issues
- const Stream = module.exports = require('internal/streams/legacy');
- Stream.Readable = require('_stream_readable');
- Stream.Writable = require('_stream_writable');
- Stream.Duplex = require('_stream_duplex');
- Stream.Transform = require('_stream_transform');
- Stream.PassThrough = require('_stream_passthrough');
- ...
2.1.4 /lib/internal/streams/legacy.js
上面的 Stream 等于 internal/streams/legacy,首先繼承了 Events 模塊,之后呢在原型上定義了 pipe 方法,剛開始看到這里的時候以為實現是在這里了,但后來看 _stream_readable 的實現之后,發現 _stream_readable 繼承了 Stream 之后自己又重新實現了 pipe 方法,那么疑問來了這個模塊的 pipe 方法是干嘛的?什么時候會被用?翻譯文件名 “legacy=遺留”?有點沒太理解,難道是遺留了?有清楚的大佬可以指點下,也歡迎在公眾號 “Nodejs技術棧” 后臺加我微信一塊討論下!
- // https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
- const {
- ObjectSetPrototypeOf,
- } = primordials;
- const EE = require('events');
- function Stream(opts) {
- EE.call(this, opts);
- }
- ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
- ObjectSetPrototypeOf(Stream, EE);
- Stream.prototype.pipe = function(dest, options) {
- ...
- };
- module.exports = Stream;
2.1.5 /lib/_stream_readable.js
在 _stream_readable.js 的實現里面定義了 Readable 構造函數,且繼承于 Stream,這個 Stream 正是我們上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加載了 internal/streams/legacy 文件且重寫了里面定義的 pipe 方法。
經過上面一系列的分析,終于找到可讀流的 pipe 在哪里,同時也更進一步的認識到了在創建一個可讀流時的執行調用過程,下面將重點來看這個方法的實現。
- module.exports = Readable;
- Readable.ReadableState = ReadableState;
- const EE = require('events');
- const Stream = require('stream');
- ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
- ObjectSetPrototypeOf(Readable, Stream);
- function Readable(options) {
- if (!(this instanceof Readable))
- return new Readable(options);
- ...
- Stream.call(this, options); // 繼承自 Stream 構造函數的定義
- }
- ...
2.2 _stream_readable 實現分析
2.2.1 聲明構造函數 Readable
聲明構造函數 Readable 繼承 Stream 的構造函數和原型。
Stream 是 /lib/stream.js 文件,上面分析了,這個文件繼承了 events 事件,此時也就擁有了 events 在原型中定義的屬性,例如 on、emit 等方法。
- const Stream = require('stream');
- ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
- ObjectSetPrototypeOf(Readable, Stream);
- function Readable(options) {
- if (!(this instanceof Readable))
- return new Readable(options);
- ...
- Stream.call(this, options);
- }
2.2.2 聲明 pipe 方法,訂閱 data 事件
在 Stream 的原型上聲明 pipe 方法,訂閱 data 事件,src 為可讀流對象,dest 為可寫流對象。
我們在使用 pipe 方法的時候也是監聽的 data 事件,一邊讀取數據一邊寫入數據。
看下 ondata() 方法里的幾個核心實現:
- dest.write(chunk):接收 chunk 寫入數據,如果內部的緩沖小于創建流時配置的 highWaterMark,則返回 true,否則返回 false 時應該停止向流寫入數據,直到 'drain' 事件被觸發。
- src.pause():可讀流會停止 data 事件,意味著此時暫停數據寫入了。
之所以調用 src.pause() 是為了防止讀入數據過快來不及寫入,什么時候知道來不及寫入呢,要看 dest.write(chunk) 什么時候返回 false,是根據創建流時傳的 highWaterMark 屬性,默認為 16384 (16KB),對象模式的流默認為 16。
注意:是 16KB 不是 16Kb,也是之前犯的一個錯誤,大寫的 B 和小寫的 b 在這里是有區別的。計算機中所有數據都以 0 和 1 表示,其中 0 或 1 稱作一個位(bit),用小寫的 b 表示。大寫的 B 表示字節(byte),1byte = 8bit,大寫 K 表示千,所以是千個位(Kb)和千個字節(KB),一般都是使用 KB 表示一個文件的大小。
- Readable.prototype.pipe = function(dest, options) {
- const src = this;
- src.on('data', ondata);
- function ondata(chunk) {
- const ret = dest.write(chunk);
- if (ret === false) {
- ...
- src.pause();
- }
- }
- ...
- };
2.2.3 訂閱 drain 事件,繼續流動數據
上面提到在 data 事件里,如果調用 dest.write(chunk) 返回 false,就會調用 src.pause() 停止數據流動,什么時候再次開啟呢?
如果說可以繼續寫入事件到流時會觸發 drain 事件,也是在 dest.write(chunk) 等于 false 時,如果 ondrain 不存在則注冊 drain 事件。
- Readable.prototype.pipe = function(dest, options) {
- const src = this;
- src.on('data', ondata);
- function ondata(chunk) {
- const ret = dest.write(chunk);
- if (ret === false) {
- ...
- if (!ondrain) {
- // When the dest drains, it reduces the awaitDrain counter
- // on the source. This would be more elegant with a .once()
- // handler in flow(), but adding and removing repeatedly is
- // too slow.
- ondrain = pipeOnDrain(src);
- dest.on('drain', ondrain);
- }
- src.pause();
- }
- }
- ...
- };
- // 當可寫入流 dest 耗盡時,它將會在可讀流對象 source 上減少 awaitDrain 計數器
- // 為了確保所有需要緩沖的寫入都完成,即 state.awaitDrain === 0 和 src 可讀流上的 data 事件存在,切換流到流動模式
- function pipeOnDrain(src) {
- return function pipeOnDrainFunctionResult() {
- const state = src._readableState;
- debug('pipeOnDrain', state.awaitDrain);
- if (state.awaitDrain)
- state.awaitDrain--;
- if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
- state.flowing = true;
- flow(src);
- }
- };
- }
- // stream.read() 從內部緩沖拉取并返回數據。如果沒有可讀的數據,則返回 null。在可讀流上 src 還有一個 readable 屬性,如果可以安全地調用 readable.read(),則為 true
- function flow(stream) {
- const state = stream._readableState;
- debug('flow', state.flowing);
- while (state.flowing && stream.read() !== null);
- }
2.2.4 觸發 data 事件
調用 readable 的 resume() 方法,觸發可讀流的 'data' 事件,進入流動模式。
- Readable.prototype.pipe = function(dest, options) {
- const src = this;
- // Start the flow if it hasn't been started already.
- if (!state.flowing) {
- debug('pipe resume');
- src.resume();
- }
- ...
然后實例上的 resume(Readable 原型上定義的)會在調用 resume() 方法,在該方法內部又調用了 resume_(),最終執行了 stream.read(0) 讀取了一次空數據(size 設置的為 0),將會觸發實例上的 _read() 方法,之后會在觸發 data 事件。
- function resume(stream, state) {
- ...
- process.nextTick(resume_, stream, state);
- }
- function resume_(stream, state) {
- debug('resume', state.reading);
- if (!state.reading) {
- stream.read(0);
- }
- ...
- }
2.2.5 訂閱 end 事件
end 事件:當可讀流中沒有數據可供消費時觸發,調用 onend 函數,執行 dest.end() 方法,表明已沒有數據要被寫入可寫流,進行關閉(關閉可寫流的 fd),之后再調用 stream.write() 會導致錯誤。
- Readable.prototype.pipe = function(dest, options) {
- ...
- const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
- dest !== process.stdout &&
- dest !== process.stderr;
- const endFn = doEnd ? onend : unpipe;
- if (state.endEmitted)
- process.nextTick(endFn);
- else
- src.once('end', endFn);
- dest.on('unpipe', onunpipe);
- ...
- function onend() {
- debug('onend');
- dest.end();
- }
- }
2.2.6 觸發 pipe 事件
在 pipe 方法里面最后還會觸發一個 pipe 事件,傳入可讀流對象
- Readable.prototype.pipe = function(dest, options) {
- ...
- const source = this;
- dest.emit('pipe', src);
- ...
- };
在應用層使用的時候可以在可寫流上訂閱 pipe 事件,做一些判斷,具體可參考官網給的這個示例 stream_event_pipe
2.2.7 支持鏈式調用
最后返回 dest,支持類似 unix 的用法:A.pipe(B).pipe(C)
- Stream.prototype.pipe = function(dest, options) {
- return dest;
- };
3. 總結
本文總體分為兩部分:
- 第一部分相對較基礎,講解了 Nodejs Stream 的 pipe 方法在 Koa2 中是怎么去應用的。
- 第二部分仍找它的實現,以及對源碼的一個簡單分析,其實 pipe 方法核心還是要去監聽 data 事件,向可寫流寫入數據,如果內部緩沖大于創建流時配置的 highWaterMark,則要停止數據流動,直到 drain 事件觸發或者結束,當然還要監聽 end、error 等事件做一些處理。
4. Reference
- nodejs.cn/api/stream.html
- cnodejs.org/topic/56ba030271204e03637a3870
- github.com/nodejs/node/blob/master/lib/_stream_readable.js
本文轉載自微信公眾號「Nodejs技術棧」,可以通過以下二維碼關注。轉載本文請聯系Nodejs技術棧公眾號。