結合異步迭代器實現 Node.js 流式數據復制
實現可讀流到可寫流數據復制,就是不斷的讀取->寫入這個過程,那么你首先想到的是不是下面這樣呢?代碼看似很簡單,結果卻是很糟糕的,沒有任何的數據積壓處理。如果讀取的文件很大了,造成的后果就是緩沖區數據溢出,程序會占用過多的系統內存,拖垮服務器上的其它應用,如果不明白的回顧下這篇文章 Node.js Stream 背壓 — 消費端數據積壓來不及處理會怎么樣?。
- // 糟糕的示例,沒有數據積壓處理
- readable.on('data', data => {
- writable.write(data)
- });
類似以上的需求,推薦你用 pipe() 方法以流的形式完成數據的復制。
作為學習,結合異步迭代器以一種簡單的方式實現一個類似于 pipe 一樣的方法完成數據源到目標源的數據復制。
數據寫入方法實現
_write 方法目的是控制可寫流的數據寫入,它返回一個 Promise 對象,如果可寫流的 dest.write() 方法返回 true,表示內部緩沖區未滿,繼續寫入。
當 dest.write() 方法返回 false 表示向流中寫入數據超過了它所能處理的最大能力限制,此時暫停向流中寫入數據,直到 drain 事件觸發,表示緩沖區中的數據已排空了可以繼續寫入,再將 Promise 對象變為解決。
- function _write(dest, chunk) {
- return new Promise(resolve => {
- if (dest.write(chunk)) {
- return resolve(null);
- }
- dest.once('drain', resolve);
- })
- }
結合異步迭代器實現
異步迭代器使從可讀流對象讀取數據變得更簡單,異步的讀取數據并調用我們封裝的 _write(chunk) 方法寫入數據,如果緩沖區空間已滿,這里 await _write(dest, chunk) 也會等待,當緩沖區有空間可以繼續寫入了,再次進行讀取 -> 寫入。
- function myCopy(src, dest) {
- return new Promise(async (resolve, reject) => {
- dest.on('error', reject);
- try {
- for await (const chunk of src) {
- await _write(dest, chunk);
- }
- resolve();
- } catch (err) {
- reject(err);
- }
- });
- }
使用如下所示:
- const readable = fs.createReadStream('text.txt');
- const writable = fs.createWriteStream('dest-text.txt');
- await myCopy(readable, writable);
本文轉載自微信公眾號「Nodejs技術棧」,可以通過以下二維碼關注。轉載本文請聯系Nodejs技術棧公眾號。