当前位置: 首页 > 知识库问答 >
问题:

管道中的csv解析错误处理

充普松
2023-03-14

作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。

我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续?

我认识到,我可以使用relax\u column\u count选项跳过列数不一致的记录,该选项几乎就足够了。问题是,出于数据质量评估的目的,我需要记录这些记录,以便可以返回并查看导致列数不正确的原因(该过程是一个包含许多潜在故障点的提要)。

作为旁注,我知道解决这个问题的最简单方法是清理这个过程的上游数据,但不幸的是,我无法控制数据源。

例如,在示例集中,我得到以下错误:

事件。js:141投掷者;//未处理的“错误”事件
错误:行上的列数(行号)与标题不匹配

样本数据(实际上不是我的数据,但展示了同样的问题):

year, month, value1, value2
2012, 10, A, B
2012, 11, B, C,
2012, 11, C, D,
2013, 11, D, E,
2013, 11, E, F,
2013, 11, F, 
2013, 11, G, G,
2013, 1, H, H,
2013, 11, I, I,
2013, 12, J, J,
2014, 11, K, K,
2014, 4, L, L,
2014, 11, M, M,
2014, 5, N, 
2014, 11, O, N,
2014, 6, P, O,
2015, 11, Q, P,
2015, 11, R, Q,
2015, 11, S, R,
2015, 11, T, S, 

代码:

const fs = require('fs');
const parse = require('csv-parse');
const stringify = require('csv-stringify');
const transform = require('stream-transform');

const paths = {
    input: './sample.csv',
    output: './output.csv',
    error: './errors.csv',
}

var input  = fs.createReadStream(paths.input);
var output = fs.createWriteStream(paths.output);
var error  = fs.createWriteStream(paths.error);

var stringifier = stringify({
    header: true,
    quotedString: true,
});
var parser = parse({
    relax: true,
    delimiter: ',', 
    columns: true, 
    //relax_column_count: true,
})
var transformer = transform((record, callback) => {
    callback(null, record);
}, {parallel: 10});

input.pipe(parser).pipe(transformer).pipe(stringifier).pipe(output);

想法?

共有1个答案

微生自怡
2023-03-14

我想出了解决这个问题的办法。它不使用管道API,而是使用CSV包的回调API。它没有我所希望的那么优雅,但是它是功能性的,并且有显式错误处理的好处,这不会导致进程在不一致的列数上停止。

该过程逐行读取文件,根据设置对象(设置.映射)中的预期字段列表解析该行,然后转换、字符串化并将结果输出行写入新的csv。

我将其设置为记录错误,这是由于与文件头不一致的列数以及一些额外的数据(执行的日期时间、行数和作为诊断信息文本的整行)。我没有设置其他错误类型的日志记录,因为它们都是csv结构错误的下游,但是您也可以修改代码来写入这些错误。(你也可以将它们写入JSON或MySQL数据库,但一次只做一件事)。

好消息是,与直接方法相比,使用这种方法不会对性能造成太大的影响。我没有做过任何正式的性能测试,但是在一个60MB的文件上,两种方法的性能大致相同(假设文件没有不一致的行)。明确的下一步是研究将写入绑定到磁盘以减少I/O。

我仍然非常感兴趣的是,是否有更好的方法来做到这一点,所以如果你有一个想法,一定要发布一个答案!同时,我想我应该发布这个有效的答案,以防它对其他正在处理相同类型的格式不一致的源代码的人有用。

