当前位置: 首页 > 工具软件 > node-mkv > 使用案例 >

Node.js 指南(流中的背压)

司寇山
2023-12-01

流中的背压

在数据处理过程中会出现一个叫做背压的常见问题,它描述了数据传输过程中缓冲区后面数据的累积,当传输的接收端具有复杂的操作时,或者由于某种原因速度较慢时,来自传入源的数据就有累积的趋势,就像阻塞一样。

要解决这个问题,必须有一个委托系统来确保数据从一个源到另一个源的平滑流动,不同的社区已经针对他们的程序独特地解决了这个问题,Unix管道和TCP套接字就是很好的例子,并且通常被称为流量控制,在Node.js中,流是已采用的解决方案。

本指南的目的是进一步详细说明背压是什么,以及精确流如何在Node.js的源代码中解决这个问题,本指南的第二部分将介绍建议的最佳实践,以确保在实现流时应用程序的代码是安全的和优化的。

我们假设你对Node.js中背压BufferEventEmitter的一般定义以及Stream的一些经验有所了解。如果你还没有阅读这些文档,那么首先查看API文档并不是一个坏主意,因为它有助于在阅读本指南时扩展你的理解。

数据处理的问题

在计算机系统中,数据通过管道、sockets和信号从一个进程传输到另一个进程,在Node.js中,我们找到了一种名为Stream的类似机制。流很好!他们为Node.js做了很多事情,几乎内部代码库的每个部分都使用该模块,作为开发人员,我们鼓励你使用它们!

const readline = require('readline');

// process.stdin and process.stdout are both instances of Streams
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout
});

