当前位置: 首页 > 知识库问答 >
问题:

从Dataflow(Python)向BigQuery写入嵌套模式

督灿
2023-03-14

我有一个数据流作业要写入BigQuery。它对于非嵌套模式很好,但是对于嵌套模式却失败了。

下面是我的数据流管道:

pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

  schema = 'url: STRING,' \
           'ua: STRING,' \
           'method: STRING,' \
           'man: RECORD,' \
           'man.ip: RECORD,' \
           'man.ip.cc: STRING,' \
           'man.ip.city: STRING,' \
           'man.ip.as: INTEGER,' \
           'man.ip.country: STRING,' \
           'man.res: RECORD,' \
           'man.res.ip_dom: STRING'

  first = p | 'read' >> ReadFromText(wordcount_options.input)
  second = (first
            | 'process' >> (beam.ParDo(processFunction()))
            | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
  )

我使用以下模式创建了BigQuery表:

[
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ua",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "method",
    "type": "STRING"
  },
  {
    "mode": "REPEATED",
    "name": "man",
    "type": "RECORD",
    "fields":
      [
        {
          "mode": "REPEATED",
          "name": "ip",
          "type": "RECORD",
          "fields":
            [
              {
                "mode": "NULLABLE",
                "name": "cc",
                "type": "STRING"
              },
              {
                "mode": "NULLABLE",
                "name": "city",
                "type": "STRING"
              },
              {
                "mode": "NULLABLE",
                "name": "as",
                "type": "INTEGER"
              },
              {
                "mode": "NULLABLE",
                "name": "country",
                "type": "STRING"
              }
            ]
        },
        {
          "mode": "REPEATED",
          "name": "res",
          "type": "RECORD",
          "fields":
            [
              {
                "mode": "NULLABLE",
                "name": "ip_dom",
                "type": "STRING"
              }
            ]
        }
      ]
  }
]

我得到以下错误:

BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
 Message: Invalid value for: url is not a valid value
 HTTP Code: 400

有人能给我指路吗?我做错了什么?此外,如果有更好的方法迭代所有嵌套模式并写入BigQuery,请建议?

其他信息我的数据文件:

{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}

共有1个答案

裴经义
2023-03-14

您的代码存在的问题是,在将BigQuery表模式指定为string的同时尝试使用嵌套字段,这是不受支持的。为了将嵌套记录从Apache Beam推送到BigQuery中,您需要创建TableSchema对象,即使用内置解析器:

from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)

您需要在那里将模式作为JSON字符串传递,您可以在终端中使用以下命令获得它(我假设您安装了GCloud Tools):

bq --project=your-gcp-project-name --format=json show your.table.name > schema.json

并在Python中按如下方式使用:

table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))

则在您的管道中:

 beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=table_schema)

您还可以查看手动创建TableSchema对象的示例:https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py

它是(来自链接的示例):

from apache_beam.io.gcp.internal.clients import bigquery
table_schema = bigquery.TableSchema()
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)

table_schema.fields.append(phone_number_schema)
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)
table_schema.fields.append(phone_number_schema)

然后只需在beam.io.WriteTobigQuery中使用table_schema变量。

 类似资料:
  • 我试图使用Apache Beam将一些看起来像Json的数据流作为字符串写入BigQuery。数据是一个数组中的一个数组,由于不可能将嵌套数组写入Bigquery,所以我在主记录地址中创建了一个子记录Address_Instance。我需要帮助将我的嵌套记录写入BigQuery。 下面是我的数据的样子,以及我如何将其转换为Bigquery。 这就是我得到的错误: 下面是我的BigQuery模式的样

  • 我可以以大约每秒10,000次插入的速度将插入直接流式传输到BigQuery,但是当我试图使用Dataflow插入时,'tobqrow'步骤(如下所示)非常慢。每10分钟只有50排,这是4名工人。知道为什么吗?以下是相关代码:

  • 问题内容: 我正在尝试将嵌套的字典写入.csv文件。这是一个简单的示例: 这使我得到一个包含两列的表:第一个包含; 第二个包含[2,1,1](或子词典中的相应值)。我想要一个有四列的表:一列对应的列表元素,然后三列对应的列表元素。 问题答案: 更改: 至: 否则,您会尝试向csv编写类似的内容,而您的意思是。 如Padraic所述,您可能希望更改为或避免随机行为。

  • 我们的一个数据流作业将其输出写入BigQuery。我对这是如何在幕后实现的理解是,Dataflow实际上将JSON格式的结果(sharded)写入GCS,然后启动BigQuery load作业来导入该数据。 但是,我们注意到,无论作业成功还是失败,有些JSON文件都不会在作业完成后删除。错误消息中没有文件不会被删除的警告或建议。当我们注意到这一点时,我们查看了我们的bucket,它有数百个来自失败

  • 我希望从ParDo函数中调用操作,为中的每个键生成单独的BigQuery表(我使用的是python SDK)。这里有两个类似的线程,不幸的是没有帮助: 1)https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey 当我执行以下代码时,第一个键的行被插入到BigQuery,然后管道失败,出现以下错误

  • 问题内容: 我可以使用Golang将平面对象插入BigQuery中-如何将嵌套数据插入表中? 我的BigQuery模式如下所示(从示例中): 我的第一次插入尝试如下所示(示例): 哪个变平并插入没有问题。我只是在用visit_duration 但是,我需要遍历一个片段并添加到访问记录中。我试图建立一个 访问 对象(没有要测试的循环)并将其添加到行中,但它没有插入,并且我没有收到任何错误: - -[