我想每小时设置一个管道,以解析 GCS 存储桶不同文件夹中的 2000 个原始 protobuf 格式文件,并将数据加载到大查询中。到目前为止,我能够成功解析原型数据。
我知道读取文件夹中所有文件的通配符方法,但我现在不想这样做,因为我有来自不同文件夹的数据,我想像并行一样更快地运行,而不是以顺序方式运行
像下面一样
for x,filename enumerate(file_separted_comma):
--read data from prto
--load data to bigquery
现在我想知道以下方法是否是解析 apache beam 中不同文件夹中的多个文件并将数据加载到大查询中的最佳或推荐方法。
还有一件事,从proto解析后的每条记录,我正在将其制作成JSON记录以加载到大查询中,并且不知道这也是将数据加载到大查询而不是直接加载反序列化(解析)原型数据的好方法。
我正在从Hadoop工作转移到数据流,以通过建立这条管道来降低成本。
我是apache-beam新手,不知道什么是缺点
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems
def get_deserialized_log(serialized_log):
log = rtbtracker_log_pb2.RtbTrackerLogProto()
log.ParseFromString(serialized_log)
return log
def print_row(message):
message=message[3]
message = message.replace('_', '/');
message = message.replace('*', '=');
message = message.replace('-', '+');
#finalbunary=base64.b64decode(message.decode('UTF-8'))
finalbunary=base64.b64decode(message)
msg=get_deserialized_log(finalbunary)
jsonObj = MessageToDict(msg)
#jsonObj = MessageToJson(msg)
return jsonObj
def parse_file(element):
for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
return line
def run():
parser = argparse.ArgumentParser()
parser.add_argument("--input", dest="input", required=False)
parser.add_argument("--output", dest="output", required=False)
app_args, pipeline_args = parser. parse_known_args()
with beam.Pipeline(options=PipelineOptions()) as p:
input_list=app_args.input
file_list = input_list.split(",")
res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
for i,file in enumerate(file_list):
onesec=p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
parsingProtoFile=onesec | 'Parse file{}'.format(i) >> beam.Map(parse_file)
printFileConetent=parsingProtoFile | 'Print output {}'.format(i) >>beam.Map(print_row)
#i want to load to bigquery here
##LOAD DATA TO BIGQUERY
#secondsec=printFileConetent | "Write TExt {}".format(i) >> ##beam.io.WriteToText("/home/file_{}".format(i),file_name_suffix=".json",
###num_shards=1 ,
##append_trailing_newlines = True)
if __name__ == '__main__':
run()
在本地运行下面的代码
python3 another_main.py --input=tracker_one.gz,tracker_two.gz
输出路径我没有确定,因为我不想将数据保存到 GCS,因为我会将其加载到 bigquery 中
并且像下面这样在dataflowrunner中运行
python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing
这种读取文件的方法很好(只要输入文件的数量不是太大)。但是,如果您可以将要读取的文件集表示为通配符表达式(可以与多个文件夹匹配),则可能会表现得更好,Dataflow将并行读取所有与模式匹配的文件。
对于写入BigQuery,最好使用内置的BigQuery接收器。默认行为是创建JSON格式的临时文件,然后将其加载到BigQuery中,但也可以改用Avro,这会更有效。您还可以使用Flatten将所有输入组合到一个PCollection中,这样您的管道中只需要一个BigQuery接收器。
我有一个处理CSV文件并返回一些分析的应用程序。我的用户将文件存储在GCP云存储桶中,我希望他们能够向我传递一个存储桶URL和一些身份验证令牌/签名URL,然后应用程序将下载文件并根据需要解析它们。 阅读GCP留档时,我遇到了以下gsutil命令: 这正是我所需要的,但是我正在通过一些REST API HTTP请求寻找同样的功能。我肯定有这样的东西存在,但似乎找不到。或者,如果我可以“列出”一个存
问题内容: 我想知道如何从单个文件夹中读取多个文件(无需指定文件名,只是它们是json文件)。 另外,有可能将它们转换为DataFrame吗? 能给我一个基本的例子吗? 问题答案: 一种选择是使用os.listdir列出目录中的所有文件,然后仅查找以’.json’结尾的文件: 现在,您可以使用pandas DataFrame.from_dict将json(此时为python字典)读入pandas数
每个上传类别有多个s3桶,还是一个带子文件夹的桶,还是一个链接的s3桶更好?我确信用户图像会比剖析图片多,每个桶有5TB的限制,每个账户有100个桶。我正在使用aws boto库和https://github.com/amol-/depot 我的文件夹的结构是以下哪种方式? 最后一个意味着它实际上是一个10TB的存储桶,当bucket_1中的文件超过5TB时,就会创建一个新的存储桶。但所有上传的内
我正在使用Spring引导连接到谷歌云存储。 我能够连接到一个文件,并从谷歌云存储桶中读取内容。 但是我不能得到谷歌云存储桶中所有文件的列表。 请帮帮我。 这很有效 这不工作。 有什么原因吗?
我是Google Cloud Platform的新手。我已经在datalab上训练了我的模型,并将模型文件夹保存在云存储中。我可以通过右键单击文件将桶中的现有文件下载到本地计算机-
问题内容: 我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法: 在不使用实际的阅读客户端: 我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给,但是我不确定哪种是最佳做法。 问题答案: 根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配
问题内容: 到目前为止,我们有一个项目结构,其中包含名为的单个源文件夹,其中包含三个模块的源代码。我想做的是: 1)编译源代码。这可以通过sourceSets定义轻松完成: 2)将编译结果放入三个jar中。我通过三个“ jar”类型的任务来做到这一点: 我现在通过三个单独的任务来执行此操作: util.jar } client.jar } server.jar } 问题是应该包含and 中不包含的