当前位置: 首页 > 知识库问答 >
问题:

通过在谷歌云数据流中使用Python SDK推断模式来读写avro文件-Apache Beam

戎亦
2023-03-14

问题:我正在尝试创建一个云数据流管道,该管道使用Python SDK从Google云存储读取Avro文件,进行一些处理并在Google云存储上写回Avro文件。在查看ApacheBeam网站上提供的一些示例后,我尝试运行以下代码。我使用了ReadFromAvroWriteToAvro函数。我试图实现的是读取一个Avro文件并使用Dataflow写入同一个Avro文件,但它给了我以下警告,并且没有输出Avro文件。

警告/错误:

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py:121: DeprecationWarning: object() takes no parameters
  super(GcsIO, cls).__new__(cls, storage_client))
INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.31790304184 seconds
Traceback (most recent call last):
  File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 52, in <module>
    run()
  File "/Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py", line 47, in run
    records | WriteToAvro(known_args.output)
TypeError: __init__() takes at least 3 arguments (2 given)

代码:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://BUCKET/000000_0.avro',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default='gs://BUCKET/',
                        #required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
        # run your pipeline on the Google Cloud Dataflow Service.
        '--runner=DataflowRunner',
        # CHANGE 3/5: Your project ID is required in order to run your pipeline on
        # the Google Cloud Dataflow Service.
        '--project=PROJECT_NAME',
        # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
        # files.
        '--staging_location=gs://BUCKET/staging',
        # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
        # files.
        '--temp_location=gs://BUCKET/temp',
        '--job_name=parse-avro',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    # Read the avro file[pattern] into a PCollection.
    records = p | ReadFromAvro(known_args.input)
    records | WriteToAvro(known_args.output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

编辑:

我尝试将架构添加到WriteToAvro函数,但现在出现以下错误:

错误:

/usr/local/bin/python /Users/USER/PycharmProjects/GCP-gcs_to_bq/gcs-bq.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)

架构:

{"fields": [{"default": null, "type": ["null", {"logicalType": "timestamp-millis", "type": "long"}], "name": "_col0"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col1"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col2"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col3"}, {"default": null, "type": ["null", "long"], "name": "_col4"}, {"default": null, "type": ["null", {"logicalType": "char", "type": "string", "maxLength": 1}], "name": "_col5"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 10}], "name": "_col6"}, {"default": null, "type": ["null", "double"], "name": "_col7"}, {"default": null, "type": ["null", "long"], "name": "_col8"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col9"}, {"default": null, "type": ["null", {"logicalType": "varchar", "type": "string", "maxLength": 6}], "name": "_col10"}], "type": "record", "name": "baseRecord"}

代码:

pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)

schema = avro.schema.parse(open("avro.avsc", "rb").read())

# Read the avro file[pattern] into a PCollection.
records = p | ReadFromAvro(known_args.input)
records | WriteToAvro(known_args.output, schema=schema)

共有2个答案

燕青青
2023-03-14

该错误表示您的代码没有将所有必需的参数传递给WriteToAvro()转换的构造函数——事实上,它至少需要2个参数(文件名前缀和模式),但是这段代码只传递1个参数(文件名前缀)。

WriteToAvro当前需要该架构:它不是可选参数,没有避免指定它的解决方法。原因是Avro文件通常需要在创建文件之前预先知道模式,因此WriteToAvro也需要知道模式。

此外,我们无法从ReadFromAvro返回的集合中明确推断模式:假设用户将一个文件模式作为--input传递,该文件模式匹配具有多个不同模式的Avro文件-这些模式中的哪一个将WriteToAvro必须使用?

武晨
2023-03-14

问题是数据管道实际上没有被执行。我设法修好了它。解决方案是,您需要在以下2个选项中的任何一个中运行波束管道:

备选案文1:

p = beam.Pipeline(options=pipeline_options)

schema = avro.schema.parse(open("avro.avsc", "rb").read())

records = p | 'Read from Avro' >> ReadFromAvro(known_args.input)

# Write the file
records | 'Write to Avro' >> WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')

# Run the pipeline
result = p.run()
result.wait_until_finish()

选项2:使用python关键字执行管道:

schema = avro.schema.parse(open("avro.avsc", "rb").read())

with beam.Pipeline(options=pipeline_options) as p:
    records = p | ReadFromAvro(known_args.input)
    records | WriteToAvro(known_args.output, schema=schema, file_name_suffix='.avro')
 类似资料:
  • 我需要从压缩的GCS文件中解析json数据,因为文件扩展名是。gz,所以它应该由dataflow正确地重新组织和处理,但是作业日志打印出不可读的字符和未处理的数据。当我处理未压缩的数据时,它工作得很好。我使用以下方法映射/解析JSON: 你知道原因是什么吗? 运行时的配置: 输入文件名示例:file.gz,命令gsutil ls-l gs://bucket/input/file.gz grep c

  • 结果如何在工作人员之间分配?是使用查询结果创建一个表,工作人员从中读取页面,还是每个工作人员运行查询并读取不同的页面或。。。怎样

  • 我正在运行数据流作业从气流。我需要说我是气流的新手。数据流(从气流运行)正在成功运行,但我可以看到气流在获得工作状态时遇到了一些问题,我收到了无限的消息,比如: 谷歌云数据流作业尚不可用。。 以下是将所有步骤添加到数据流后的日志(我将{project ectID}和{jobID}放在它所在的位置): 你知道这是什么原因吗?我找不到与此问题相关的任何解决方案。我应该提供更多信息吗? 这是我在DAG中

  • 我试图上传图像文件到我在GCS的桶中,只在我的页面上插入了一个文件输入和一个按钮进行测试,当我点击按钮时,它没有做任何事情,没有错误,什么都没有。我在谷歌云平台计算引擎中使用的是Ubuntu VM。实际上,我无法使用谷歌云存储客户端做任何事情,无法上传对象、列出对象、列出存储桶,我已经将它上传到我的网站,但它不工作,首先在localhost上尝试,但它给了我这个错误:cURL错误60: SSL证书

  • 我正在尝试使用谷歌云数据流将谷歌PubSub消息写入谷歌云存储。PubSub消息采用json格式,我要执行的唯一操作是从json到parquet文件的转换。

  • 假设gcs中的文件以以下格式存储:-.avro。尝试在google dataflow作业中使用读取文件,使用apache Beam的fileio.matchall库读取基于时间戳间隔的文件。例如,gcs中的文件: 现在我们要获取所有大于时间戳20200101000000直到当前时间戳的文件,我可以使用什么文件模式?