rl.question('Why should you use streams? ', (answer) => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

通过比较Node.js的Stream实现的内部系统工具,可以证明为什么通过流实现背压机制是一个很好的优化的一个很好的例子。

在一种情况下,我们将使用一个大文件(约〜9gb)并使用熟悉的zip(1)工具对其进行压缩。

$ zip The.Matrix.1080p.mkv

虽然这需要几分钟才能完成,但在另一个shell中我们可以运行一个脚本,该脚本采用Node.js的模块zlib,它包含另一个压缩工具gzip(1)

const gzip = require('zlib').createGzip();
const fs = require('fs');

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

要测试结果,请尝试打开每个压缩文件,zip(1)工具压缩的文件将通知你文件已损坏,而Stream完成的压缩将无错误地解压缩。

注意:在此示例中,我们使用.pipe()将数据源从一端获取到另一端,但是,请注意没有附加正确的错误处理程序。如果无法正确接收数据块,Readable源或gzip流将不会被销毁,pump是一个实用工具,如果其中一个流失败或关闭,它将正确地销毁管道中的所有流,并且在这种情况下是必须的!

只有Nodejs 8.x或更早版本才需要pump,对于Node 10.x或更高版本,引入pipeline来替换pump。这是一个模块方法,用于在流传输之间转发错误和正确清理,并在管道完成时提供回调。

以下是使用管道的示例:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

你还可以在管道上调用promisify以将其与async/await一起使用:

const stream = require('stream');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = util.promisify(stream.pipeline);

async function run() {
    try {
        await pipeline(
            fs.createReadStream('The.Matrix.1080p.mkv'),
            zlib.createGzip(),
            fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
        );
        console.log('Pipeline succeeded');
    } catch (err) {
        console.error('Pipeline failed', err);
    }
}

太多的数据,太快

有些情况下,Readable流可能会过快地为Writable提供数据 — 远远超过消费者可以处理的数据!

当发生这种情况时,消费者将开始排队所有数据块以供以后消费,写入队列将变得越来越长,因此在整个过程完成之前,必须将更多数据保存在内存中。

写入磁盘比从磁盘读取要慢很多,因此,当我们尝试压缩文件并将其写入我们的硬盘时,将发生背压,因为写入磁盘将无法跟上读取的速度。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read-side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

这就是背压机制很重要的原因,如果没有背压系统,该进程会耗尽系统的内存,有效地减缓了其他进程,并独占你系统的大部分直到完成。

这导致了一些事情:

  • 减缓所有其他当前进程。
  • 一个非常超负荷的垃圾收集器。
  • 内存耗尽。

在下面的示例中,我们将取出.write()函数的返回值并将其更改为true,这有效地禁用了Node.js核心中的背压支持,在任何对'modified'二进制文件的引用中,我们正在谈论在没有return ret;行的情况下运行node二进制,而改为return true;

垃圾收集器上的过度负荷

我们来看看快速基准测试,使用上面的相同示例,我们进行几次试验,以获得两个二进制的中位时间。

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

两者都需要大约一分钟来运行,因此根本没有太大差别,但让我们仔细看看以确认我们的怀疑是否正确,我们使用Linux工具dtrace来评估V8垃圾收集器发生了什么。

GC(垃圾收集器)测量时间表示垃圾收集器完成单次扫描的完整周期的间隔:

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

虽然这两个过程开始时相同,但似乎以相同的速率运行GC,很明显,在适当工作的背压系统几秒钟后,它将GC负载分布在4-8毫秒的一致间隔内,直到数据传输结束。

但是,当背压系统不到位时,V8垃圾收集开始拖延,正常二进制文件在一分钟内调用GC约75次,然而,修改后的二进制文件仅触发36次。

这是由于内存使用量增加而累积的缓慢而渐进的债务,随着数据传输,在没有背压系统的情况下,每个块传输使用更多内存。

分配的内存越多,GC在一次扫描中需要处理的内存就越多,扫描越大,GC就越需要决定可以释放什么,并且在更大的内存空间中扫描分离的指针将消耗更多的计算能力。

内存耗尽

为确定每个二进制的内存消耗,我们使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js单独为每个进程计时。

这是正常二进制的输出:

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虚拟内存占用的最大字节大小约为87.81mb。

现在更改.write()函数的返回值,我们得到:

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虚拟内存占用的最大字节大小约为1.52gb。

如果没有流来委托背压,则分配的内存空间要大一个数量级 — 同一进程之间的巨大差异!

这个实验展示了Node.js的反压机制是如何优化和节省成本的,现在,让我们分析一下它是如何工作的!

背压如何解决这些问题?

将数据从一个进程传输到另一个进程有不同的函数,在Node.js中,有一个名为.pipe()的内部内置函数,还有其他包也可以使用!但最终,在这个过程的基本层面,我们有两个独立的组件:数据来源和消费者。

当从源调用.pipe()时,它向消费者发出信号,告知有数据要传输,管道函数有助于为事件触发器设置适当的背压闭合。

在Node.js中,源是Readable流,而消费者是Writable流(这些都可以与DuplexTransform流互换,但这超出了本指南的范围)。

触发背压的时刻可以精确地缩小到Writable.write()函数的返回值,当然,该返回值由几个条件决定。

在数据缓冲区已超过highWaterMark或写入队列当前正忙的任何情况下,.write()将返回false

当返回false值时,背压系统启动,它会暂停传入的Readable流发送任何数据,并等待消费者再次准备就绪,清空数据缓冲区后,将发出.drain()事件并恢复传入的数据流。

队列完成后,背压将允许再次发送数据,正在使用的内存空间将自行释放并为下一批数据做好准备。

这有效地允许在任何给定时间为.pipe()函数使用固定数量的内存,没有内存泄漏,没有无限缓冲,垃圾收集器只需要处理内存中的一个区域!

那么,如果背压如此重要,为什么你(可能)没有听说过它?答案很简单:Node.js会自动为你完成所有这些工作。

那太好了!但是当我们试图了解如何实现我们自己的自定义流时,也不是那么好。

注意:在大多数机器中,有一个字节大小可以确定缓冲区何时已满(在不同的机器上会有所不同),Node.js允许你设置自己的自定义highWaterMark,但通常,默认设置为16kb16384,或objectMode流为16),在你可能希望提高该值的情况下,可以尝试,但是要小心!

