我想把BigQuery表名作为运行时参数传递给我的数据流模板,就像这样简单:
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input', type=str, help='BigQuery table reference DATASET.TABLE')
parser.add_value_provider_argument('--output', type=str, help='BigQuery table reference DATASET.TABLE')
def run(argv=None):
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)
# Query
query = f"""
SELECT * FROM `{user_options.input}`
WHERE last_scrape_date > (SELECT max(last_scrape_date) from `{user_options.output}`)
"""
(p
| 'Read from BQ Table' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.WriteToBigQuery(
'{user_options.output}',
schema=schema,
# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Deletes all data in the BigQuery table before writing.
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/vf-scrapers/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 15 Dec 2020 23:25:17 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '500', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 400,
"message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
"errors": [
{
"message": "Table name \"RuntimeValueProvider(option: input, type: str, default_value: None)\" missing dataset while no default dataset is set in the request.",
"domain": "global",
"reason": "invalid"
}
],
"status": "INVALID_ARGUMENT"
}
}
这个问题有一个解决方案:https://issues.apache.org/jira/browse/beam-1440,但到目前为止我还不明白结论。
运行时参数,如您的Bigquery表名,在管道构建过程中表示为valueProvider
对象,而不是字符串文本。您可以在输出中看到user_options.input
作为字符串化的runtimeValueProvider
输出。但在这种情况下,修复非常简单;直接传入对象,而不将其转换为字符串,如下所示:
beam.io.WriteToBigQuery(
user_options.output,
schema=schema,
...
这是因为在数据流模板中,管道构造发生在传递运行时参数之前。调用此代码时,您的输入和输出参数尚未定义。相反,ValueProvider充当占位符,一旦定义了参数,就允许在运行时检索参数。
...这不起作用,因为我在管道执行之前调用get()。到目前为止,我还没有将为do_some_stuff函数所做的调整到“read”行 任何关于如何进行的建议或解决方案都将不胜感激。谢了!
问题内容: 我正在创建一种通过传递搜索字段从任何表中选择ID的方法。 但是我得到一个有关语法错误的MySqlException。当我查看“异常”消息时,它向我显示带引号的查询表!如何将表格作为不带引号的参数传递? 问题答案: 大多数数据库不允许您通过参数指定表名或列名。参数用于 值 。如果确实确实需要使它动态化,则应验证输入(它应该是一个已知的表名,并且该表中具有已知的列名),然后将其包括在SQL
问题内容: 我有3类调用,和。 在我的课程中,我想要一个这样的方法: 因此,例如,我想: 如何将类名作为参数,并基于该类名从类名创建适当的对象? 问题答案: 使用反射是可能的。这里是给定的className(作为字符串传递)。此类将在内存中搜索(应该已经加载)。 作为字符串传递时要实例化的类的名称应 完全限定
我正在做一个情绪检查应用程序使用flutter,其中用户选择了5个表情符号之一来告诉他们的心情。我想用数据表情符号显示一个PieChart vs num天它已经被选择了。问题是,我必须从sqflite数据库中获取数据,以获得numOfDays一个特定的表情符号被选中,它将是未来的类型,但图表不能采取未来,我试图使用async-await,但它似乎不起作用。 *数据库文件 Mood_Chart文件片
问题内容: 我已经熟悉Android框架和Java,并希望创建一个通用的“ NetworkHelper”类,该类可以处理大多数联网代码,使我能够从中调用网页。 我遵循了来自developer.android.com的这篇文章来创建我的网络类:http : //developer.android.com/training/basics/network- ops/connecting.html 码: