我们在datalab中运行了一个Python管道,它从google云存储(导入google.datalab.storage)中的存储桶中读取图像文件。最初我们使用DirectRunner,效果很好,但现在我们尝试使用DataflowRunner,并且出现导入错误。即使在管道运行的函数中包含“import google.datalab.storage”或其任何变体,也会出现错误,例如“没有名为'datalab.storage'的模块”。我们还尝试过使用save_main_会话、requirements_文件和setup_文件标志,但没有成功。我们如何在数据流管道中正确访问云存储桶中的图像文件?
编辑:我最初的错误是由于使用不正确的语法(即“--requirements\u file./requirements.txt”)指定了requirements\u file标志。我想我已经修复了那里的语法,但是现在我得到了一个不同的错误。这是我们试图运行的代码的基本版本——我们有一个管道,可以从谷歌云的存储桶中读取文件。我们有一个带有单元格的datalab笔记本,其中包含以下Python代码:
import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import StandardOptions
import google.datalab.storage as storage
bucket = "BUCKET_NAME"
shared_bucket = storage.Bucket(bucket)
# Create and set PipelineOptions.
options = PipelineOptions(flags = ["--requirements_file", "./requirements.txt"])
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "PROJECT_NAME"
google_cloud_options.job_name = 'test-pipeline-requirements'
google_cloud_options.staging_location = 'gs://BUCKET_NAME/binaries'
google_cloud_options.temp_location = 'gs://BUCKET_NAME/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
def read_file(input_tuple):
filepath = input_tuple[0]
shared_object = shared_bucket.object(filepath)
f = shared_object.read_stream()
# More processing of f's contents
return input_tuple
# File paths relative to the bucket
input_tuples = [("FILEPATH_1", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()
同时,在笔记本的同一目录中有一个名为“requirements.txt”的文件,只有一行
datalab==1.0.1
如果我使用DirectRunner,这段代码可以正常工作。但是,当我使用DataflowRunner时,我在“p.run()”处得到一个被调用的进程错误,堆栈跟踪以以下内容结束:
/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc_populate_requirements_cache(requirements_file,cache_dir)
224'-no-二进制',': all:']
225logging.info('执行命令:%s',cmd_args)
-
/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyccheck_call(*args,**kwargs)
38如果force_shell:
39 kwargs['shell']=True
---
/usr/lib/python2。7/子流程。pyc in check_call(*popenargs,**kwargs)
538如果cmd为None:
539 cmd=popenargs[0]
--
调用的进程错误:命令“['/usr/bin/python'、'-m'、'pip'、'install'、'-download'、'/tmp/dataflow requirements cache'、'-r'、'./requirements.txt'、'-no binary'、':all:']'返回非零退出状态1
对于pip,“--download”选项似乎已被弃用,但这是apache_beam代码的一部分。我也尝试过用不同的方法来指定“requirements.txt”,有没有“-save\u main\u session”标志,有没有“-setup\u file”标志,但没有骰子。
最可能的问题是您需要让Dataflow安装datalab pypi模块。
通常,您可以通过在上传到数据流的requirements.txt文件中列出“数据实验室”来做到这一点。看到https://cloud.google.com/dataflow/pipelines/dependencies-python
如果pydatalab的唯一用法是从GCS读取,那么我建议使用Dataflow的gcsio。代码示例:
def read_file(input_tuple):
filepath = input_tuple[0]
with beam.io.gcp.gcsio.GcsIO().open(filepath, 'r') as f:
# process f content
pass
# File paths relative to the bucket
input_tuples = [("gs://bucket/file.jpg", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()
pydatalab相当沉重,因为它更像是与Datalab或Jupyter一起使用的数据探索库。另一方面,Dataflow的GCSIO在管道中本机支持。
从Google Datalab中的ipython笔记本访问Google Drive中的数据返回: 异常:accessDenied:Access Denied:BigQuery BigQuery:未找到具有Google驱动器作用域的OAuth令牌。 尝试在bq cmd查询Google Sheet Table中运行gCloud配置解决方案”拒绝访问:BigQuery BigQuery:未找到带有Goo
我正在尝试使用DataFlow(Java)将数据从云存储插入到Big Query中。我可以批量上传数据;但是,我想要设置一个流式上传代替。因此,当新对象添加到我的bucket时,它们将被推送到BigQuery。 我已经将PipelineOptions设置为流,并且在GCP控制台UI中显示dataflow管道是流类型的。bucket中的初始文件/对象集被推送到BigQuery。 但是当我向桶中添加新
但是当我运行代码时,我会遇到以下异常: 你有什么想法会导致这种情况吗?
有没有办法从云函数(最好是基于Python的)运行谷歌数据融合管道? 核心要求是,只要GCS存储桶中有新文件,就会执行基于事件的云功能。云函数反过来需要调用一个数据融合管道,将GCS bucket文件加载到BigQuery中。 要执行云功能,我们可以使用以下方法: gcloud函数部署hello_gcs_generic--runtime python37--trigger resource YOU
想知道是否有某种“钩子”来放置apache beam管道关闭时将执行的一段代码(无论出于何种原因-崩溃、取消) 每次数据流停止时,我都需要删除pubsub主题的订阅。
tl;dr Apache Beam管道步骤涉及构建docker图像;如何使用谷歌数据流运行这个管道?存在哪些替代方案? 我目前正在尝试使用谷歌的数据流服务和apache梁(python)迈出第一步。 简单的例子很简单,但当外部软件依赖性开始发挥作用时,事情就会让我感到困惑。似乎可以使用自定义docker容器来设置自己的环境[1][2]。虽然这对大多数依赖项来说都很好,但如果依赖项是docker本身