当前位置: 首页 > 知识库问答 >
问题:

Google DataFlow:在流管道中的BigQuery中插入+更新

葛志国
2023-03-14

分析输入后,有两个选项可用:

  • 如果x=1->插入
  • 如果x=2->更新

测试

    null
     UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
def insertCanonicalBQ(input):
    from google.cloud import bigquery
    client = bigquery.Client(project='project')
    dataset = client.dataset('dataset')
    table = dataset.table('table' )
    table.reload()
    table.insert_data(
        rows=[[values]])
def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD= "#standardSQL"
    QUERY= STD + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2=  'YYY'"""
    client.use_legacy_sql = False    
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    while True:
         query_job.reload()  # Refreshes the state via a GET request.
         if query_job.state == 'DONE':
             if query_job.error_result:
                 raise RuntimeError(query_job.errors)
             print "done"
             return input
             time.sleep(1)

共有1个答案

芮立果
2023-03-14

即使该行不在流缓冲区中,这仍然不是BigQuery中解决这个问题的方法。BigQuery存储更适合于批量突变,而不是像这样通过update突变单个实体。您的模式与我期望的事务性用例而不是分析性用例一致。

为此考虑一个基于追加的模式。每次处理实体消息时,通过流式插入将其写入BigQuery。然后,在需要时,您可以通过查询获得所有实体的最新版本。

例如,我们假设一个任意模式:IDField是您唯一的实体键/标识符,message_time表示消息发出的时间。您的实体可能有许多其他字段。要获取实体的最新版本,我们可以运行以下查询(并可能将其写入另一个表):

#standardSQL
SELECT
  idfield,
  ARRAY_AGG(
    t ORDER BY message_time DESC LIMIT 1
  )[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield
 类似资料:
  • 想象一个简单的Google数据流管道。在这个管道中,您使用apache beam函数从BQ读取数据,并根据返回的pcollection更新这些行 该管道的问题是,当您读取表(beam.map)时,将对返回的pcollection中的每个项执行UpdateBQ 可能的解决办法

  • 考虑以下设置: 发布/订阅 数据流:用于验证发布/订阅、解包和写入BigQuery的事件的流作业 BigQuery 我们在通过Datafow管道的有效事件上有计数器,并观察到计数器高于发布/订阅中可用的事件量。 注意:我们似乎在BigQuery中也看到了重复项,但我们仍在调查中。 在数据流日志中可以观察到以下错误: 请注意,数据流作业是在发布/订阅中已有数百万条消息等待时启动的。 问题: 这是否会

  • 问题内容: 有没有人有创建Java中的管道对象,任何好的建议 是 从Java既是一个InputStream和和OutputStream没有多重继承和两个流是抽象类,而不是接口? 基本需求是有一个可以传递给需要InputStream或OutputStream的对象的对象,该对象需要将一个线程的输出传递给另一个线程的输入。 问题答案: 看来这个问题的重点已被遗漏。如果我对您的理解正确,那么您希望一个对

  • 我有一个简单的流程,目的是在一个BigQuery表中写两行。我使用动态目标,因为之后我将在多个表上写,在那个例子中是同一个表...问题是我的BigQuery表最后只有一行。在第二次插入时,我看到以下错误 "状态:{code: 6 消息:"已存在:作业sampleProject et3:b9912b9b05794aec8f4292b2ae493612_eeb0082ade6f4a58a14753d1

  • 问题内容: 我有一个不同阶段的管道。我希望当前作业检查上一个版本中经过了多少个阶段并将其记录在控制台中? 考虑这是我当前的管道 我想要一个时髦的脚本给我这样的东西 我的代码的目的是跟踪构建过程中不同阶段的成功与失败。有没有其他替代方法? 问题答案: 您绝对可以使用Pipeline REST API插件,对我来说,Jenkins 2.13 可以直接使用它。 通过解析结果JSON,您可以获得与您期望的

  • 当我执行这段代码时,它会在流管道中打开许多文件: 我得到一个例外: 问题是流。当完成对流的遍历时,count不关闭流。但我不明白为什么不应该,因为这是一个终端操作。对于其他终端操作,如和,也是如此<另一方面,代码>平面图关闭它所包含的流。 文档告诉我在必要时使用try with resources语句来关闭流。在我的例子中,我可以用以下内容替换计数行: 但这是嘈杂和丑陋的,在某些情况下,对于大型复