python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test
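For reference, those command-line flags map directly onto Beam pipeline options; a minimal sketch of the same configuration built programmatically (same placeholder project and bucket names; --output is a custom flag that the script below parses separately with argparse):

from apache_beam.utils.pipeline_options import PipelineOptions

# The flags passed to batch.py above map one-to-one onto pipeline options.
options = PipelineOptions([
    '--project=foobar',
    '--job_name=foobar-metrics',
    '--runner=DataflowRunner',
    '--staging_location=gs://foobar-staging/dataflow',
    '--temp_location=gs://foobar-staging/dataflow_temp',
])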
I want the job itself to be uploaded to the Dataflow runner, so that collecting the list of files and iterating over each of them happens at runtime in the cloud. I would also like to be able to pass in the contents of all the files as if I were reading a single file.
The job already hangs when it tries to submit itself to the Cloud Dataflow runner.
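A minimal sketch of that idea, assuming a Beam release that ships ReadAllFromText (the gs://foobar-sink/* glob is the same placeholder used in the code below): the pattern is carried as data, so both the file listing and the reading happen on the workers at execution time rather than on the submitting machine.

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

# A PCollection of glob patterns is expanded on the workers at execution time,
# so the file matching and reading both run in the cloud, not at submission.
with beam.Pipeline() as pipe:
    lines = (
        pipe
        | 'patterns' >> beam.Create(['gs://foobar-sink/*'])
        | 'read all files' >> ReadAllFromText())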
"""A metric sink workflow."""
from __future__ import absolute_import
import json
import argparse
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
class ExtractDatapointsFn(beam.DoFn):
"""
Parse json documents and extract the metrics datapoints.
"""
def __init__(self):
super(ExtractDatapointsFn, self).__init__()
self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')
def process(self, element):
"""
Process json that contains metrics of each element.
Args:
element: the element being processed.
Returns:
unmarshaled json for each metric point.
"""
try:
# Catch parsing errors as well as our custom key check.
document = json.loads(element)
if not "DataPoints" in document:
raise ValueError("missing DataPoints")
except ValueError:
self.total_invalid.inc(1)
return
for point in document["DataPoints"]:
yield point
def run(argv=None):
"""
Main entry point; defines and runs the pipeline.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://foobar-sink/*',
help='Input file to process.')
parser.add_argument('--output',
required=True,
help=(
'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(GoogleCloudOptions)
pipe = beam.Pipeline(options=pipeline_options)
# Read the json data and extract the datapoints.
documents = pipe | 'read' >> ReadFromText(known_args.input)
metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())
# BigQuery sink table.
_ = metrics | 'write bq' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
# Actually run the pipeline (all operations above are deferred).
result = pipe.run()
result.wait_until_finish()
total_invalid_filter = MetricsFilter().with_name('total_invalid')
query_result = result.metrics().query(total_invalid_filter)
if query_result['counters']:
total_invalid_counter = query_result['counters'][0]
logging.info('number of invalid documents: %d', total_invalid_counter.committed)
else:
logging.info('no invalid documents were found')
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
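For comparison, a minimal, self-contained sketch (hypothetical example row, same placeholder table name "foobar.test") of the same write expressed with beam.io.WriteToBigQuery, which newer Beam releases provide as a single transform in place of the Write(BigQuerySink(...)) pair used above:

import apache_beam as beam

# Same schema and dispositions as in the pipeline above; the input row here is
# an invented example purely for illustration.
with beam.Pipeline() as pipe:
    points = pipe | 'points' >> beam.Create([
        {'Path': '/metrics/cpu', 'Value': 0.42, 'Timestamp': 1500000000}])
    _ = points | 'write bq' >> beam.io.WriteToBigQuery(
        'foobar.test',
        schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)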