应获得学分的学分,特别是两个问题/答案:

  • 解析Node.js中的大型日志文件-逐行读取
    • 这个答案从分割文件的答案中调整了一些核心代码来逐行读取,这可以防止csv-parse组件在失败的行中关闭(以分割文件的代码开销为代价)再往上游)。实际上,我真的推荐使用iconv-lite,因为它在那篇文章中已经完成了,但是它与最小可复制的例子无关,所以我在这篇文章中删除了它。
    • 这通常有助于更好地理解管道的潜力和局限性。从理论上讲,似乎有一种方法可以将本质上相当于管道分离器的东西从解析器放到出站管道上,但是考虑到我目前的时间限制和与异步进程相关的挑战,在流终止方面,异步进程是相当不可预测的,我使用了回调API代替。

    示例代码:

    'use strict'
    // Dependencies
    const es     = require('event-stream');
    const fs     = require('fs');
    const parse = require('csv-parse');
    const stringify = require('csv-stringify');
    const transform = require('stream-transform');
    
    // Reference objects
    const paths = {
        input: 'path to input.csv',
        output: 'path to output.csv',
        error: 'path to error output.csv',
    }
    const settings = {
        mapping: {
            // Each field is an object with the field name as the key
            // and can have additional properties for use in the transform 
            // component of this process
            // Example
            'year' : {
                import: true,
            }
        }
    }
    
    const metadata = {
        records: 0,
        error: 0
    }
    
    // Set up streams
    var input  = fs.createReadStream(paths.input);
    var errors  = fs.createWriteStream(paths.error,  {flags: 'ax'});
    var output = fs.createWriteStream(paths.output, {flags: 'ax'});
    
    // Begin process (can be refactored into function, but simplified here)
    input
      .pipe(es.split()) // split based on row, assumes \n row endings
      .pipe(es.mapSync(line => { // synchronously process each line
    
        // Remove headers, specified through settings
        if (metadata.records === 0) return metadata.records++;
        var id = metadata.records;
    
        // Parse csv by row 
        parse(line, {
            relax: true,
            delimiter: ',', 
            columns: Object.keys(settings.mapping),
        }, (error, record) => {
    
            // Write inconsistent column error 
            if (error) {
                metadata.error++;
                errors.write(
                    new Date() + ', Inconsistent Columns, ' + 
                     id + ', `' +  
                     line + '`\n'
                );
            }
    
        // Apply transform / reduce
        transform(record, (record) => {
            // Do stuff to record
            return record;
        }, (error, record) => {
    
            // Throw tranform errors
            if (error) {
                throw error;
            }
    
        // Stringify results and write to new csv
        stringify(record, {
               header: false,
               quotedString: true,
        }, (error, record) => {
    
            // Throw stringify errors
            if (error) {
                console.log(error);
            }
    
            // Write record to new csv file
            output.write(record);
        });
        });
        })
    
        // Increment record count
        metadata.records++;
    
      }))  
      .on('end', () => {
        metadata.records--;
        console.log(metadata)
      })    
    

 类似资料:
  • 我有由Angular CLI ver 7.3.9(Angular ver 7.2.0)和IDE生成的项目:PhpStrem ver 2019.1.2 在我的组件模板中,我使用UpperCasePipe: 应用程序工作正常。大写有效。字母很大,但 PhpStorm 报告了一个错误: 未解析的管道大写 检查 TypeScript 调用函数是否有效 预览: PS 我已经安装了AngularJS插件

  • 问题内容: 我有这个错误: 当但奇怪的是,更新此插件时发生错误:管道共享Groovy库,在此工作正常之前,我使用jenkins v 2.21和管道2.4,而我的代码是下一个: 问题答案: Jenkins作业可以保存在执行中,这需要对它们进行序列化。rawBuild的内容无法序列化,因此,如果要访问它,则需要在以开头的函数中进行序列化。例如:

  • 问题内容: 我正在使用angular-ui-router 从服务器获取数据,然后再进入一种状态。有时,对服务器的请求失败,我需要通知用户有关失败的信息。如果我从控制器呼叫服务器,则可以在呼叫失败的情况下在其中放置并呼叫通知服务。我将呼叫发送到服务器是因为我希望子孙状态在服务器启动之前等待服务器的结果。 万一对服务器的调用失败,在哪里可以捕获错误?(我已经阅读了文档,但仍然不确定如何使用。此外,我正

  • 目前,我在netty管道的末尾有一个处理程序,它处理几乎所有的请求逻辑。(使用jackson、身份验证、速率限制等从json解析对象。如果把这些放在执行器之后的管道中的各个阶段,是不是更好地使用管道结构?

  • 问题内容: 我有一个XML文件,我想将其用作管道脚本的输入。问题是XMLParser不可序列化,因此我将其放在NonCPS函数中,但是因此丢失了Node对象。 这是管道脚本: 这是一个输入样本: 结果如下: 我正在使用Jenkins 2.7和管道2.1,这是最新的。 问题答案: 您可以使用XmlSlurper,它对我有用。

  • 您好,我在读取每行包含3列的csv文件时遇到问题。我似乎无法将最后一个单元格(3)解析为整数,即使它始终是一个“可解析”字符串:Berlin,Buenos Aires,7402我似乎无法得到7402编译器抛出的所有结果是: “在java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68