.pipe()的生命周期

为了更好地理解背压,下面是一个关于Readable流的生命周期的流程图,该流被管道传输到Writable流中:

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

注意:如果要设置管道以将一些流链接在一起来操作数据,则很可能会实现Transform流。

在这种情况下,你的Readable流的输出将输入到Transform中,并将管道到Writable中。

Readable.pipe(Transformable).pipe(Writable);

背压将自动应用,但请注意,Transform流的输入和输出highWaterMark都可能被操纵并将影响背压系统。

背压指南

从Node.js v0.10开始,Stream类提供了通过使用这些相应函数的下划线版本来修改.read().write()的行为的功能(._read()._write())。

对于实现Readable流和Writable流,有文档化的指南,我们假设你已阅读过这些内容,下一节将更深入一些。

实现自定义流时要遵守的规则

流的黄金法则始终是尊重背压,最佳实践的构成是非矛盾的实践,只要你小心避免与内部背压支持相冲突的行为,你就可以确定你遵循良好做法。

一般来说:

  1. 如果你没有被要求,永远不要.push()
  2. 永远不要在返回false后调用.write(),而是等待'drain'。
  3. 流在不同的Node.js版本和你使用的库之间有变化,小心并测试一下。

注意:关于第3点,构建浏览器流的非常有用的包是readable-stream,Rodd Vagg撰写了一篇很棒的博客文章,描述了这个库的实用性,简而言之,它为Readable流提供了一种自动优雅降级,并支持旧版本的浏览器和Node.js。

Readable流的特定规则

到目前为止,我们已经了解了.write()如何影响背压,并将重点放在Writable流上,由于Node.js的功能,数据在技术上从Readable流向下游Writable。但是,正如我们可以在数据、物质或能量的任何传输中观察到的那样,源与目标一样重要,Readable流对于如何处理背压至关重要。

这两个过程都相互依赖,有效地进行通信,如果Readable忽略Writable流要求它停止发送数据的时候,那么.write()的返回值不正确就会有问题。

因此,关于.write()返回,我们还必须尊重._read()方法中使用的.push()的返回值,如果.push()返回false值,则流将停止从源读取,否则,它将继续而不会停顿。

以下是使用.push()的不好做法示例:

// This is problematic as it completely ignores return value from push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

此外,在自定义流之外,存在忽略背压的陷阱,在这个良好的实践的反例中,应用程序的代码会在数据可用时强制通过(由.data事件发出信号):

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', (data) =>
  writable.write(data)
);

Writable流的特定规则

回想一下.write()可能会根据某些条件返回truefalse,幸运的是,在构建我们自己的Writable流时,流状态机将处理我们的回调并确定何时处理背压并为我们优化数据流。

但是,当我们想直接使用Writable时,我们必须尊重.write()返回值并密切注意这些条件:

  • 如果写队列忙,.write()将返回false
  • 如果数据块太大,.write()将返回false(该值由变量highWaterMark指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0)
      callback();
    else if (chunk.toString().indexOf('b') >= 0)
      callback();
    callback();
  }
}

// The proper way to write this would be:
    if (chunk.contains('a'))
      return callback();
    else if (chunk.contains('b'))
      return callback();
    callback();

在实现._writev()时还需要注意一些事项,该函数与.cork()结合使用,但写入时有一个常见错误:

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// as a global function
function doUncork(stream) {
  stream.uncork();
}

.cork()可以被调用多次,我们只需要小心调用.uncork()相同的次数,使其再次流动。

结论

Streams是Node.js中经常使用的模块,它们对于内部结构非常重要,对于开发人员来说,它们可以跨Node.js模块生态系统进行扩展和连接。

希望你现在能够进行故障排除,安全地编写你自己的WritableReadable流,并考虑背压,并与同事和朋友分享你的知识。

在使用Node.js构建应用程序时,请务必阅读有关其他API函数的Stream的更多信息,以帮助改进和释放你的流功能。


上一篇:使用不同的文件系统

下一篇:域模块剖析

 类似资料: