当前位置: 首页 > 知识库问答 >
问题:

apachebeam-bigqueryupsert

杨飞
2023-03-14

我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。

不过,我发现没有办法在结果出来后执行管道中的另一个阶段。

举个例子

# Collection1- filtered on first two characters = 95
collection1 = (
    rows    | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
            | 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
            | 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
                    data_ingestion.spec1,
                    schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery
            )

# Collection2 - filtered on first two characters = 99
collection2 = (
    rows    | 'Build pCollection2' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '99'))
            | 'p2 Split Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '99'))
            | 'Load p2 to BIGQUERY' >> beam.io.WriteToBigQuery(
                    data_ingestion.spec2,
                    schema=parse_table_schema_from_json(data_ingestion.getBqSchema('99')),
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery)

根据上述内容,我希望运行以下内容:

final_output = (
    collection1, collection2
       | 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))

是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

共有1个答案

詹钊
2023-03-14

从技术上讲,没有办法完全按照你的要求去做。beam.io.WriteToBigQuery消耗pCollection,不留下任何内容。

然而,将输入复制到beam很简单。木卫一。在调用beam之前,在parDo中写入bigquery。木卫一。WriteToBigquery,并沿每个路径发送您的pCollection副本。请参阅此答案,其中引用了文档中的此示例doFn

 类似资料:
  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/

相关问答

相关文章

相关阅读