当前位置: 首页 > 文档资料 > NodeJS Stream 手册 >

流模块基础

优质
小牛编辑
136浏览
2023-12-01

流模块基础

在node中,一共有五种类型的流:readable,writable,transform,duplex以及”classic”

pipe

无论哪一种流,都会使用.pipe()方法来实现输入和输出。

.pipe()函数很简单,它仅仅是接受一个源头src并将数据输出到一个可写的流dst中:

  1. src.pipe(dst)

.pipe(dst)将会返回dst因此你可以链式调用多个流:

  1. a.pipe(b).pipe(c).pipe(d)

上面的代码也可以等价为:

  1. a.pipe(b);
  2. b.pipe(c);
  3. c.pipe(d);

这和你在unix中编写流代码很类似:

  1. a | b | c | d

只不过此时你是在node中编写而不是在shell中!

readable流

Readable流可以产出数据,你可以将这些数据传送到一个writable,transform或者duplex流中,只需要调用pipe()方法:

  1. readableStream.pipe(dst)

创建一个readable流

现在我们就来创建一个readable流!

  1. var Readable = require('stream').Readable;
  2. var rs = new Readable;
  3. rs.push('beep ');
  4. rs.push('boop\n');
  5. rs.push(null);
  6. rs.pipe(process.stdout);

下面运行代码:

  1. $ node read0.js
  2. beep boop

在上面的代码中rs.push(null)的作用是告诉rs输出数据应该结束了。

需要注意的一点是我们在将数据输出到process.stdout之前已经将内容推送进readable流rs中,但是所有的数据依然是可写的。

这是因为在你使用.push()将数据推进一个readable流中时,一直要到另一个东西来消耗数据之前,数据都会存在一个缓存中。

然而,在更多的情况下,我们想要的是当需要数据时数据才会产生,以此来避免大量的缓存数据。

我们可以通过定义一个._read函数来实现按需推送数据:

  1. var Readable = require('stream').Readable;
  2. var rs = Readable();
  3. var c = 97;
  4. rs._read = function () {
  5. rs.push(String.fromCharCode(c++));
  6. if (c > 'z'.charCodeAt(0)) rs.push(null);
  7. };
  8. rs.pipe(process.stdout);

代码的运行结果如下所示:

  1. $ node read1.js
  2. abcdefghijklmnopqrstuvwxyz

在这里我们将字母az推进了rs中,但是只有当数据消耗者出现时,数据才会真正实现推送。

_read函数也可以获取一个size参数来指明消耗者想要读取多少比特的数据,但是这个参数是可选的。

需要注意到的是你可以使用util.inherit()来继承一个Readable流。

为了说明只有在数据消耗者出现时,_read函数才会被调用,我们可以将上面的代码简单的修改一下:

  1. var Readable = require('stream').Readable;
  2. var rs = Readable();
  3. var c = 97 - 1;
  4. rs._read = function () {
  5. if (c >= 'z'.charCodeAt(0)) return rs.push(null);
  6. setTimeout(function () {
  7. rs.push(String.fromCharCode(++c));
  8. }, 100);
  9. };
  10. rs.pipe(process.stdout);
  11. process.on('exit', function () {
  12. console.error('\n_read() called ' + (c - 97) + ' times');
  13. });
  14. process.stdout.on('error', process.exit);

运行上面的代码我们可以发现如果我们只请求5比特的数据,那幺_read只会运行5次:

  1. $ node read2.js | head -c5
  2. abcde
  3. _read() called 5 times

在上面的代码中,setTimeout很重要,因为操作系统需要花费一些时间来发送程序结束信号。

另外,process.stdout.on('error',fn)处理器也很重要,因为当head不再关心我们的程序输出时,操作系统将会向我们的进程发送一个SIGPIPE信号,此时process.stdout将会捕获到一个EPIPE错误。

上面这些复杂的部分在和操作系统相关的交互中是必要的,但是如果你直接和node中的流交互的话,则可有可无。

如果你创建了一个readable流,并且想要将任何的值推送到其中的话,确保你在创建流的时候指定了objectMode参数,Readable({ objectMode: true })

消耗一个readable流

大部分时候,将一个readable流直接pipe到另一种类型的流或者使用through或者concat-stream创建的流中,是一件很容易的事情。但是有时我们也会需要直接来消耗一个readable流。

  1. process.stdin.on('readable', function () {
  2. var buf = process.stdin.read();
  3. console.dir(buf);
  4. });

