当前位置: 首页 > 知识库问答 >
问题:

如何迭代谷歌云存储中的所有文件作为数据流输入?

房星光
2023-03-14
python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test

我希望作业被上传到Dataflow runner,收集文件列表并迭代每个文件将在运行时在云中进行。我希望能够像读取一个文件一样传递所有文件的内容。

该作业在试图将其提交给云数据流运行器时已经阻塞。

"""A metric sink workflow."""

from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions

class ExtractDatapointsFn(beam.DoFn):
    """
    Parse json documents and extract the metrics datapoints.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.

        Args:
            element: the element being processed.

        Returns:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check.
            document = json.loads(element)
            if not "DataPoints" in document:
                raise ValueError("missing DataPoints")
        except ValueError:
            self.total_invalid.inc(1)
            return

        for point in document["DataPoints"]:
            yield point

def run(argv=None):
    """
    Main entry point; defines and runs the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(GoogleCloudOptions)
    pipe = beam.Pipeline(options=pipeline_options)

    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()

    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

暂时还没有答案

 类似资料:
  • 我们一直在使用服务,在(AWS)中,我们意外删除了一个目录,因此我们认为它应该在中,但是经过查看,尽管处于打开状态,但它并不存在。

  • storage和colab之间的链接如下所示:

  • 我试图设置一个谷歌云存储桶,以便我上传的任何文件都自动gzip,并设置“content-encoding:gzip”。 我尝试了“gsutil defacl set public-read gs://Bucket”,其基础是将Google Cloud Storage Bucket中的所有文件默认设置为public,但没有成功。 有什么想法吗?

  • 我想知道是否有可能从非谷歌App Engine服务器上使用API搜索存储在谷歌云存储中的文件。

  • 我正在看新的谷歌云数据存储,看起来很棒。但有件事我不明白。。。它应该替代谷歌应用引擎数据存储吗?我如何在GAE内部使用它?它们之间有什么区别? 我在Java有一个GAE应用程序,它使用3个实体,每个实体都有数千行,我需要经常做连接...

  • 我在尝试从谷歌云存储下载CSV文件时遇到了一个问题。出于某种原因,它一直以字节而不是可读文本的形式下载文件。当我在Excel中打开下载的CSV时,Excel已经足够智能,可以将其转换为可读文本。我在这里错过了什么?我检查了谷歌的文档,但找不到任何好的信息来完成它们。提前谢谢你! 这是错误:UnicodeDecodeError:“utf-8”编解码器无法解码位置15-16的字节:无效的连续字节