我正在开发一个从谷歌云存储(GCS)目录中读取约500万个文件的管道。我已经将其配置为在谷歌云数据流上运行。
问题是,当我启动管道时,需要花费数小时“计算”所有文件的大小:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
如你所见,计算大约5.5M文件的大小需要一个半小时(5549秒),然后从头开始!又花了2个小时运行第二遍,然后又启动了第三遍!截至本文撰写之时,该作业在数据流控制台中仍然不可用,这使我相信这一切都发生在我的本地机器上,并且没有利用任何分布式计算。
当我使用较小的输入数据集(2个文件)测试管道时,它会重复大小估计4次:
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.
以这种速度,仅执行4次所有5.5M文件的GCS大小估计就需要大约8小时,所有这些都在数据流作业开始之前。
我的管道配置了--runner=DataflowRunner
选项,因此它应该在Dataflow中运行:
python bigquery_import.py --runner=DataflowRunner #other options...
管道从GCS读取如下内容:
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')
有关完整代码,请参阅GitHub上的bigquery_import.py。
我不明白为什么这个繁琐的过程发生在数据流环境之外,为什么它需要多次执行。我从地面军事系统读取的文件是正确的还是有更有效的方法?
谢谢你的报道。Beam有两个转换用于读取文本。ReadFromText
和ReadAllFromText
。ReadFromText
将遇到此问题,但ReadAllFromText
不应该遇到此问题。
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438
ReadAllFromText
的缺点是它不会执行动态工作重新平衡,但在读取大量文件时,这应该不是问题。
创建https://issues.apache.org/jira/browse/BEAM-9620用于跟踪ReadFromText(通常是基于文件的源)的问题。
问题内容: 结果是 在哪里重复,因此哈希函数无法按预期工作。我将如何覆盖String数组的Hash方法。或就此而言,通用数组?有没有更好的方法来完成我要做的事情? 问题答案: 你不能 数组使用默认的基于身份的Object.hashCode()实现,无法覆盖它。不要在HashMap / HashSet中将数组用作键! 请改用一组列表。
我正在开发一个连接到Firebase Cloud Firestore和存储的Flutter应用程序,我想在Cloud Firestore中创建包含存储中文件的下载URL的文档。但是,我还需要文件名是同一个Firestore文档的唯一ID。现在,我在创建文档后上传文件,但之后更新Firestore文档需要额外的写操作。 其中是一个将文件上载到存储的函数,并可能返回DownloadURL。我希望能够像
packagingOptions{exclude“meta-inf/manifest.mf”exclude“meta-inf/asl2.0”exclude“meta-inf/license.txt”exclude“meta-inf/license”exclude“meta-inf/notice.txt”exclude“meta-inf/license”exclude“meta-inf/depend
我试图设置一个谷歌云存储桶,以便我上传的任何文件都自动gzip,并设置“content-encoding:gzip”。 我尝试了“gsutil defacl set public-read gs://Bucket”,其基础是将Google Cloud Storage Bucket中的所有文件默认设置为public,但没有成功。 有什么想法吗?
问题内容: 我正在尝试创建一个文件下载程序作为后台服务,但是当计划了一个大文件时,首先将其放入内存中,然后在下载结束时将文件写入磁盘。 考虑到我可能同时下载许多文件,如何使文件逐渐写入磁盘保留内存? 这是我使用的代码: 问题答案: 我将回调更改为: 这工作得很好。
我正在上传图片,一切工作都很好,但我有100张图片,我想在我的中显示所有的图片,当我在一个文件夹中获得图片的完整列表时,我找不到任何API。