代码运行结果如下所示:

  1. $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
  2. <Buffer 61 62 63 0a>
  3. <Buffer 64 65 66 0a>
  4. <Buffer 67 68 69 0a>
  5. null

当数据可用时,readable事件将会被触发,此时你可以调用.read()方法来从缓存中获取这些数据。

当流结束时,.read()将返回null,因为此时已经没有更多的字节可以供我们获取了。

你也可以告诉.read()方法来返回n个字节的数据。虽然所有核心对象中的流都支持这种方式,但是对于对象流来说这种方法并不可用。

下面是一个例子,在这里我们制定每次读取3个字节的数据:

  1. process.stdin.on('readable', function () {
  2. var buf = process.stdin.read(3);
  3. console.dir(buf);
  4. });

运行上面的例子,我们将获取到不完整的数据:

  1. $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
  2. <Buffer 61 62 63>
  3. <Buffer 0a 64 65>
  4. <Buffer 66 0a 67>

这是因为多余的数据都留在了内部的缓存中,因此这个时候我们需要告诉node我们还对剩下的数据感兴趣,我们可以使用.read(0)来完成这件事:

  1. process.stdin.on('readable', function () {
  2. var buf = process.stdin.read(3);
  3. console.dir(buf);
  4. process.stdin.read(0);
  5. });

到现在为止我们的代码和我们所期望的一样了!

  1. $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
  2. <Buffer 61 62 63>
  3. <Buffer 0a 64 65>
  4. <Buffer 66 0a 67>
  5. <Buffer 68 69 0a>

我们也可以使用.unshift()方法来放置多余的数据。

使用unshift()方法能够防止我们进行不必要的缓存拷贝。在下面的代码中我们将创建一个分割新行的可读解析器:

  1. var offset = 0;
  2. process.stdin.on('readable', function () {
  3. var buf = process.stdin.read();
  4. if (!buf) return;
  5. for (; offset < buf.length; offset++) {
  6. if (buf[offset] === 0x0a) {
  7. console.dir(buf.slice(0, offset).toString());
  8. buf = buf.slice(offset + 1);
  9. offset = 0;
  10. process.stdin.unshift(buf);
  11. return;
  12. }
  13. }
  14. process.stdin.unshift(buf);
  15. });

代码的运行结果如下所示:

  1. $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
  2. 'hearties'
  3. 'heartiest'
  4. 'heartily'
  5. 'heartiness'
  6. 'heartiness\'s'
  7. 'heartland'
  8. 'heartland\'s'
  9. 'heartlands'
  10. 'heartless'
  11. 'heartlessly'

当然,已经有很多这样的模块比如split来帮助你完成这件事情,你完全不需要自己写一个。

writable流

一个writable流指的是只能流进不能流出的流:

  1. src.pipe(writableStream)

创建一个writable流

只需要定义一个._write(chunk,enc,next)函数,你就可以将一个readable流的数据释放到其中:

  1. var Writable = require('stream').Writable;
  2. var ws = Writable();
  3. ws._write = function (chunk, enc, next) {
  4. console.dir(chunk);
  5. next();
  6. };
  7. process.stdin.pipe(ws);

代码运行结果如下所示:

  1. $ (echo beep; sleep 1; echo boop) | node write0.js
  2. <Buffer 62 65 65 70 0a>
  3. <Buffer 62 6f 6f 70 0a>

第一个参数,chunk代表写进来的数据。

第二个参数enc代表编码的字符串,但是只有在opts.decodeStringfalse的时候你才可以写一个字符串。

第三个参数,next(err)是一个回调函数,使用这个回调函数你可以告诉数据消耗者可以写更多的数据。你可以有选择性的传递一个错误对象error,这时会在流实体上触发一个emit事件。

在从一个readable流向一个writable流传数据的过程中,数据会自动被转换为Buffer对象,除非你在创建writable流的时候制定了decodeStrings参数为false,Writable({decodeStrings: false})

如果你需要传递对象,需要指定objectMode参数为trueWritable({ objectMode: true })

向一个writable流中写东西

如果你需要向一个writable流中写东西,只需要调用.write(data)即可。

  1. process.stdout.write('beep boop\n');

