我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。
不过,我发现没有办法在结果出来后执行管道中的另一个阶段。
举个例子
# Collection1- filtered on first two characters = 95
collection1 = (
rows | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
| 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
| 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
data_ingestion.spec1,
schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery
)
# Collection2 - filtered on first two characters = 99
collection2 = (
rows | 'Build pCollection2' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '99'))
| 'p2 Split Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '99'))
| 'Load p2 to BIGQUERY' >> beam.io.WriteToBigQuery(
data_ingestion.spec2,
schema=parse_table_schema_from_json(data_ingestion.getBqSchema('99')),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery)
根据上述内容,我希望运行以下内容:
final_output = (
collection1, collection2
| 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))
是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。
从技术上讲,没有办法完全按照你的要求去做。beam.io.WriteToBigQuery
消耗pCollection
,不留下任何内容。
然而,将输入复制到beam很简单。木卫一。在调用
,并沿每个路径发送您的pCollection副本。请参阅此答案,其中引用了文档中的此示例beam之前,在
。木卫一。WriteToBigqueryparDo
中写入bigquerydoFn
我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实
我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/