当前位置: 首页 > 面试题库 >

NodeJS,Promise,流-​​处理大型CSV文件

史英飙
2023-03-14
问题内容

我需要构建一个函数来处理大型CSV文件,以便在bluebird.map()调用中使用。考虑到文件的潜在大小,我想使用流媒体。

此函数应接受一个流(一个CSV文件)和一个函数(处理该流中的块),并在读取文件到末尾(已解决)或错误(已拒绝)时返回promise。

所以,我开始:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

现在,我有两个相互关联的问题:

  1. 我需要限制正在处理的实际数据量,以免造成内存压力。
  2. 作为processor参数传递的函数通常将是异步的,例如通过基于Promise的库将文件的内容保存到db中(现在:)pg-promise。这样,它将在内存中创建一个承诺并不断重复。

pg-promise库具有管理此功能的功能,例如page(),但是我无法围绕如何将流事件处理程序与这些promise方法混合使用的问题。现在,我readable在each后面的部分的处理程序中返回一个promise
read(),这意味着我创建了大量承诺的数据库操作,最终由于遇到进程内存限制而出现故障。

有没有人有一个可行的例子可以作为我的切入点?

更新 :可能有不止一种方式为猫皮,但这有效:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

有人看到这种方法有潜在的问题吗?


问题答案:

在下面找到一个完整的应用程序,该应用程序可以正确执行所需的任务:它将文件读取为流,将其解析为CSV,并将每一行插入数据库。

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });

请注意,我唯一改变的是:使用library csv-parse代替csv,作为更好的选择。

增加了使用方法stream.read从SPEX库,它正确地供应可读与承诺使用流。



 类似资料:
  • 我一直得到“UnhandledPromiseRejectionWarning:UnhandledPromiseRejection.此错误源于在没有catch块的异步函数内部抛出,或拒绝未用.catch()处理的Promission.(拒绝ID:2)” 我已经处理了所有的承诺,但错误仍然存在

  • 问题内容: 我正在使用Python脚本处理大型CSV文件(数以10M行的GB数)。 这些文件具有不同的行长,并且无法完全加载到内存中进行分析。 每行由脚本中的一个函数分别处理。分析一个文件大约需要20分钟,并且看来磁盘访问速度不是问题,而是处理/函数调用。 代码看起来像这样(非常简单)。实际的代码使用Class结构,但这是相似的: 鉴于计算需要共享的数据结构,使用多核在Python中并行运行分析的

  • 我需要将大型csv文件从node发送到Python。这段代码适用于小文件,但不适用于大文件。我也试过产卵过程。我不明白问题出在哪里。如果有人知道正确的代码,请分享 代码: 错误

  • 问题内容: 我有一个很大的xml文件,其中包含许多子元素。我希望能够运行一些xpath查询。我尝试在Java中使用vtd- xml,但有时会出现内存不足错误,因为xml太大,无法容纳到内存中。是否有替代方法来处理如此大的xml。 问题答案: 尝试http://code.google.com/p/jlibs/wiki/XMLDog 它使用sax执行xpaths,而无需创建xml文档的内存表示形式。

  • 我正在导入大小为24MB的csv文件,但每次它都被截断为0KB,并显示以下错误。 导入Csv:无法处理参数,因为参数“name”的值无效。更改“name”参数的值,然后再次运行该操作。 $data=导入Csv 在此对象上找不到属性“Column4”;确保它存在且可设置。 CategoryInfo:InvalidOperation:(Column4:String)[],RuntimeExceptio

  • 问题内容: 我正在尝试处理较大的(可能多达200M)JSON文件。文件的结构基本上是对象数组。 因此,遵循以下原则: 每个对象都具有任意属性,不必与数组中的其他对象共享它们(例如,具有相同的属性)。 我想对数组中的每个对象进行处理,并且由于文件可能很大,因此无法将整个文件内容都包含在内存中,无法解码JSON并遍历PHP数组。 因此,理想情况下,我想读取文件,为每个对象获取足够的信息并进行处理。如果