当前位置: 首页 > 知识库问答 >
问题:

读取csv文件,对其进行清理,然后使用Apache Beam dataflow将结果写入csv

卢深
2023-03-14

我想读取一个csv文件,清理它,然后用Apache Beam Dataflow将结果写出csv。目的是使文件可加载到BigQuery中。清理规则是简单地用双引号转义双引号。我的清洁规则管用。我很难把它并入管道。我正在寻求关于我的清洁功能应该返回什么以及如何通过管道调用它的建议。

import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText

lines = p | ReadFromText(file_pattern="gs://dev/clean_input/input01.csv")

def parse_method(line):
    
    CSV_PARSING_KWARGS = {
        'doublequote': True,
        'escapechar': '\\',
        'quotechar': '"',
        'delimiter': ','
    }

    reader = csv.reader(csv_file, CSV_PARSING_KWARGS)
    for rec in reader:
        cw = csv.writer(out_file, escapechar='"', quoting=csv.QUOTE_MINIMAL)
        cw.writerow(rec)
        return rec
        

def run(region, project, bucket, temploc ):
    argv = [
           # Passed in args 
           '--region={}'.format(region),
           '--project={}'.format(project),
           '--temp_location={}'.format(temploc),
           # Constructs
           '--staging_location=gs://{}/clean_input/stg/'.format(bucket),
           # Mandatory constants
           '--job_name=cleammycsv',
           '--runner=DataflowRunner'     
           ]
      
    options = PipelineOptions(
    flags=argv
    )

    pipeline = beam.Pipeline(options=options)
  
    clean_csv = (pipeline
    lines = lines| 'Read' >> beam.Map(parse_method)
    line = lines | 'Output to file' >> WriteToText(file_pattern="gs://dev/clean_output/output_file.csv")
    )   
    pipeline.run()

if __name__ == '__main__':
   import argparse
   
   # Create the parser  
   parser = argparse.ArgumentParser(description='Run the CSV cleaning pipeline')   

   parser.add_argument('-r','--region', help='Region ID where data flow job to run', default='australia-southeast1')
   parser.add_argument('-p','--project', help='Unique project ID', required=True)
   parser.add_argument('-b','--bucket', help='Bucket name', required=True)
   parser.add_argument('-t','--temploc', help='Bucket name and folder', required=True)
   
   # Execute the parse_args() method
   args = vars(parser.parse_args())

   run(project=args['project'], bucket=args['bucket'], region=args['region'],temploc=args['temploc'])

共有1个答案

魏元白
2023-03-14

我终于找到了能起作用的东西。

import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText


def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL):
      line = [s.replace('\"', '') for s in line]
      clean_line = '","'.join(line)
      final_line = '"'+ clean_line +'"'
      return final_line



def run(region, project, bucket, temploc ):
    argv = [
           # Passed in args 
           '--region={}'.format(region),
           '--project={}'.format(project),
           '--temp_location={}'.format(temploc),
           # Constructs
           '--staging_location=gs://{}/clean_input/stg/'.format(bucket),
       # Mandatory constants
           '--job_name=cleammycsv',
           '--runner=DataflowRunner'     
          ]
    filename_in = 'gs://{}/clean_input/IN_FILE.csv'.format(bucket)
    files_output = 'gs://{}/clean_output/OUT_FILE.csv'.format(bucket)
    
    options = PipelineOptions(
    flags=argv
    )

    pipeline = beam.Pipeline(options=options)
   

    clean_csv = (pipeline 
    | 'Read input file' >> beam.io.ReadFromText(filename_in)
    | 'Parse file' >> beam.Map(parse_file)
    | 'writecsv' >> beam.io.WriteToText(files_output,num_shards=10)
   )
   
    pipeline.run()

if __name__ == '__main__':
   import argparse
   
   # Create the parser  
   parser = argparse.ArgumentParser(description='Run the CSV cleaning pipeline')   

   parser.add_argument('-r','--region', help='Region ID where data flow job to run', required=True)
   parser.add_argument('-p','--project', help='Unique project ID', required=True)
   parser.add_argument('-b','--bucket', help='Bucket name', required=True)
   parser.add_argument('-t','--temploc', help='Bucket name and folder', required=True)
   
   # Execute the parse_args() method
   args = vars(parser.parse_args())

   run(project=args['project'], bucket=args['bucket'], region=args['region'],temploc=args['temploc'])
 类似资料:
  • 问题内容: 我的源数据在一个TSV文件中,包含6列和超过200万行。 这是我要完成的工作: 我需要读取此源文件中3列(3、4、5)中的数据 第五列是整数。我需要使用此整数值来复制行条目,并使用第三和第四列中的数据(按整数倍)。 我想将#2的输出写入CSV格式的输出文件。 以下是我想到的。 我的问题:这是一种有效的方法吗?尝试进行200万行时,它似乎很密集。 首先,我制作了一个示例选项卡单独的文件以

  • 问题内容: 我想从标准输入中读取CSV文件,并处理每一行。我的CSV输出代码逐行写入行,但是我的阅读器在迭代行之前等待流终止。这是模块的限制吗?难道我做错了什么? 我的读者代码: 我的作者代码: 输出: 如您所见,所有打印语句都在同一时间执行,但是我希望会有500ms的间隔。 问题答案: 如文档中所述, 为了使循环成为遍历文件行的最有效方法(一种非常常见的操作),该方法使用了隐藏的预读缓冲区。 您

  • 问题内容: 将结构转储到提供的csv文件中的惯用golang方法是什么?我在一个func里面,我的结构作为接口{}传递: 为什么要使用界面{}?-从JSON读取数据,可能会返回一些不同的结构,因此尝试编写足够通用的函数。 我的类型的一个例子: 问题答案: 如果您使用具体类型,将会容易得多。您可能想要使用该软件包,这是一个相关示例;https://golang.org/pkg/encoding/cs

  • 我正在为类创建一个EMPLOYeE记录文件的程序。我创建了两个结构。一个叫做雇员,一个叫做日期。EMPLOYeE结构有一个字符数组、一个int、6个浮点值和DATE(另一个结构)。DATE结构只有三个int值(月、日、年)。 我创建了一个名为person[1000]的EMPLOYEE类型数组。 这是我的代码,我一直得到一个调试断言失败的错误,在可视化工作室指向fWrite. c表达式:(流!=NU

  • 问题内容: 我想以编程方式编辑python源代码。基本上,我想读取一个文件,生成AST,然后写回修改后的python源代码(即另一个文件)。 有多种方法可以使用标准python模块(例如或)来解析/编译python源代码。但是,我认为它们都不支持修改源代码(例如删除此函数声明)然后写回修改后的python源代码的方法。 更新:我想这样做的原因是我想为python编写一个Mutation测试库,主要