如果可能,我想通过管道将两个Node.js流合并为一个。我正在使用转换流。
换句话说,我希望我的图书馆归还myStream
给人们使用。例如,他们可以写:
process.stdin.pipe(myStream).pipe(process.stdout);
在内部,我使用的是第三方vendorStream
,它可以完成一些工作,并插入包含在我自己的逻辑中myInternalStream
。因此,以上内容将转换为:
process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);
我可以那样做吗?我已经尝试过了,var myStream = vendorStream.pipe(myInternalStream)
但这显然行不通。
为了与进行类比bash
,假设我想编写一个程序来检查h
某个流(tail -n 1 | grep h
)的最后一行中是否存在该字母,我可以创建一个shell脚本:
# myscript.sh
tail -n 1 | grep h
然后,如果人们这样做:
$ printf "abc\ndef\nghi" | . myscript.sh
它只是工作。
这是我到目前为止的内容:
// Combine a pipe of two streams into one stream
var util = require('util')
, Transform = require('stream').Transform;
var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
chunks1.push(chunk.toString());
var pieces = (soFar + chunk).split('\n');
soFar = pieces.pop();
for (var i = 0; i < pieces.length; i++) {
var piece = pieces[i];
this.push(piece);
}
return done();
};
var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
chunks2.push(chunk.toString());
count = count + 1;
this.push(count + ' ' + chunk.toString() + '\n');
done();
};
var stdin = process.stdin;
var stdout = process.stdout;
process.on('exit', function () {
console.error('chunks1: ' + JSON.stringify(chunks1));
console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);
// stdin.pipe(stream1).pipe(stream2).pipe(stdout);
// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]
// Best working solution I could find
var stream3 = function(src) {
return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);
// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]
这是可能吗?让我知道我要做什么不清楚。
谢谢!
您可以查看要传送到流中的内容,然后将其传送到您unpipe
感兴趣的流中:
var PassThrough = require('stream').PassThrough;
var stream3 = new PassThrough();
// When a source stream is piped to us, undo that pipe, and save
// off the source stream piped into our internally managed streams.
stream3.on('pipe', function(source) {
source.unpipe(this);
this.transformStream = source.pipe(stream1).pipe(stream2);
});
// When we're piped to another stream, instead pipe our internal
// transform stream to that destination.
stream3.pipe = function(destination, options) {
return this.transformStream.pipe(destination, options);
};
stdin.pipe(stream3).pipe(stdout);
您可以将此功能提取到自己的可构造流类中:
var util = require('util');
var PassThrough = require('stream').PassThrough;
var StreamCombiner = function() {
this.streams = Array.prototype.slice.apply(arguments);
this.on('pipe', function(source) {
source.unpipe(this);
for(i in this.streams) {
source = source.pipe(this.streams[i]);
}
this.transformStream = source;
});
};
util.inherits(StreamCombiner, PassThrough);
StreamCombiner.prototype.pipe = function(dest, options) {
return this.transformStream.pipe(dest, options);
};
var stream3 = new StreamCombiner(stream1, stream2);
stdin.pipe(stream3).pipe(stdout);
我正在尝试从管道中自动创建Jenkins管道构建。 我有一个管道,它创建一个比特桶存储库并向其提交一些代码,包括一个Jenkinsfile。 我需要向此管道添加另一个步骤,然后为其创建管道生成,这将运行 Jenkinsfile 中的步骤。 我认为Jobs DSL应该能够处理这一点,但我找到的文档非常稀少,我仍然不完全确定是否有可能或如何做到这一点。 任何帮助都将不胜感激。我想生成的Pipeline
问题内容: 我想一个拆分成的基础上的内容。结果将包含原始流数据的一部分。 我的实际应用程序更加复杂(将时间间隔列表中的日志行分组),但是我的问题是如何处理流,因此在这里我问一个简化的示例。 我希望能够基于重复的相同数字将a拆分为一个,而仅将流保留为奇数。 例如,以下流包含: 需要产生包含以下内容的流: 通过使用过滤器开始(或结束),可以省去偶数: 这是不希望的,因为这意味着对每个输入值进行两次评估
问题内容: 如何在node.js中创建命名管道? PS:目前,我正在按如下方式创建命名管道。但是我认为这不是最好的方法 问题答案: 看起来,Node核心不支持名称管道-从Ben Noordhuis 11/10/11开始: Windows具有命名管道的概念,但是自您提到以来,我假设您的意思是UNIX FIFO。 我们不支持它们,可能永远也不会(在非阻塞模式下的FIFO可能会死锁事件循环),但是如果您
我想根据的内容将单个拆分为的。生成的应该包含原始流数据的一部分。 我的实际应用程序更复杂(它是对时间间隔列表中的日志行进行分组),但我的问题是如何处理流,因此这里我询问一个简化的示例。 我希望能够拆分
问题内容: 我正在使用ya-csv库,它希望将文件或流作为输入,但是我有一个字符串。 如何将该字符串转换为Node中的流? 问题答案: 从节点12.3开始,stream.Readable有一个方法可以轻松地从任何可迭代对象(包括数组文字)创建流:
问题内容: 我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)内累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。 我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。 这是到目前为止我所