当前位置: 首页 > 面试题库 >

从两个管道流创建Node.js流

宓昂雄
2023-03-14
问题内容

如果可能,我想通过管道将两个Node.js流合并为一个。我正在使用转换流。

换句话说,我希望我的图书馆归还myStream给人们使用。例如,他们可以写:

process.stdin.pipe(myStream).pipe(process.stdout);

在内部,我使用的是第三方vendorStream,它可以完成一些工作,并插入包含在我自己的逻辑中myInternalStream。因此,以上内容将转换为:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

我可以那样做吗?我已经尝试过了,var myStream = vendorStream.pipe(myInternalStream)但这显然行不通。

为了与进行类比bash,假设我想编写一个程序来检查h某个流(tail -n 1 | grep h)的最后一行中是否存在该字母,我可以创建一个shell脚本

# myscript.sh
tail -n 1 | grep h

然后,如果人们这样做:

$ printf "abc\ndef\nghi" | . myscript.sh

它只是工作。

这是我到目前为止的内容:

// Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

这是可能吗?让我知道我要做什么不清楚。

谢谢!


问题答案:

您可以查看要传送到流中的内容,然后将其传送到您unpipe感兴趣的流中:

var PassThrough = require('stream').PassThrough;

var stream3 = new PassThrough();

// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on('pipe', function(source) {
  source.unpipe(this);
  this.transformStream = source.pipe(stream1).pipe(stream2);
});

// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
  return this.transformStream.pipe(destination, options);
};

stdin.pipe(stream3).pipe(stdout);

您可以将此功能提取到自己的可构造流类中:

var util = require('util');
var PassThrough = require('stream').PassThrough;

var StreamCombiner = function() {
  this.streams = Array.prototype.slice.apply(arguments);

  this.on('pipe', function(source) {
    source.unpipe(this);
    for(i in this.streams) {
      source = source.pipe(this.streams[i]);
    }
    this.transformStream = source;
  });
};

util.inherits(StreamCombiner, PassThrough);

StreamCombiner.prototype.pipe = function(dest, options) {
  return this.transformStream.pipe(dest, options);
};

var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);


 类似资料:
  • 我正在尝试从管道中自动创建Jenkins管道构建。 我有一个管道,它创建一个比特桶存储库并向其提交一些代码,包括一个Jenkinsfile。 我需要向此管道添加另一个步骤,然后为其创建管道生成,这将运行 Jenkinsfile 中的步骤。 我认为Jobs DSL应该能够处理这一点,但我找到的文档非常稀少,我仍然不完全确定是否有可能或如何做到这一点。 任何帮助都将不胜感激。我想生成的Pipeline

  • 问题内容: 我想一个拆分成的基础上的内容。结果将包含原始流数据的一部分。 我的实际应用程序更加复杂(将时间间隔列表中的日志行分组),但是我的问题是如何处理流,因此在这里我问一个简化的示例。 我希望能够基于重复的相同数字将a拆分为一个,而仅将流保留为奇数。 例如,以下流包含: 需要产生包含以下内容的流: 通过使用过滤器开始(或结束),可以省去偶数: 这是不希望的,因为这意味着对每个输入值进行两次评估

  • 问题内容: 如何在node.js中创建命名管道? PS:目前,我正在按如下方式创建命名管道。但是我认为这不是最好的方法 问题答案: 看起来,Node核心不支持名称管道-从Ben Noordhuis 11/10/11开始: Windows具有命名管道的概念,但是自您提到以来,我假设您的意思是UNIX FIFO。 我们不支持它们,可能永远也不会(在非阻塞模式下的FIFO可能会死锁事件循环),但是如果您

  • 我想根据的内容将单个拆分为的。生成的应该包含原始流数据的一部分。 我的实际应用程序更复杂(它是对时间间隔列表中的日志行进行分组),但我的问题是如何处理流,因此这里我询问一个简化的示例。 我希望能够拆分

  • 问题内容: 我正在使用ya-csv库,它希望将文件或流作为输入,但是我有一个字符串。 如何将该字符串转换为Node中的流? 问题答案: 从节点12.3开始,stream.Readable有一个方法可以轻松地从任何可迭代对象(包括数组文字)创建流:

  • 问题内容: 我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)内累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。 我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。 这是到目前为止我所