我正在尝试从Google扳手数据库中读取一个表,并将其写入文本文件以使用python sdk和google dataflow进行备份。我写了以下脚本:
from __future__ import absolute_import
import argparse
import itertools
import logging
import re
import time
import datetime as dt
import logging
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet
BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'
class SpannerSource(iobase.BoundedSource):
def __init__(self):
logging.info('Enter __init__')
self.spannerOptions = {
"id": PROJECT_ID,
"instance": INSTANCE_ID,
"database": DATABASE_ID
}
self.SpannerClient = Client
def estimate_size(self):
logging.info('Enter estimate_size')
return 1
def get_range_tracker(self, start_position=None, stop_position=None):
logging.info('Enter get_range_tracker')
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = OffsetRangeTracker.OFFSET_INFINITY
range_tracker = OffsetRangeTracker(start_position, stop_position)
return UnsplittableRangeTracker(range_tracker)
def read(self, range_tracker): # This is not called when using the dataflowRunner !
logging.info('Enter read')
# instantiate spanner client
spanner_client = self.SpannerClient(self.spannerOptions["id"])
instance = spanner_client.instance(self.spannerOptions["instance"])
database = instance.database(self.spannerOptions["database"])
# read from table
table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
table_fields.consume_all()
self.columns = [x[0] for x in table_fields]
keyset = KeySet(all_=True)
results = database.read(table=TABLE, columns=self.columns, keyset=keyset)
# iterator over rows
results.consume_all()
for row in results:
JSON_row = {
self.columns[i]: row[i] for i in range(len(self.columns))
}
yield JSON_row
def split(self, start_position=None, stop_position=None):
# this should not be called since the source is unspittable
logging.info('Enter split')
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = 1
# Because the source is unsplittable (for now), only a single source is returned
yield iobase.SourceBundle(
weight=1,
source=self,
start_position=start_position,
stop_position=stop_position)
def run(argv=None):
"""Main entry point"""
pipeline_options = PipelineOptions()
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
#pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=pipeline_options)
output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='') # if this line is commented, job completes but does not do anything
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
但是,此脚本只能在DirectRunner上正确运行:当我让它在DataflowRunner上运行时,它会运行一段时间而没有任何输出,然后退出并出现错误:
“执行失败步骤失败14 […]工作流程失败。原因:[…]工人失去了与服务的联系。”
有时,它会一直持续下去,而不会创建输出。
此外,如果我注释“输出= …”行,则作业完成,但实际上并未读取数据。
似乎dataflowRunner调用源的函数“ estimate_size”,而不调用函数“ read”或“ get_range_tracker”。
有人对导致这种情况的原因有任何想法吗?我知道有一个(更完整的)带有实验性扳手源/接收器的Java SDK,但如果可能的话,我宁愿使用python。
谢谢
Google当前添加了对Backup Spanner和Dataflow的支持,您可以在创建DataFlow作业时选择相关模板。
有关更多信息:https :
//cloud.google.com/blog/products/gcp/cloud-spanner-adds-import-export-
functionality-to-ease-data-
movement
理想情况下,我希望使用实例id、项目名称、数据库名称、一个服务帐户电子邮件和一个p12文件的组合,但对其他身份验证选项是开放的。 编辑:在尝试策略时,我生成了这个日志文件,以备不时之需https://gist.github.com/aryeh-looker/e6b1b1617d301f0a247463216c96535d
我需要从压缩的GCS文件中解析json数据,因为文件扩展名是。gz,所以它应该由dataflow正确地重新组织和处理,但是作业日志打印出不可读的字符和未处理的数据。当我处理未压缩的数据时,它工作得很好。我使用以下方法映射/解析JSON: 你知道原因是什么吗? 运行时的配置: 输入文件名示例:file.gz,命令gsutil ls-l gs://bucket/input/file.gz grep c
结果如何在工作人员之间分配?是使用查询结果创建一个表,工作人员从中读取页面,还是每个工作人员运行查询并读取不同的页面或。。。怎样
我试图读取一个csv文件目前在谷歌云存储桶到熊猫数据帧。 它显示以下错误消息: 我做错了什么,我无法找到任何不涉及谷歌数据实验室的解决方案?
有关所述功能的说明: Postgres(索引组合)、MySQL(索引合并)和MongoDB(索引交叉)有一个功能,当在where子句中有多列的给定查询没有找到多列索引时,DB使用多个单列索引(索引)。以下是Postgres的文档对此特性的说明--https://www.costgreql.org/docs/8.3/indexes-bitmap-scans.html 链接节选 从8.1版开始,Pos
我尝试运行一个数据流管道,使用DirectPipelineRunner从本地计算机(windows)读取数据,并写入Google云存储。作业失败,出现以下指定FileNotFoundException的错误(因此我认为数据流作业无法读取我的位置)。我正在本地计算机上运行作业,以运行我创建的基于GCP的模板。我可以在GCP数据流仪表板中看到它,但由于以下错误而失败。请帮忙。我还尝试了本地机器的IP或
谷歌地图 LSV默认的对谷歌影像进行了加载,如果需要加载其他的谷歌地图数据,可以通过LSV中直接点击即可加载。 谷歌历史影像 LSV中支持对谷歌历史影响的查看,。通过历史影像,我们可以分析出很多单一影像无法获取的信息。触摸历史,寻找其他时空中发生的事情和变迁。历史影像主要用于进行多时相对比、去云、多角度观察、小路寻找等。 开启前: 开启后:
谷歌地图 LSV默认的对谷歌影像进行了加载,如果需要加载其他的谷歌地图数据,可以通过LSV中直接点击即可加载。 谷歌历史影像 LSV中支持对谷歌历史影响的查看,。通过历史影像,我们可以分析出很多单一影像无法获取的信息。触摸历史,寻找其他时空中发生的事情和变迁。历史影像主要用于进行多时相对比、去云、多角度观察、小路寻找等。 开启前: 开启后: