当前位置: 首页 > 知识库问答 >
问题:

从模板运行时参数命名BigQuery表,Python,Apache Beam,数据流

高森
2023-03-14

我正在PythonApacheBeamDataflow中进行一个项目,我需要根据启动数据流模板提供的运行时参数命名bigquery表。

到目前为止,我运气不好,它要么为我提供了运行时参数的定义,要么提供了一个空字符串。

所以我基本上需要这个来工作:

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--path',
            type=str,
            help='csv storage path')
        parser.add_value_provider_argument(
            '--table_name',
            type=str,
            help='Table Id')
def run()
    def rewrite_values(element):
        """ Rewrite default env values"""
        try:
            logging.info("File Path with str(): {}".format(str(custom_options.path)))
            logging.info("----------------------------")
            logging.info("element: {}".format(element))
            project_id = str(cloud_options.project)
            file_path = custom_options.path.get()
            table_name = custom_options.table_name.get()

            logging.info("project: {}".format(project_id))
            logging.info("File path: {}".format(file_path))
            logging.info("language: {}".format(table_name))
            logging.info("----------------------------")
        except Exception as e:
            logging.info("Error format----------------------------")
            raise KeyError(e)

        return file_path

    pipeline_options = PipelineOptions()
    cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    init_data = (p
                 | beam.Create(["Start"])
                 | beam.FlatMap(rewrite_values))

pipeline processing, running pipeline etc.

save_data_bigquery = (table_data | "Get all numbers" >> beam.ParDo(GetAllNumbers())
                      | 'Flat items' >> beam.FlatMap(flat_item)
                      | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=project_id,
                                                                       dataset="defined_dataset",
                                                                       table=table_name, **********
                                                                       schema="id:STRING",
                                                                       batch_size=10000)
                      )

命名表中的写BigQuery函数是我有麻烦的地方,我也尝试使用custom_options.table_name,声明变量为全局等。

我已经创建了一个自定义DoFn来写入BigQuery,尽管这将是我的首选方法。

共有1个答案

柳均
2023-03-14

我尝试编写一个BQ_writer类,并在其中编写了实际的WriteToBigQuery。

class BQ_writer(beam.DoFn):
    def __init__(self, schema, output):
        self.output = output
        self.schema = schema

    def process(self, element):
        schema_l = self.schema.get()
        output_table_l = self.output.get()
        logging.info('Writing to table and schema: {}  {}'.format(output_table_l,schema_l))
        beam.io.WriteToBigQuery(output_table_l,
                                schema=schema_l,
                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

然后在管道中:

| 'WriteToBigQuery' >> beam.ParDo(BQ_writer(useroptions.schema,useroptions.output))

这工作得很好,流程构建得没有错误。但是在大查询表中找不到数据。可能是我们不能在ParDo函数中使用WriteToBigQuery。欢迎建议如何从这里开始...

 类似资料:
  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/

  • 我想把BigQuery表名作为运行时参数传递给我的数据流模板,就像这样简单: 这个问题有一个解决方案:https://issues.apache.org/jira/browse/beam-1440,但到目前为止我还不明白结论。

  • 我正在尝试为Google Cloud Dataflow创建自己的模板,这样作业就可以从GUI执行,让其他人更容易执行。我遵循了教程,创建了自己的PipelineOptions类,并用parser.add_value_provider_argument()方法填充了它。然后,当我尝试使用my_options.argname.get()将这些参数传递到管道中时,我会得到一个错误,告诉我该项不是从运行时

  • 请找到我使用过的代码。以下HQL查询失败,说明: 找不到命名参数 [模板 Id] 但是模板 Id 存在于我的模型类中。 请帮助解决问题或可能导致此类错误的原因: 模型文件 请帮助解决我的问题

  • 由于我刚接触DataFlow/Beam,概念还不太清楚(或者至少我在开始编写代码时有困难),我有很多问题: 什么是最好的模板或模式,我可以用来做到这一点?我应该先执行BigQuery的PTransform(然后执行PubSub的PTransform)还是先执行PubSub的PTransform? 我怎么做加入?比如? PubSub的最佳窗口设置是什么?BigQuery的PTransform部分的窗

  • 我想从java程序中运行cmd命令,但它不起作用,这是命令:newman run collection1。4.json-eqa。邮递员的环境。json-r htmlextra目录是:C:\ 我在eclipse下编写了这个java脚本,但它不起作用: *公共静态空Test_WS_Cmd()抛出IOExctive,中断异常{