当前位置: 首页 > 知识库问答 >
问题:

在dataflow python中运行时为“beam.io.BigQuerySource”提供“query”参数

蒋昊天
2023-03-14

TLDR:我希望每个月使用dataflow API和模板运行beam.io.bigQuerySource和不同的查询。如果这是不可能的,那么我是否可以在运行时将查询传递给beam.io.bigquerysource,同时仍然使用Dataflow API和模板?

我有一个数据流“批处理”数据管道,它读取一个BigQuery表,如下所示

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='xxxxxxxxxx',
        help='project id')
    parser.add_argument(
        '--dataset',
        dest='dataset',
        type=str,
        default='xxxxxxxxxx',
        help='bigquery dataset to read data from')

    args, pipeline_args = parser.parse_known_args(argv)
    project_id = args.pro_id
    dataset_id = args.dataset

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(argv=pipeline_args) as p:
    
        companies = (
                p
                | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query_bq(project_id, dataset_id),
                                                                              use_standard_sql=True))
        )

beam.io.BigQuerySource的查询参数由如下函数计算

from datetime import datetime
def query_bq(project, dataset):
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = (
        f'SELECT * FROM `{project}.{dataset}.data_{month}_json` '
        f'LIMIT 10'
    )
    return query

这里有几件事要注意

  1. 我想每天运行此数据管道一次
  2. 表id每月都在变化。例如,这个月的表id是data_2020_06_01_json,下个月的表id是data_2020_07_01_json,所有这些都是由上面的def query_bq(project,dataset)
  3. 计算的
  4. 我希望使用Dataflow API使用云函数、pubsub事件、云调度器自动运行此批处理管道。

下面是由cloud-scheduler每天向pubsub发布事件触发的云函数

def run_dataflow(event, context):
    if 'data' in event:
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        pubsub_message_dict = ast.literal_eval(pubsub_message)
        event = pubsub_message_dict.get("eventName")
        now = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
        project = 'xxx-xxx-xxx'
        region = 'europe-west2'
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        if event == "run_dataflow":
            job = f'dataflow-{now}'
            template = 'gs://xxxxx/templates/xxxxx'
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=region,
                body={
                    'jobName': job,
                }
            )
            response = request.execute()
            print(response)

下面是我用来在dataflow上启动这个数据管道的命令

python main.py \
    --setup_file ./setup.py \
    --project xxx-xx-xxxx \
    --pro_id xxx-xx-xxxx \
    --dataset 'xx-xxx-xxx' \
    --machine_type=n1-standard-4 \
    --max_num_workers=5 \
    --num_workers=1 \
    --region europe-west2  \
    --serviceAccount= xxx-xxx-xxx \
    --runner DataflowRunner \
    --staging_location gs://xx/xx \
    --temp_location gs://xx/temp \
    --subnetwork="xxxxxxxxxx" \
    --template_location gs://xxxxx/templates/xxxxx

在编译和创建数据流模板时调用我的query_bq函数,然后将该模板加载到GCS。并且这个query_bq函数不会在运行时被调用。因此,每当我的cloud-function调用dataflow create时,它总是从data_2020_06_01_json表中读取数据,即使进入7月、8月等等,查询中的表也始终保持不变。我真正想要的是根据query_bq函数动态更改该查询,以便以后可以读取data_2020_07_01_jsondata_2020_08_01_json等。

我还查看了生成的模板文件,看起来查询在编译后硬编码到模板中。下面是一个片段

 "name": "beamapp-xxxxx-0629014535-344920",
  "steps": [
    {
      "kind": "ParallelRead",
      "name": "s1",
      "properties": {
        "bigquery_export_format": "FORMAT_AVRO",
        "bigquery_flatten_results": true,
        "bigquery_query": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10",
        "bigquery_use_legacy_sql": false,
        "display_data": [
          {
            "key": "source",
            "label": "Read Source",
            "namespace": "apache_beam.runners.dataflow.ptransform_overrides.Read",
            "shortValue": "BigQuerySource",
            "type": "STRING",
            "value": "apache_beam.io.gcp.bigquery.BigQuerySource"
          },
          {
            "key": "query",
            "label": "Query",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "STRING",
            "value": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10"
          },
          {
            "key": "validation",
            "label": "Validation Enabled",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "BOOLEAN",
            "value": false
          }
        ],
        "format": "bigquery",
        "output_info": [
          {
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--query_bq', type=str)

user_options = pipeline_options.view_as(UserOptions)
p | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query_bq,
                                                                              use_standard_sql=True))
WARNING:apache_beam.utils.retry:Retry with exponential backoff: waiting for 3.9023594566785924 seconds before retrying get_query_location because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'str'> for field query, found SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10 (type <class 'apache_beam.options.value_provider.StaticValueProvider'>)

共有1个答案

荣德厚
2023-03-14

BigQuerySource中不能使用ValueProviders,但是在Beam的最新版本中,可以使用Beam.io.ReadFromBigQuery,它很好地支持它们。

你会做:

result = (p 
          | beam.io.ReadFromBigQuery(query=options.input_query,
                                     ....))

您可以传递值提供程序,它还有许多其他实用程序。查看它的文档

 类似资料:
  • 标准中似乎没有规则提到模板参数需要默认参数的情况。 在dcl中。fct。默认值#1 如果在参数声明中指定了初始化子句,则将此初始化子句用作默认参数。缺省参数将用于缺少尾随参数的调用。 在本节中,规则明确描述了何时为函数调用提供默认参数。但是,我在标准中没有找到与上面描述何时提供默认参数作为模板参数的语句类似的引用。 例如

  • 问题内容: 我正在使用JDK 1.6.0_26中的VisualVM来分析在Tomcat下运行的Java Webapp,但是VisualVM经常告诉我它没有足够的内存来拍摄快照,并使用-Xmx开关为Netbeans提供更多的内存。 。问题是,我在Netbeans之外运行VisualVM,那么如何为jvisualvm.exe提供JVM参数? 问题答案: 应该能够修改内存中的设置 并且在排队。

  • 考虑以下示例: 问题来了:上例中的lambda参数是稍后将在“display()”方法内执行的对象。将参数传递给“display()”时,它显然不会执行。 为什么它被编译器拒绝?我认为只有在实际调用lambda时,用try... catch来包围它是很合理的。

  • 问题内容: 我经常碰巧处理可以是数组或null变量的数据,并用这些数据提供一些数据。 当为foreach提供非数组数据时,会收到警告: 警告:[…]中为foreach()提供了无效的参数 假设无法重构该函数以始终返回数组(向后兼容性,不可用的源代码,无论其他原因),我想知道哪种方法最有效,最有效的避免了这些警告: 转换为数组 初始化为数组 包裹有 其他(请建议) 问题答案: 我个人认为这是最干净的

  • 我正在使用TestNG和Java运行测试 这是在https://www.tutorialspoint.com/testng/testng_parameterized_test.htm 错误是这样说的:[Utils][ERROR][错误]org.testng.TestNGExc0019:@Test on method addProjectWork需要参数'url',但未在C:\用户\SStaple\

  • 我正在尝试创建一个@Configuration工厂bean,它应该根据运行时参数创建其他(原型)bean。我想使用基于spring java的配置,但不知怎么的,我无法让它工作。 这里有一个例子: 我检查了Spring文档和这里提出的所有相关问题,但最终我将向创建的bean提供运行时参数。我需要向工厂提供运行时参数,工厂必须使用它们来创建不同的bean。 编辑:似乎(目前)没有办法将@Bean注释