当前位置: 首页 > 知识库问答 >
问题:

如何构建GCP/Apache Beam数据流模板?

丌官玺
2023-03-14

好吧,我肯定是遗漏了什么。我需要什么来作为模板准备管道?当我试图通过这些说明将模板暂存时,它会运行模块,但不会暂存任何内容。,它看起来像预期的那样工作,没有出现错误,但是我没有看到任何文件实际添加到bucket位置,在我的--template_位置中侦听。我的python代码应该出现在那里吗?我想是这样吧?我已经确保安装了所有的beam和google cloud SDK,但也许我遗漏了什么?要准备此数据流模板,您需要做什么?我也可以手动将文件放入一个bucket并从那里运行它吗?以下是我当前使用的模板:

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

GC_PROJECT = 'my-proj'
BUCKET = 'test-bucket'
STAGING_BUCKET = '%s/test' % BUCKET
TEMP_BUCKET = '%s/test' % BUCKET
# RUNNER = 'DataflowRunner'
RUNNER = 'DirectRunner'

# pipeline_args = ['--save_main_session']
pipeline_args = []
pipeline_args.append('--project=%s' % GC_PROJECT)
pipeline_args.append('--runner=%s' % RUNNER)
pipeline_args.append('--staging_location=gs://%s' % STAGING_BUCKET)
pipeline_args.append('--temp_location=gs://%s' % TEMP_BUCKET)

BQ_DATASET = 'lake'
BQ_TABLE = 'whatever'

SCHEMA_OBJ = [
    {"name": "id", "type": "STRING", "description": ""},
    {"name": "value", "type": "STRING", "description": ""}
]


class ContactUploadOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--infile',
            type=str,
            help='path of input file',
            default='gs://%s/data_files/test.csv' % BUCKET)

def run(argv=None):
    print('running')
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    lines = (p
             | beam.Create([
                {"id": "some random name", "value": "i dont know"},
                {"id": "id2", "value": "whatever man"}]))

    schema_str = '{"fields": ' + json.dumps(SCHEMA_OBJ) + '}'
    schema = parse_table_schema_from_json(schema_str)
    output_destination = '%s.%s' % (BQ_DATASET, BQ_TABLE)
    (lines
        | 'Write lines to BigQuery' >> beam.io.WriteToBigQuery(
            output_destination,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    p.run().wait_until_finish()


if __name__ == '__main__':
    run(pipeline_args)


此外,如果有人可以链接一些sdk文档/资源,解释如何/为什么上面的分期说明应该工作,那就太棒了!

共有1个答案

暴阳州
2023-03-14

临时位置是运行作业时加载临时文件的位置。您没有提到将创建模板的template_location。

请参阅创建模板和运行模板的链接

 类似资料:
  • 使用标准的GCP提供的存储/文本文件来发布Sub数据流模板,但是尽管我已经设置了#workernodes eq 1,但是对于下游组件来说,处理的消息吞吐量“太高”。 在 Pub/Sub 中的消息事件上运行的 Cloud 函数会命中 GCP 配额,并且使用 CloudRun,我在开始时收到一堆 500、429 和 503 个错误(由于步进突发率)。 有没有办法控制数据流的处理速率?需要获得更软/更慢

  • 我正在构建一个事件驱动的微服务架构,它应该是云不可知的(尽可能多)<由于这最初是在GCP中进行的,我不想在配置和所有这些方面花费太长时间,我打算直接将GCP的发布/订阅用于事件队列,并在稍后处理其他云实现,但后来我遇到了Spring云数据流,这看起来很好,因为这些是Spring Boot微服务,我需要一种方法来协调它们 Spring Cloud数据流是否支持Pub Sub作为事件队列? 在配置和设

  • 我很难理解GCP数据流/Apache Beam和Spring Cloud数据流之间的差异。我试图做的是转向一个更云原生的解决方案,用于流数据处理,这样我们的开发人员可以更专注于开发核心逻辑,而不是管理基础设施。 我们有一个现有的流解决方案,由Spring云数据流“模块”组成,我们可以独立迭代和部署,就像微服务一样,效果很好,但我们希望迁移到我们的业务提供的GCP现有平台,要求我们使用GCP数据流。

  • 我已经通过。但是,我注意到Reshuffle()没有出现在发行版中。这是否意味着我将不能在任何数据流管道中使用?有什么办法可以绕过这个吗?或者pip包可能只是不是最新的,如果Reshuffle()在github的master中,那么它将在Dataflow上可用? 根据对这个问题的回答,我试图从BigQuery中读取数据,然后在将数据写入GCP存储桶中的CSV中之前对数据进行随机化。我注意到,我用来

  • 我正在尝试找出是否有任何GCP数据流模板可用于使用“Pub/Sub to Cloud Spanner”进行数据摄取。我发现已经有一个默认的GCP数据流模板可用于示例-“Cloud Pub/Sub to BigQuery”。所以,我有兴趣看看我是否可以在流或批处理模式下对扳手进行数据摄取,以及行为会如何