当前位置: 首页 > 面试题库 >

等待最后一个stream.on('data')事件中的异步函数回调

钱黎明
2023-03-14
问题内容

我正在使用fast-csv使用来遍历CSV文件stream。对于CSV文件的每一行,我想在redis中创建一个作业,为此我使用kue。解析行是同步功能。整个过程看起来像这样:

var csvStream = fastCsv(_config.csvOptions)
  .on('data', function(data) {
    var stream = this;
    stream.pause();

    var payload = parseRow(data, _config);
    console.log(payload); // the payload is always printed to the console

    var job = kue.create('csv-row', {
        payload: payload
      })
      .save(function(err) {
        if (!err) console.log('Enqueued at ' + job.id);
        else console.log('Redis error ' + JSON.stringify(err));

        stream.resume();
      });
  })
  .on('end', function() {
    callback(); // this ends my parsing
  });

console.log(payload);CSV文件每一行的简单显示,但是未创建作业。即,没有save打印出回调中的输出,并且该作业不在我的Redis中。

我假设,因为它是CSV文件的最后一行,所以该流已经发出end,因此最后一个kue.create()不能在进程终止之前执行吗?

有没有办法在end完成kue之前停止流的播放?


问题答案:

您可以使用异步库解决此问题。您可以对任何流使用以下模式。

var AsyncLib = require('async');

var worker = function (payload, cb) {
    //do something with payload and call callback
    return cb();
};

var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);

var stream = //some readable stream;

stream.on('data', function(data) {
    //no need to pause and resume
    var payload = '//some payload';
    streamQueue.push(payload);
})
.on('end', function() {
    //register drain event on end and callback
    streamQueue.drain = function () {
        callback();
    };
});


 类似资料:
  • 我试图利用es7异步功能,即。 在这里,所有promise*函数都进行ajax调用,并返回或如果ajax响应满足传递的参数,我相信我不能连续使用3个等待,因此需要一种方法来等待所有这些调用以某种方式返回它们的值。

  • 我的JavaScript代码如下所示: 完成所有这些异步调用后,我想计算所有数组的最小值。 我怎么能等到他们所有人呢? 我现在唯一的想法是有一个布尔数组叫做done,并在第i个回调函数中将done[i]设置为true,然后说while(not all are done){} edit:我想一个可能的,但很难看的解决方案是在每个回调中编辑done数组,然后在每个回调中设置了所有其他done的情况下调

  • 问题内容: 我的代码在javascript中看起来像这样: 在完成所有这些异步调用之后,我想计算所有数组的最小值。 我要如何等待所有人? 我现在唯一的想法是拥有一个名为done的布尔数组,并在第i个回调函数中将done [i]设置为true,然后说while(不是全部都完成了){} 编辑:我想一个可能但很丑陋的解决方案是在每个回调中编辑完了的数组,然后如果每个回调中都设置了所有其他完成,则调用一个

  • 我正试图将图像上传到firebase存储,但调用该函数时,未执行wait以获取url。我错过了什么? 看看这个其他主题,我发现问题可能是“然后”,但我如何设置代码以等待url? 异步/等待/然后飞镖/颤振 谢谢

  • 问题内容: 目前,我正在尝试在类构造函数中使用。这样一来,我就可以为正在从事的Electron项目获取自定义标签。 但是,目前该项目无法正常工作,并出现以下错误: 有没有办法避免这种情况,以便我可以在其中使用异步/等待?而不需要回调或.then()? 问题答案: 这 永远 行不通。 该关键字允许在标记为函数中使用,但它也是功能转换成一个承诺发生器。因此,标有的函数将返回承诺。另一方面,构造函数返回

  • 问题内容: 我知道这个问题以前曾被问过,但是所有解决方案都不适合我。 我有一个将参数发送到API的函数,并以列表的形式返回数据。我有一个UITableView设置为使用该列表,但是它在列表分配给变量之前运行。 码: 如果不立即将其作为重复投票,我将不胜感激,这是我尝试的方法。 派遣组 信号量计时 运行变量 其中包括= self和= self 。 编辑:要求提取项目, 问题答案: 您不能-也不应该-