当前位置: 首页 > 知识库问答 >
问题:

数据流模板作业未获取输入参数

应涵容
2023-03-14

我用下面的命令创建了一个数据流模板

    python scrap.py --setup_file /home/deepak_verma/setup.py
  --temp_location gs://visualization-dev/temp
 --staging_location gs://visualization-dev/stage 
--project visualization-dev --job_name scrap-job 
--subnetwork regions/us-east1/subnetworks/dataflow-internal 
--region us-east1  --input sentiment_analysis.table_view 
--output gs://visualization-dev/incoming 
--runner DataflowRunner 
--template_location gs://visualization-dev/template/scrap 
@classmethod
def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        '--input', dest='input', required=True,
        help='Input view. sentiment_analysis.table_view',
    )

    parser.add_value_provider_argument(
        '--output', dest='output', required=True,
        help='output gcs file path'
    )
beam.io.Read(beam.io.BigQuerySource(query=read_query.format(
        table=options.input.get(), limit=(LIMIT and "limit " + str(LIMIT) or '')), use_standard_sql=True)))

where read_query is defined as `SELECT upc, max_review_date FROM `{table}`
template_body = {
                'jobName': job_name,
                'parameters': {'input': 'table_view2'}
            }
            credentials = GoogleCredentials.get_application_default()
            service = build('dataflow', 'v1b3', credentials=credentials)
            request = service.projects().locations().templates().launch(projectId=constants.GCP_PROJECT_ID, location=constants.REGION, gcsPath=template_gcs_path, body=template_body)

数据流不为table_view2调用此函数,而是为该作业使用table_view。

共有1个答案

伏欣悦
2023-03-14

问题是,在暂存模板时,您已经传递了input值,这是正在解决的问题。在运行第一个命令时删除--input sentiment_analysis.table_view,并将其保留为空。仅在使用'parameters'执行模板时将其指定为参数:{'input':'sentiment_analysis.table_view2'}

如果仍然需要默认值,可以在添加值提供程序参数时进行,如本例所示:

parser.add_value_provider_argument(
        '--input', dest='input', required=True,
        help='Input view. sentiment_analysis.table_view',
        default='sentiment_analysis.table_view'
    )
 类似资料:
  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

  • 如何在谷歌VPC项目中运行的谷歌数据流模板中传递/设置“usepublicips”作为运行时参数?

  • 我想得到输入流作为JSON数组从一个网址。如何设置源代码,以便在apache flink中使用datastream连续获得输入。简而言之,我想从一个url连续获得json数据,而不会关闭flink作业。

  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?

  • 我需要从服务绑定模板数据变量。服务使用HTTP检索JSON格式的数据。我得到了正确的数据,但是因为请求是如此的异步返回服务总是未定义的。 如何将异步数据销售到模板中?不使用回调? AppComponent: 菜单提供者: 月经量: 模板:

  • 我正在尝试创建和阶段数据流经典模板。按照下面提供的链接中的文档- https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#java_8 . mvn编译exec: java\-Dexec.mainClass=com.example.myclass\-Dexec.args="--runner=Dataflow