当前位置: 首页 > 知识库问答 >
问题:

数据流:使用python管道更新BigQuery行

景英杰
2023-03-14

想象一个简单的Google数据流管道。在这个管道中,您使用apache beam函数从BQ读取数据,并根据返回的pcollection更新这些行

Journeys = (p
                    | 'Read from BQ' >> beam.io.Read(
                    beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = ( Journeys
                   | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
                    | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))

该管道的问题是,当您读取表(beam.map)时,将对返回的pcollection中的每个项执行UpdateBQ

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD = "#standardSQL"
    QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    <...>

可能的解决办法

with beam.Pipeline(options=options) as p:
    Journeys = (p
                | 'Read from BQ' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
                )

    Write = (Journeys
                | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))


UpdateBQ();

共有1个答案

濮阳振
2023-03-14

在阅读完BQ后,你是否在使用光束管道进行进一步的改造?或者它只是您在代码中显示的方式,即从BQ中读取,然后在BQ中启动更新命令?在这种情况下,您根本不需要beam。只需使用BQ查询更新使用另一个表的表中的数据。BQ最佳实践建议避免一次插入/更新单行。

 类似资料:
  • 分析输入后,有两个选项可用: 如果x=1->插入 如果x=2->更新 测试 null

  • 我有一个python中的ApacheBeam管道,不管出于什么原因,它都有下面这样的流。 SQL作业-- 当我在本地运行此程序时,此序列工作正常。然而,当我试图将其作为数据流管道运行时,它实际上并没有按此顺序运行。 在数据流上运行时是否有强制依赖关系的方法?

  • 考虑以下设置: 发布/订阅 数据流:用于验证发布/订阅、解包和写入BigQuery的事件的流作业 BigQuery 我们在通过Datafow管道的有效事件上有计数器,并观察到计数器高于发布/订阅中可用的事件量。 注意:我们似乎在BigQuery中也看到了重复项,但我们仍在调查中。 在数据流日志中可以观察到以下错误: 请注意,数据流作业是在发布/订阅中已有数百万条消息等待时启动的。 问题: 这是否会

  • 如何使用带有DataflowRunner的apache光束从Google BigQuery数据集获取表列表? 我找不到如何从指定的数据集中获取表。我想使用数据流的并行处理编程模型将表从位于美国的数据集迁移到位于欧盟的数据集。

  • 只是想知道新版本(3.X)的python是否提供了更多的管道I/O和运行时参数。如果我是正确的,那么当前ApacheBeam只提供基于文件的IOs:使用python时提供textio、avroio、tfrecordio。但在Java中,我们有更多的选项,如基于文件的IOs、BigQueryIO、BigtableIO、PubSubIO和SpanRio。 在我的需求中,我想使用Python 3在GCP

  • 我正在尝试使用云数据流运行器从我的beam管道中的bigquery读取数据。我想提供访问项目的凭据。 我见过Java中的例子,但没有Python中的例子。 有什么想法吗?