使用Google-Cloud-data flow/Cloud Composer将CSV转换为Avro,一切都可以在我的本地环境中运行。当试图读取。avsc文件,它包含来自云存储桶的Avro模式,我一直得到:IOError: [Errno 2]没有这样的文件或目录:“gs://my-bucket/xxx.avsc”
法典:
from __future__ import absolute_import
import argparse
import logging
import ntpath
import avro.schema
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime
class RowTransformer(object):
def __init__(self, delimiter, header, filename):
self.delimiter = delimiter
self.keys = re.split(',', header)
self.filename = filename
def parse(self, row):
self.load_dt = datetime.utcnow()
split_row = row.split(self.delimiter)
#Need to cast anything that is not a string into proper type
split_row[8] = float('0' if not split_row[8] else split_row[8])
split_row[9] = float('0' if not split_row[9] else split_row[9])
split_row[10] = float('0' if not split_row[10] else split_row[10])
split_row[11] = float('0' if not split_row[11] else split_row[11])
split_row[12] = float('0' if not split_row[12] else split_row[12])
split_row[13] = float('0' if not split_row[13] else split_row[13])
split_row[14] = float('0' if not split_row[14] else split_row[14])
split_row[15] = float('0' if not split_row[15] else split_row[15])
split_row[16] = float('0' if not split_row[16] else split_row[16])
split_row[17] = float('0' if not split_row[17] else split_row[17])
split_row[18] = str('0' if not split_row[18] else split_row[18])
split_row[19] = str('0' if not split_row[19] else split_row[19])
split_row.append(self.filename)
split_row.append(self.load_dt.strftime('%Y-%m-%d %H:%M:%S.%f'))
decode_row = [i.decode('UTF-8') if isinstance(i, basestring) else i for i in split_row]
row = dict(zip(self.keys, decode_row))
return row
def run(argv=None):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', required=False,
help='Input file to read. This can be a local file or '
'a file in a Google Storage Bucket.',
default='gs://my-bucket/receive/xxx.txt')
parser.add_argument('--output', dest='output', required=False,
help='Output Avro to Cloud Storage',
default='gs://my-bucket/')
parser.add_argument('--schema', dest='schema', required=False,
help='Avro Schema',
default='gs://my-bucket/xxx.avsc')
parser.add_argument('--delimiter', dest='delimiter', required=False,
help='Delimiter to split input records.',
default='|')
parser.add_argument('--fields', dest='fields', required=False,
help='list of field names expected',
default='Col1,Col2...etc')
known_args, pipeline_args = parser.parse_known_args(argv)
row_transformer = RowTransformer(delimiter=known_args.delimiter,
header=known_args.fields,
filename=ntpath.basename(known_args.input))
p_opts = pipeline_options.PipelineOptions(pipeline_args)
with beam.Pipeline(options=p_opts) as pipeline:
schema_file = avro.schema.parse(open(known_args.schema, "rb").read())
rows = pipeline | "Read from text file" >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
dict_records = rows | "Convert to Avro" >> beam.Map(lambda r: row_transformer.parse(r))
dict_records | "Write to Cloud Storage as Avro" >> beam.io.avroio.WriteToAvro(known_args.output,schema=schema_file)
run()
您需要使用<code>apache_beam.io.gcp。gcsioclass而不是beam.io。ReadFromText
只读取本地文件,https://beam.apache.org/documentation/sdks/pydoc/2.6.0/apache_beam.io.gcp.gcsio.html
Google Cloud Bigtable和Google Cloud Datastore/App Engine Datastore有什么区别,主要的实际优势/劣势是什么?AFAIK云数据存储构建在BigTable之上。
我不能得到一个签名的URL与谷歌Cload CDN的URLPrefix一起工作。 https://example.com/foo.mp4?expires=[expersion]&keyname=[KEY_NAME]&SIGNATURE=[SIGNATURE] URLPrefix=AHR0CHM6LY9ZCHLUYWWUCMNMC29MDHDHCMUUAW8VC2FTCGXLLZM2MHAV&Exp
我在我的节点应用程序中使用了@google cloud/secret manager插件,该插件以前托管在google App Engine上。 在我将代码移动到Cloud Run之前,它一直运行良好。我现在收到以下错误:错误: 错误:500未定义:从插件获取元数据失败,错误:无法刷新访问令牌:响应状态代码失败。 下面是我的代码示例: 调试后,运行accessSecretVersion函数时似乎会
我想使用谷歌的无服务器选项之一部署容器化代码。据我所知,谷歌有两种选择: Google App Engine灵活环境 Google Cloud Run(测试版) 我已经看了2019年谷歌下一场演讲,我应该在哪里运行代码?从5个计算选项中选择。我读了Jerry101对“谷歌应用程序引擎和谷歌云运行之间有什么区别?”这一一般性问题的回答。 在我看来,云运行基本上是对使用Google App Engin
问题内容: 我正在尝试从Go连接到云数据存储。我使用了此处提供的示例代码-https: //github.com/GoogleCloudPlatform/gcloud- golang 。 这些是我的代码的相关位: 我始终收到此错误。 我已禁用并重新启用了数据存储区api几次如此处的建议:所有请求都返回403 Unauthorized。我也尝试过删除和添加服务帐户。 (我试图连接我的计算引擎实例数据
我计划在谷歌云平台上为1700多个域(不同的网站)设置HTTP/HTTPS负载平衡(https://cloud.google.com/compute/docs/load-balancing/http/);所有人都将拥有TLS/SSL。但是,每个负载均衡器最多只能添加10个SSL证书,如下所示:https://cloud.google.com/compute/docs/load-balancing/
我创建了一个spring boot项目,它与云SQL(MySQL)连接。我已经在google cloud(cloudrun)中部署了这一功能,目前正在使用中。 现在,我正在尝试在kubernates enigne GKE中部署Spring boot应用程序的相同容器映像,我预计这将与mysql云sql实例连接。但是,我在应用程序POD日志中收到以下错误。 请帮忙。。。解决方案是什么
Google Cloud Platform提供云计算服务,可在云环境中运行Spring Boot应用程序。 在本章中,我们将了解如何在GCP应用引擎平台中部署Spring Boot应用程序。 首先,从Spring Initializer页面www.start.spring.io下载Gradle build Spring Boot应用程序。 请注意以下屏幕截图。 现在,在build.gradle文件