我试图用Apache Beam阅读并应用一些子集在GCP中的多个文件上。我准备了两个管道,它们只适用于一个文件,但当我在多个文件上尝试它们时失败了。除此之外,如果可能的话,我会很方便地将我的管道组合成一个,或者有一种方法来编排它们,以便它们有序地工作。现在管道在本地工作,但我的最终目标是用Dataflow运行它们。
def toJson(file):
with open(file) as f:
return json.load(f)
with beam.Pipeline(options=PipelineOptions()) as p:
files = (p
| beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
| beam.io.WriteToText("/home/test",
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p
| 'read_data' >> beam.Create(['test-00000-of-00001.json'])
| "toJson" >> beam.Map(toJson)
| "takeItems" >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
我解决了如何使它工作于多个文件,但不能使它在一个管道内运行。我使用循环,然后波束。平坦选项。
以下是我的解决方案:
file_list = ["gs://my_bucket/file*.txt.gz"]
res_list = ["/home/subject_test_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
with beam.Pipeline(options=PipelineOptions()) as p:
for i,file in enumerate(file_list):
(p
| "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file, skip_header_lines = 0)
| "Write TExt {}".format(i) >> beam.io.WriteToText("/home/subject_test_{}".format(i),
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
pcols = []
with beam.Pipeline(options=PipelineOptions()) as p:
for i,res in enumerate(res_list):
pcol = (p | 'read_data_{}'.format(i) >> beam.Create([res])
| "toJson_{}".format(i) >> beam.Map(toJson)
| "takeItems_{}".format(i) >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects_{}".format(i) >> beam.FlatMap(lambda line: line['data']['subjects']))
pcols.append(pcol)
out = (pcols
| beam.Flatten()
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
问题内容: 我有许多文本文件,希望将它们放入for循环中。 我从上一个活动中获得了一个具有资源名称的Extra,并且有一个数组,其原始资源中的文本文件的资源名称来自{d0,d1,d2,d3,…,d79},我想检查名称和数组名称,然后将查找名称放入资源!我的代码(res = R.raw。(d [i]))出现错误: 问题答案: 您可以使用getIdentifier(字符串名称,字符串defType,字
问题内容: 我知道如何读取字节,但是如何在Python中读取位? 我只需要从二进制文件中读取5位(而不是8位[1字节]) 有什么想法或方法吗? 问题答案: Python一次只能读取一个字节。您需要读完整的字节,然后从该字节中提取所需的值,例如 或者,如果您想要5个最低有效位,而不是5个最高有效位: 一些其他有用的位操作信息可以在这里找到:http : //wiki.python.org/moin/
我正在使用python,我有一个文件(
问题内容: 我想知道如何从单个文件夹中读取多个文件(无需指定文件名,只是它们是json文件)。 另外,有可能将它们转换为DataFrame吗? 能给我一个基本的例子吗? 问题答案: 一种选择是使用os.listdir列出目录中的所有文件,然后仅查找以’.json’结尾的文件: 现在,您可以使用pandas DataFrame.from_dict将json(此时为python字典)读入pandas数
问题内容: 我想读取一个文件,该文件位于类路径中所包含的 之一内。我如何读取其中包含的任何文件? 问题答案: 如果要从应用程序内部读取该文件,请使用: 路径以“ /”开头,但这不是文件系统中的路径,而是类路径中的路径。因此,如果你的文件位于类路径“ org.xml”中,并且名为myxml.xml,则路径类似于“ /org/xml/myxml.xml”。 InputStream读取文件的内容。如果需
我有一个Azure Functions应用程序(Python),其中我必须读取存储在Azure存储帐户(StorageV2)中的多个CSV文件来验证它们。 但是,此文件夹中的文件名和CSV文件数量会随时间变化。应用程序使用HTTP绑定触发,最好动态检查文件夹的内容,然后依次处理文件夹中的所有CSV文件。 从文档中可以看出,Azure函数似乎对输入和输出使用绑定,然而,示例中只显示了指向单个文件的(