为了告诉一个writable流你已经写完毕了,只需要调用.end()方法。你也可以使用.end(data)在结束前再写一些数据。

  1. var fs = require('fs');
  2. var ws = fs.createWriteStream('message.txt');
  3. ws.write('beep ');
  4. setTimeout(function () {
  5. ws.end('boop\n');
  6. }, 1000);

运行结果如下所示:

  1. $ node writing1.js
  2. $ cat message.txt
  3. beep boop

如果你需要调整内部缓冲区大小,那幺需要在创建可写流对象时设置highWaterMark。在调用.write()方法返回false时,说明写入的数据大小超过了该值。

为了避免读写速率不匹配而造成内存上涨,可以监听drain事件,等待可写流内部缓存被清空再继续写入。

transform流

你可以将transform流想象成一个流的中间部分,它可以读也可写,但是并不保存数据,它只负责处理流经它的数据。

duplex流

Duplex流是一个可读也可写的流,就好像一个电话,可以接收也可以发送语音。一个rpc交换是一个duplex流的最好的例子。如果你看到过下面这样的代码:

  1. a.pipe(b).pipe(a)

那幺你需要处理的就是一个duplex流对象。

classic流

Classic流是一个古老的接口,最早出现在node 0.4中。虽然现在不怎幺用,但是我们最好还是来了解一下它的工作原理。

无论何时,只要一个流对象注册了一个data监听器,它就会自动的切换到classic模式,并且根据旧API的方式运行。

classic readable流

Classic readable流只是一个事件发射器,当有数据消耗者出现时发射emit事件,当输出数据完毕时发射end事件。

我们可以同构检查stream.readable来检查一个classic流对象是否可读。

下面是一个简单的readable流对象的例子,程序的运行结果将会输出AJ

  1. var Stream = require('stream');
  2. var stream = new Stream;
  3. stream.readable = true;
  4. var c = 64;
  5. var iv = setInterval(function () {
  6. if (++c >= 75) {
  7. clearInterval(iv);
  8. stream.emit('end');
  9. }
  10. else stream.emit('data', String.fromCharCode(c));
  11. }, 100);
  12. stream.pipe(process.stdout);

运行结果如下所示:

  1. $ node classic0.js
  2. ABCDEFGHIJ

为了从一个classic readable流中读取数据,你可以注册dataend监听器。下面是一个使用旧readable流方式从process.stdin中读取数据的例子:

  1. process.stdin.on('data', function (buf) {
  2. console.log(buf);
  3. });
  4. process.stdin.on('end', function () {
  5. console.log('__END__');
  6. });

运行结果如下所示:

  1. $ (echo beep; sleep 1; echo boop) | node classic1.js
  2. <Buffer 62 65 65 70 0a>
  3. <Buffer 62 6f 6f 70 0a>
  4. __END__

需要注意的一点是当你在一个流对象上注册了一个data监听器,你就将这个流放在了兼容模式下,此时你不能使用两个stream2的api。

如果你自己创建流对象,永远不要绑定dataend监听器。如果你需要和旧版本的流兼容,最好使用第三方库来实现.pipe()方法。

例如,你可以使用through模块来避免显式的使用dataend监听器:

  1. var through = require('through');
  2. process.stdin.pipe(through(write, end));
  3. function write (buf) {
  4. console.log(buf);
  5. }
  6. function end () {
  7. console.log('__END__');
  8. }

程序运行结果如下所示:

  1. $ (echo beep; sleep 1; echo boop) | node through.js
  2. <Buffer 62 65 65 70 0a>
  3. <Buffer 62 6f 6f 70 0a>
  4. __END__

你也可以使用concat-stream模块来将整个流的内容缓存起来:

  1. var concat = require('concat-stream');
  2. process.stdin.pipe(concat(function (body) {
  3. console.log(JSON.parse(body));
  4. }));

程序运行结果如下所示:

  1. $ echo '{"beep":"boop"}' | node concat.js
  2. { beep: 'boop' }

Classic readable流拥有.pause().resume()逻辑来暂停一个流,但是这都是可选的。如果你想要使用.pause().resume()方法,你应该使用through模块来帮助你处理缓存。

classic writable流

Classic writable流非常简单。其中只定义了.write(buf).end(buf),以及.desctory()方法。其中.end(buf)的参数buf是可选参数,但是一般来说node程序员还是喜欢使用.end(buf)这种写法。