流的pipe操作主要用于readable流向writable流传递数据,之前只是从经验角度去猜测pipe的工作原理,但遇到问题时才发现不靠谱的猜测并不能为解决问题带来帮助,所以花些时间了解一下pipe的工作原理,本文主要从源代码角度解释pipe的几个内部细节。
通过阅读源代码,发现同一个readable流支持多个pipe下游,pipe函数的第一段逻辑就是在自己的_readableState
中维护所有pipe下游的相关信息,主要包括pipes与pipesCount,当pipesCount为1时,pipes直接指向下游流对象,当pipesCount大于1时,pipes是所有pipe下游流对象数组,相关代码:
var state = this._readableState;
switch (state.pipesCount) {
case 0:
state.pipes = dest;
break;
case 1:
state.pipes = [state.pipes, dest];
break;
default:
state.pipes.push(dest);
break;
}
state.pipesCount += 1;
测试多个下游,新建test.js:
const fs = require('fs');
const Readable = require('stream').Readable;
let src = new Readable();
src.push('some data');
src.push(null);
src.pipe(fs.createWriteStream('./dest1'));
src.pipe(fs.createWriteStream('./dest2'));
执行node test.js会发现目录下多出dest1与dest2文件,内容都是’some data’,即证实一个readable流支持pipe给多个writable流。
通过pipe,数据是自发的流动不需要人为干涉,当上游数据传输完成时其end事件触发,下游的end事件是否触发由pipe方法的第二个参数决定,相关代码:
var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
dest !== process.stdout &&
dest !== process.stderr;
var endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
process.nextTick(endFn);
else
src.once('end', endFn);
dest.on('unpipe', onunpipe);
function onend() {
debug('onend');
dest.end();
}
pipeOpts就是pipe方法的第二个参数,pipeOpts.end默认为true,表明自动触发下游的end事件,但通过代码发现针对两个特殊的下游对象process.stdout与process.stderr,pipe会忽略pipeOpts.end参数而选择不触发下游end事件。
pipe是把上游与下游通过管道连起来传输数据,那么unpipe就是从两者之间切断管道,具体做法是在pipe内注册下游的unpipe事件处理函数,当用户调用unpipe时触发下游的unpipe事件,从而移除所有之前在pipe中绑定到上下游的事件,相关代码:
dest.on('unpipe', onunpipe);
function onunpipe(readable, unpipeInfo) {
debug('onunpipe');
if (readable === src) {
if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
unpipeInfo.hasUnpiped = true;
cleanup();
}
}
}
function cleanup() {
debug('cleanup');
dest.removeListener('close', onclose);
dest.removeListener('finish', onfinish);
dest.removeListener('drain', ondrain);
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
src.removeListener('end', unpipe);
src.removeListener('data', ondata);
cleanedUp = true;
if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain))
ondrain();
}
数据传递过程比较简单,主要是监听上游的ondata事件获取数据,再将数据写入下游,这个过程是自动触发的,但上下游的数据处理速度也许有所差异,比如上游的读取速度是10M/s,而下游的处理速度是5M/s,这种情况会通过背压反馈让上游的速度相应调整到5M/s左右。
针对writable流,数据处理不过来时会先写入一个内部缓冲buffer,当缓冲buffer中的数据大小超过_writableState.highWaterMark时,write操作会返回false以提醒调用者数据处理不过来需要暂停数据写入,等到缓冲数据处理完成时,会触发drain事件通知调用者继续写入。
如果不顾write操作返回false而单方面一直写入,则缓冲区大小会一直增加,相应的应用占用内存也会激增,最终可能导致应用OOM异常退出。
针对writable流的这个特性,pipe操作会动态调整上游的读取速度以适应下游所需,当发现下游write返回false时,就暂停上游的数据读取直到触发drain事件,相关代码:
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
src.pause();
}
}
function pipeOnDrain(src) {
return function() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
function flow(stream) {
const state = stream._readableState;
debug('flow', state.flowing);
while (state.flowing && stream.read() !== null);
}
异常处理主要是处理下游的error、close、finish事件,发生上述事件时及时unpipe即可。
学习pipe的原因是因为在实践过程中不确定pipe下游end事件的触发时机与条件,主要原因是不了解pipe以及文档看的不仔细,刚好借此机会对流有了进一步的认识,收获不少。
同时由于不单单是自己看一遍,还用写博客的方式总结了一番,理解的更细致,印象也更为深刻。
写博客的这两个月,越发觉得写博客是一个自我提升非常有效的方式,加油!