当前位置: 首页 > 知识库问答 >
问题:

使用ValueProvider格式化数据流中的BigQuery

徐皓君
2023-03-14
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )

class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_nb_users',
                                           default=100,
                                           type=int)
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)
class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
      self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
      yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                                     use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

...这不起作用,因为我在管道执行之前调用get()。到目前为止,我还没有将为do_some_stuff函数所做的调整到“read”行

任何关于如何进行的建议或解决方案都将不胜感激。谢了!

暂时还没有答案

 类似资料:
  • v-charts 提供对数据格式的设置的能力,一个常见的设置数据格式的方式如下所示: <template> <ve-line :data="chartData" :settings="chartSettings"></ve-line> </template> <script> export default { data () { this.chartSettings = { metr

  • 本文向大家介绍使用js实现数据格式化,包括了使用js实现数据格式化的使用技巧和注意事项,需要的朋友参考一下 格式化是通过格式操作使任意类型的数据转换成一个字符串。例如下面这样 下面是一个完整的代码,可以复制到自己的项目中。

  • 本文向大家介绍layui使用templet格式化表格数据的方法,包括了layui使用templet格式化表格数据的方法的使用技巧和注意事项,需要的朋友参考一下 增加js /*---------------------格式化时间开始--------------------------*/ /*---------------------格式化时间结束--------------------------

  • 数据格式化 这个ValueFormatter接口可以用来创建定制格式化程序类允许格式化图标中的数据或这Y轴上的数据。 使用ValueFormatter仅需要创建一个新的类和实现他的接口并且从getFormattedValue(float value)返回任何你想要显示的文本。 自定义格式化的例子: public class MyValueFormatter implements V

  • 我在下面编写了这个工作流,以根据谷歌Java风格指南验证我项目中的Java格式。我的意图是在工作流中使用maven。 使用gradle,我可以使用chmod x gradlew使其可执行,并使用

  • 问题内容: 我正在尝试使用DecimalFormat格式化价格,但这不适用于所有版本。 版画 和 但是“ 7.79999”的格式设置为“ 7.8”,而不是“ 7.80”。我尝试过这种方式 强制两个dp,但是“ 85.0”的格式设置为“ 85.00”而不是“ 85”! 有没有一种方法可以捕获所有变化,以便将价格打印为#,##或#。##?例如: 5、55、5.55、5.50、500、500.40 问题