当前位置: 首页 > 知识库问答 >
问题:

在Google Cloud Dataflow-Python中访问模板化运行时参数

邹俊友
2023-03-14

我正在尝试为Google Cloud Dataflow创建自己的模板,这样作业就可以从GUI执行,让其他人更容易执行。我遵循了教程,创建了自己的PipelineOptions类,并用parser.add_value_provider_argument()方法填充了它。然后,当我尝试使用my_options.argname.get()将这些参数传递到管道中时,我会得到一个错误,告诉我该项不是从运行时上下文调用的。我不明白这个。Arg不是定义管道图本身的一部分,它们只是参数,如输入文件名、输出表名等。

下面是代码。如果我对输入文件名、输出表名、写处理和分隔符进行硬编码,它就可以工作。如果我用它们的my_options.argname.get()等效项替换它们,则失败。在所示的代码片段中,除了outputBQTable名称之外,我已经对所有内容进行了硬编码,在该名称中我使用my_options.outputBQTable.get()。此操作失败,出现以下消息

apache_beam.error.runtimeValueProviderError:RuntimeValueProvider(选项:outputBQTable,type:str,default_value:'dataflow_csv_reader_testing.names').get()未从运行时上下文调用

我很感激任何关于如何让它发挥作用的指导。

import apache_beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider
import csv
import argparse

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls,parser):
        parser.add_value_provider_argument('--inputGCS', type=str,
              default='gs://mybucket/df-python-csv-test/test-dict.csv',
              help='Input gcs csv file, full path and filename')
        parser.add_value_provider_argument('--delimiter', type=str,
              default=',',
              help='Character used as delimiter in csv file, default is ,')
        parser.add_value_provider_argument('--outputBQTable', type=str,
              default='dataflow_csv_reader_testing.names',
              help='Output BQ Dataset.Table to write to')
        parser.add_value_provider_argument('--writeDisposition', type=str,
              default='WRITE_APPEND',
              help='BQ write disposition, WRITE_TRUNCATE or WRITE_APPEND or WRITE_EMPTY')

def main():
    optlist=PipelineOptions()
    my_options=optlist.view_as(MyOptions)
    p = apache_beam.Pipeline(options=optlist)
    (p
    | 'create'            >> apache_beam.Create(['gs://mybucket/df-python-csv-test/test-dict.csv'])
    | 'read gcs csv dict' >> apache_beam.FlatMap(lambda file: csv.DictReader(apache_beam.io.gcp.gcsio.GcsIO().open(file,'r'), delimiter='|'))
    | 'write bq record'   >> apache_beam.io.Write(apache_beam.io.BigQuerySink(my_options.outputBQTable.get(), write_disposition='WRITE_TRUNCATE'))
    )
    p.run()

if __name__ == '__main__':
    main()

暂时还没有答案

 类似资料:
  • 我试图在类型s. t上专门化一个类。它忽略了给定类型的恒定性。在这种情况下,该类型是一个模板模板参数: 上面的代码在GCC 4.8.4和clang 5.0(with-std=c 11)中都抱怨bar在与匹配FOFType模板参数化的类一起使用时未定义。即使我删除了sfinae参数,仍然无法找到特化。 这个问题的一个例子可以在这里找到:https://godbolt.org/g/Cjci9C.在上面

  • 我通过Xtext创建了一个DSL,现在需要将编辑器中创建的模型转换为另一个模型。我认为最直接的方法是使用某种M2M转换框架,但我需要访问文本文件后面的模型。问题:如何获得模型的引用?

  • 我正在PythonApacheBeamDataflow中进行一个项目,我需要根据启动数据流模板提供的运行时参数命名bigquery表。 到目前为止,我运气不好,它要么为我提供了运行时参数的定义,要么提供了一个空字符串。 所以我基本上需要这个来工作: 命名表中的写BigQuery函数是我有麻烦的地方,我也尝试使用custom_options.table_name,声明变量为全局等。 我已经创建了一个

  • 我正在使用一个由其他人编写的C++库,它(不幸的是imo)实现了几个封装在类中的算法,这些类的行为可以通过作为模板参数传递的参数(参数通常也是类)进行微调。例如,可能有这样一个类模板: 我想用这个库编写一个程序,它需要根据运行时信息创建的实例。我的问题是,假设有、和有效类型可以为、和传递,那么当我想实例化的不同spezialization时,我可能需要在代码中的某个位置创建多达分支。例如,在运行时

  • 我有个测试: 当我删除子句并添加一个包含的依赖项(作为库)时,它可以工作: 当我添加它们时(中的Maven依赖项和),IDEA中的编译失败,出现以下消息: 但是Maven build仍然成功! 测试项目可在https://github.com/rpuch/test-resource-jdk9获得

  • 有没有一种方法可以在运行时访问JUnit5版本? 例如。 在JUnit4中工作得很好。 我正在寻找JUnit5的“对应物” 谢谢:-)