当前位置: 首页 > 知识库问答 >
问题:

使用Google-cloud-data flow beam . io . avroio . writetoavro(

羊丰茂
2023-03-14

使用Google-Cloud-data flow/Cloud Composer将CSV转换为Avro,一切都可以在我的本地环境中运行。当试图读取。avsc文件,它包含来自云存储桶的Avro模式,我一直得到:IOError: [Errno 2]没有这样的文件或目录:“gs://my-bucket/xxx.avsc”

法典:

from __future__ import absolute_import
import argparse
import logging
import ntpath
import avro.schema
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime


class RowTransformer(object):
    def __init__(self, delimiter, header, filename):
        self.delimiter = delimiter
        self.keys = re.split(',', header)
        self.filename = filename

    def parse(self, row):
        self.load_dt = datetime.utcnow()
        split_row = row.split(self.delimiter)
        #Need to cast anything that is not a string into proper type
        split_row[8] = float('0' if not split_row[8] else split_row[8])  
        split_row[9] = float('0' if not split_row[9] else split_row[9])  
        split_row[10] = float('0' if not split_row[10] else split_row[10]) 
        split_row[11] = float('0' if not split_row[11] else split_row[11]) 
        split_row[12] = float('0' if not split_row[12] else split_row[12])
        split_row[13] = float('0' if not split_row[13] else split_row[13]) 
        split_row[14] = float('0' if not split_row[14] else split_row[14]) 
        split_row[15] = float('0' if not split_row[15] else split_row[15]) 
        split_row[16] = float('0' if not split_row[16] else split_row[16]) 
        split_row[17] = float('0' if not split_row[17] else split_row[17]) 
        split_row[18] = str('0' if not split_row[18] else split_row[18])   
        split_row[19] = str('0' if not split_row[19] else split_row[19])  
        split_row.append(self.filename)
        split_row.append(self.load_dt.strftime('%Y-%m-%d %H:%M:%S.%f')) 
        decode_row = [i.decode('UTF-8') if isinstance(i, basestring) else i for i in split_row]
        row = dict(zip(self.keys, decode_row))
        return row

def run(argv=None):
    """The main function which creates the pipeline and runs it."""

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=False,
        help='Input file to read.  This can be a local file or '
             'a file in a Google Storage Bucket.',
        default='gs://my-bucket/receive/xxx.txt')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output Avro to Cloud Storage',
                        default='gs://my-bucket/')
    parser.add_argument('--schema', dest='schema', required=False,
                        help='Avro Schema',
                        default='gs://my-bucket/xxx.avsc')
    parser.add_argument('--delimiter', dest='delimiter', required=False,
                        help='Delimiter to split input records.',
                        default='|')
    parser.add_argument('--fields', dest='fields', required=False,
                        help='list of field names expected',
                        default='Col1,Col2...etc')
    known_args, pipeline_args = parser.parse_known_args(argv)
    row_transformer = RowTransformer(delimiter=known_args.delimiter,
                                     header=known_args.fields,
                                     filename=ntpath.basename(known_args.input))
    p_opts = pipeline_options.PipelineOptions(pipeline_args)

    with beam.Pipeline(options=p_opts) as pipeline:
        schema_file = avro.schema.parse(open(known_args.schema, "rb").read())
        rows = pipeline | "Read from text file" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
        dict_records = rows | "Convert to Avro" >> beam.Map(lambda r: row_transformer.parse(r))
        dict_records | "Write to Cloud Storage as Avro" >> beam.io.avroio.WriteToAvro(known_args.output,schema=schema_file)
run()

共有1个答案

公西光华
2023-03-14

您需要使用<code>apache_beam.io.gcp。gcsioclass而不是beam.io。ReadFromText只读取本地文件,https://beam.apache.org/documentation/sdks/pydoc/2.6.0/apache_beam.io.gcp.gcsio.html

 类似资料:
  • Google Cloud Bigtable和Google Cloud Datastore/App Engine Datastore有什么区别,主要的实际优势/劣势是什么?AFAIK云数据存储构建在BigTable之上。

  • 我不能得到一个签名的URL与谷歌Cload CDN的URLPrefix一起工作。 https://example.com/foo.mp4?expires=[expersion]&keyname=[KEY_NAME]&SIGNATURE=[SIGNATURE] URLPrefix=AHR0CHM6LY9ZCHLUYWWUCMNMC29MDHDHCMUUAW8VC2FTCGXLLZM2MHAV&Exp

  • 我在我的节点应用程序中使用了@google cloud/secret manager插件,该插件以前托管在google App Engine上。 在我将代码移动到Cloud Run之前,它一直运行良好。我现在收到以下错误:错误: 错误:500未定义:从插件获取元数据失败,错误:无法刷新访问令牌:响应状态代码失败。 下面是我的代码示例: 调试后,运行accessSecretVersion函数时似乎会

  • 我想使用谷歌的无服务器选项之一部署容器化代码。据我所知,谷歌有两种选择: Google App Engine灵活环境 Google Cloud Run(测试版) 我已经看了2019年谷歌下一场演讲,我应该在哪里运行代码?从5个计算选项中选择。我读了Jerry101对“谷歌应用程序引擎和谷歌云运行之间有什么区别?”这一一般性问题的回答。 在我看来,云运行基本上是对使用Google App Engin

  • 问题内容: 我正在尝试从Go连接到云数据存储。我使用了此处提供的示例代码-https: //github.com/GoogleCloudPlatform/gcloud- golang 。 这些是我的代码的相关位: 我始终收到此错误。 我已禁用并重新启用了数据存储区api几次如此处的建议:所有请求都返回403 Unauthorized。我也尝试过删除和添加服务帐户。 (我试图连接我的计算引擎实例数据

  • 我计划在谷歌云平台上为1700多个域(不同的网站)设置HTTP/HTTPS负载平衡(https://cloud.google.com/compute/docs/load-balancing/http/);所有人都将拥有TLS/SSL。但是,每个负载均衡器最多只能添加10个SSL证书,如下所示:https://cloud.google.com/compute/docs/load-balancing/

  • 我创建了一个spring boot项目,它与云SQL(MySQL)连接。我已经在google cloud(cloudrun)中部署了这一功能,目前正在使用中。 现在,我正在尝试在kubernates enigne GKE中部署Spring boot应用程序的相同容器映像,我预计这将与mysql云sql实例连接。但是,我在应用程序POD日志中收到以下错误。 请帮忙。。。解决方案是什么

  • Google Cloud Platform提供云计算服务,可在云环境中运行Spring Boot应用程序。 在本章中,我们将了解如何在GCP应用引擎平台中部署Spring Boot应用程序。 首先,从Spring Initializer页面www.start.spring.io下载Gradle build Spring Boot应用程序。 请注意以下屏幕截图。 现在,在build.gradle文件