当前位置: 首页 > 知识库问答 >
问题:

如何从GCP bucket中读取Apache Beam中的多个文件

汝宏伯
2023-03-14

我试图用Apache Beam阅读并应用一些子集在GCP中的多个文件上。我准备了两个管道,它们只适用于一个文件,但当我在多个文件上尝试它们时失败了。除此之外,如果可能的话,我会很方便地将我的管道组合成一个,或者有一种方法来编排它们,以便它们有序地工作。现在管道在本地工作,但我的最终目标是用Dataflow运行它们。

def toJson(file):
    with open(file) as f:
        return json.load(f)


 with beam.Pipeline(options=PipelineOptions()) as p:
       files = (p
        | beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
        | beam.io.WriteToText("/home/test",
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

 with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p  
            | 'read_data' >> beam.Create(['test-00000-of-00001.json'])
            | "toJson" >> beam.Map(toJson)
            | "takeItems" >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
            | beam.combiners.Count.PerElement()
            | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))

共有1个答案

张高义
2023-03-14

我解决了如何使它工作于多个文件,但不能使它在一个管道内运行。我使用循环,然后波束。平坦选项。

以下是我的解决方案:

file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

with beam.Pipeline(options=PipelineOptions()) as p:
    for i,file in enumerate(file_list):
       (p 
        | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
        | "Write TExt {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
                   file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))

pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
   for i,res in enumerate(res_list):
         pcol = (p   | 'read_data_{}'.format(i) >> beam.Create([res])
            | "toJson_{}".format(i) >> beam.Map(toJson)
            | "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
            | "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
        pcols.append(pcol)
   out = (pcols
    | beam.Flatten()
    | beam.combiners.Count.PerElement()
    | beam.io.WriteToText("/home/items",
                   file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
 类似资料:
  • 问题内容: 我有许多文本文件,希望将它们放入for循环中。 我从上一个活动中获得了一个具有资源名称的Extra,并且有一个数组,其原始资源中的文本文件的资源名称来自{d0,d1,d2,d3,…,d79},我想检查名称和数组名称,然后将查找名称放入资源!我的代码(res = R.raw。(d [i]))出现错误: 问题答案: 您可以使用getIdentifier(字符串名称,字符串defType,字

  • 问题内容: 我知道如何读取字节,但是如何在Python中读取位? 我只需要从二进制文件中读取5位(而不是8位[1字节]) 有什么想法或方法吗? 问题答案: Python一次只能读取一个字节。您需要读完整的字节,然后从该字节中提取所需的值,例如 或者,如果您想要5个最低有效位,而不是5个最高有效位: 一些其他有用的位操作信息可以在这里找到:http : //wiki.python.org/moin/

  • 我正在使用python,我有一个文件(

  • 问题内容: 我想知道如何从单个文件夹中读取多个文件(无需指定文件名,只是它们是json文件)。 另外,有可能将它们转换为DataFrame吗? 能给我一个基本的例子吗? 问题答案: 一种选择是使用os.listdir列出目录中的所有文件,然后仅查找以’.json’结尾的文件: 现在,您可以使用pandas DataFrame.from_dict将json(此时为python字典)读入pandas数

  • 问题内容: 我想读取一个文件,该文件位于类路径中所包含的 之一内。我如何读取其中包含的任何文件? 问题答案: 如果要从应用程序内部读取该文件,请使用: 路径以“ /”开头,但这不是文件系统中的路径,而是类路径中的路径。因此,如果你的文件位于类路径“ org.xml”中,并且名为myxml.xml,则路径类似于“ /org/xml/myxml.xml”。 InputStream读取文件的内容。如果需

  • 我有一个Azure Functions应用程序(Python),其中我必须读取存储在Azure存储帐户(StorageV2)中的多个CSV文件来验证它们。 但是,此文件夹中的文件名和CSV文件数量会随时间变化。应用程序使用HTTP绑定触发,最好动态检查文件夹的内容,然后依次处理文件夹中的所有CSV文件。 从文档中可以看出,Azure函数似乎对输入和输出使用绑定,然而,示例中只显示了指向单个文件的(