我一直在编写一个Apache Beam/Dataflow管道,用于在GCS桶中存储1M+文件的usecase;Bigquery行输出还需要每个文件的路径。每个输入文件都是一个单行json文件。
下面是我当前的管道代码片段:
input_file_path = 'gs://' + BUCKET + '/**'
with beam.Pipeline(options=options) as p:
(p | 'Reading input file' >> beam.io.ReadFromTextWithFilename(input_file_path)
| 'Converting from json to dict' >> beam.ParDo(JSONtoDict())
| 'Write entries into Bigquery' >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
您可以尝试现有的Python转换。有一个已知的问题,数据流作业可能会由于源创建太多分裂而失败。JavaWithIntMatchesmanyFiles
。通过使用pardo
读取来解决这个问题,但它将禁用动态工作重新平衡。Dataflow Runner v2支持大型globs,同时支持动态工作再平衡。
问题内容: 我们正在为IMAP帐户开发基于Java的邮件客户端,并使用最新的Java邮件api(1.5.6)。我们的客户拥有超过400个文件夹的邮件帐户。用户在文件夹上执行检查邮件,并在每个文件夹上进行迭代并获取新消息,例如, 或获取未读邮件的数量过多,这是因为文件夹数量巨大。(我们必须遍历400个文件夹) 为了提高性能,我们在线程中使用了并行工作连接,并且我们有一个SESSION实例,但是每个线
问题内容: 我想将文件/文件组添加到现有数据库中,但是我需要从变量获取路径,因为在此脚本完成后它将有所不同。当我在SQL Management Studio 2008 R2中检查脚本时,它在处返回错误。 如何使用该变量? 脚本将不会从命令行运行! 问题答案: 使用动态SQL:
本文向大家介绍基于文件的数据管理系统,包括了基于文件的数据管理系统的使用技巧和注意事项,需要的朋友参考一下 用于组织和维护数据文件的系统称为基于文件的数据系统。这些文件系统用于处理单个或多个文件,效率不高。 功能性 基于文件的数据管理系统的功能如下- 基于文件的系统有助于任何用户的基本数据管理。 基于文件的系统中存储的数据应保持一致。在基于文件的系统中完成的任何事务都不应更改一致性属性。 基于
问题内容: 我在这篇文章中找到了这个ZipUtils类: 如何使用java压缩文件夹本身 我对其进行了修改,以便可以传递一个zip文件名。但是,它的唯一工作方式是使用硬编码的静态字符串。从数据库中获取zippedFile字符串。我已经将dbZippedFile和hardcodedZippedFile进行了比较,它们都是相同的……也许在FileOutputStream中使用非静态字符串会产生问题吗?
问题内容: 我的文件末尾带有数字,例如: 我正在寻找一种简单的方法来识别附加了最大数字的文件。有没有实现这一目标的简单方法?…我当时正在考虑导入文件夹中的所有文件名,提取最后一位数字,转换为数字,然后寻找最大数字,但是对于我认为这是一个相对常见的任务,这似乎有些复杂。 问题答案: 如果文件名确实以一种很好的方式格式化,则可以简单地使用: 但请注意: 因此,您可能想向函数添加某种验证,和/或使用:
我有一个“数据库选择”和体系结构问题。 用例: 客户端将上载大型。json文件(或其他格式,如.tsv,不相关),其中每一行都是关于其客户的数据(例如姓名、地址等) 我的要求: > 数据库应该有某种复制,因为我们不想丢失数据。 不需要索引,因为我们只是流数据。 对于这个问题,您对数据库有什么建议?我们尝试将其上传到Amazon S3并让他们处理缩放等问题,但存在读取/流式传输缓慢的问题。 谢了伊凡