当前位置: 首页 > 知识库问答 >
问题:

如何在dataflow中从google bucket中读取csv文件,合并,对dataflow中的数据进行一些转换,然后转储到BigQuery中?

吴驰
2023-03-14

我必须用python编写一个数据流作业,它将从GCS读取两个不同的.csv文件,执行联接操作,对联接的数据流的结果执行转换,然后最后将其发送到BigQuery表?

我对此非常陌生,经过大量的研发,我知道我们可以从Apache.Beam进行所有的管道操作。我终于找到了一个模板,但在给定点上仍然有很多困惑。

import logging
import os

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.pipeline import PipelineOptions


os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='auth_file.json'


class DataTransformation:
    """A helper class that translates a CSV into a format BigQuery will accept."""

     def __init__(self):
         dir_path = os.path.dirname(os.path.realpath(__file__))
         # Here we read the output schema from a json file.  This is used to specify the types
         # of data we are writing to BigQuery.
         self.schema = os.path.join(dir_path, 'resources',
                                    'gs://wahtch_dog_dataflow/schema.json')
        
     # Parse the input csv and convert into a BigQuery-savable dictionary.
     def read_all_from_url(beam.DoFn):
           with FileSystems.open(url) as f:
            return f.read()
    

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default = 'gs://wahtch_dog_dataflow/demo.csv')

    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='watchdog_output.transformed')

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # DataIngestion is a class we built in this script to hold the logic for
    # transforming the file into a BigQuery table.
    data_ingestion = DataTransformation()
    url = "gs://smart-ivr-dl-pushed-data"
    # Initiate the pipeline using the pipeline arguments passed in from the
    # command line. This includes information such as the project ID and
    # where Dataflow should store temp files.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (
     p | beam.Create(urls)
       |'Reading latest file' >> beam.ParDo(read_all_from_url())
    
     # This stage of the pipeline translates from a CSV file single row
     # input as a string, to a dictionary object consumable by BigQuery.
     # It refers to a function we have written. This function will
     # be run in parallel on different workers using input from the
     # previous stage of the pipeline.
     | 'String To BigQuery Row' >>
         beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             # The table name is a required argument for the BigQuery sink.
             # In this case we use the value passed in from the command line.
             known_args.output,
             # Here we use the simplest way of defining a schema:
             # fieldName:fieldType

             ###### schema of the ivr
             schema=schema ,

             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()

        
if __name__ == '__main__':
     logging.getLogger().setLevel(logging.INFO)
     DataTransformation.run()

我有以下问题:

>

  • read_all_from_url:如何在这里读取多个文件,这里的url是什么?它是桶的名称,还是存储路径?

    如何从bucket中读取schema,(上面我从某个地方找到了它,我们可以这样读,但我怀疑它是否可以像上面那样读schema)

    class ReadOrc(beam.DoFn):
          def process(self, element):
               df = pd.read_csv(element) 
               yield df
            
    
    csv_lines = (p | beam.Create(urls) |
                'Reading latest file' >> beam.ParDo(ReadOrc()) 
              | 'transform' >> beam.ParDo(transform()))
    

    现在在transform中,我想要连接dataframe并执行预处理步骤。

  • 共有1个答案

    满和安
    2023-03-14
    import logging
    import io
    
    import apache_beam as beam
    from apache_beam.io import fileio
    from apache_beam.pipeline import PipelineOptions
    
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    
    (
     p 
     | beam.Create(urls)
     | 'Finding latest file' >> fileio.MatchAll()
     | 'Get file handlers' >> fileio.ReadMatches()
     | 'Read each file handler' >> beam.FlatMap(
           lambda rf: csv.reader(io.TextIOWrapper(rf.open())))
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             schema=schema ,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    
    p.run().wait_until_finish()
    

    如果您正在从CSV中读取Dataframes,您可以执行yield df.iterrows()-这将把Dataframe分解为各个行-然后您可以将它们连接起来。

     类似资料:
    • 我想读取一个csv文件,清理它,然后用Apache Beam Dataflow将结果写出csv。目的是使文件可加载到BigQuery中。清理规则是简单地用双引号转义双引号。我的清洁规则管用。我很难把它并入管道。我正在寻求关于我的清洁功能应该返回什么以及如何通过管道调用它的建议。

    • 我有问题,但我不知道怎么解决。有人能帮帮我吗?问题。在文本文件中 我想从文件中获取数据,并使用它。我的代码: 在程序总数不是60.60,但它是需要的。问题在哪里?

    • 假设gcs中的文件以以下格式存储:-.avro。尝试在google dataflow作业中使用读取文件,使用apache Beam的fileio.matchall库读取基于时间戳间隔的文件。例如,gcs中的文件: 现在我们要获取所有大于时间戳20200101000000直到当前时间戳的文件,我可以使用什么文件模式?

    • 问题内容: 我是脚本新手。我有一个表(),我需要创建另一个表,该表的Table1行按列排列,反之亦然。我已经找到了针对Perl和SQL而不是针对Python的解决方案。 我两天前才开始学习Python,所以据我所知: 这只是将列复制为列。我现在想做的是将最后一行写为,但是似乎没有这样的命令,而且我还没有找到将行写为列的另一种方法。 问题答案: 通常,转置可迭代序列的解决方案是:zip(* orig

    • 本文向大家介绍如何读取CSV文件并将值存储到C#中的数组中?,包括了如何读取CSV文件并将值存储到C#中的数组中?的使用技巧和注意事项,需要的朋友参考一下 CSV文件是逗号分隔的文件,用于以有组织的方式存储数据。它通常以表格形式存储数据。大多数企业组织将其数据存储在CSV文件中。 CSV文件是逗号分隔的文件,用于以有组织的方式存储数据。它通常以表格形式存储数据。大多数企业组织将其数据存储在CSV文

    • 我想知道Apache Beam.Google DataFlow是否足够聪明,能够识别数据流图中的重复转换,并只运行一次。例如,如果我有2个分支: null