当前位置: 首页 > 知识库问答 >
问题:

无法将BigQuery表名作为ValueProvider传递给数据流模板

上官霄
2023-03-14

我想把BigQuery表名作为运行时参数传递给我的数据流模板,就像这样简单:

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str, help='BigQuery table reference DATASET.TABLE')
        parser.add_value_provider_argument('--output', type=str, help='BigQuery table reference DATASET.TABLE')

def run(argv=None):    
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    user_options = pipeline_options.view_as(UserOptions)

    # Query
    query = f"""
    SELECT * FROM `{user_options.input}`
    WHERE last_scrape_date > (SELECT max(last_scrape_date) from `{user_options.output}`)
    """
    (p
     | 'Read from BQ Table' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             '{user_options.output}',
             schema=schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
    p.run().wait_until_finish()
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/vf-scrapers/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 15 Dec 2020 23:25:17 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '500', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
    "errors": [
      {
        "message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
        "domain": "global",
        "reason": "invalid"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}

这个问题有一个解决方案:https://issues.apache.org/jira/browse/beam-1440,但到目前为止我还不明白结论。

共有1个答案

曾晨
2023-03-14

运行时参数,如您的Bigquery表名,在管道构建过程中表示为valueProvider对象,而不是字符串文本。您可以在输出中看到user_options.input作为字符串化的runtimeValueProvider输出。但在这种情况下,修复非常简单;直接传入对象,而不将其转换为字符串,如下所示:

beam.io.WriteToBigQuery(
             user_options.output,
             schema=schema,
             ...

这是因为在数据流模板中,管道构造发生在传递运行时参数之前。调用此代码时,您的输入和输出参数尚未定义。相反,ValueProvider充当占位符,一旦定义了参数,就允许在运行时检索参数。

 类似资料:
  • ...这不起作用,因为我在管道执行之前调用get()。到目前为止,我还没有将为do_some_stuff函数所做的调整到“read”行 任何关于如何进行的建议或解决方案都将不胜感激。谢了!

  • 问题内容: 我正在创建一种通过传递搜索字段从任何表中选择ID的方法。 但是我得到一个有关语法错误的MySqlException。当我查看“异常”消息时,它向我显示带引号的查询表!如何将表格作为不带引号的参数传递? 问题答案: 大多数数据库不允许您通过参数指定表名或列名。参数用于 值 。如果确实确实需要使它动态化,则应验证输入(它应该是一个已知的表名,并且该表中具有已知的列名),然后将其包括在SQL

  • 问题内容: 我有3类调用,和。 在我的课程中,我想要一个这样的方法: 因此,例如,我想: 如何将类名作为参数,并基于该类名从类名创建适当的对象? 问题答案: 使用反射是可能的。这里是给定的className(作为字符串传递)。此类将在内存中搜索(应该已经加载)。 作为字符串传递时要实例化的类的名称应 完全限定

  • 我正在做一个情绪检查应用程序使用flutter,其中用户选择了5个表情符号之一来告诉他们的心情。我想用数据表情符号显示一个PieChart vs num天它已经被选择了。问题是,我必须从sqflite数据库中获取数据,以获得numOfDays一个特定的表情符号被选中,它将是未来的类型,但图表不能采取未来,我试图使用async-await,但它似乎不起作用。 *数据库文件 Mood_Chart文件片

  • 问题内容: 我已经熟悉Android框架和Java,并希望创建一个通用的“ NetworkHelper”类,该类可以处理大多数联网代码,使我能够从中调用网页。 我遵循了来自developer.android.com的这篇文章来创建我的网络类:http : //developer.android.com/training/basics/network- ops/connecting.html 码: