当前位置: 首页 > 知识库问答 >
问题:

如何在从datalab运行的数据流管道中使用google云存储

爱炯
2023-03-14

我们在datalab中运行了一个Python管道,它从google云存储(导入google.datalab.storage)中的存储桶中读取图像文件。最初我们使用DirectRunner,效果很好,但现在我们尝试使用DataflowRunner,并且出现导入错误。即使在管道运行的函数中包含“import google.datalab.storage”或其任何变体,也会出现错误,例如“没有名为'datalab.storage'的模块”。我们还尝试过使用save_main_会话、requirements_文件和setup_文件标志,但没有成功。我们如何在数据流管道中正确访问云存储桶中的图像文件?

编辑:我最初的错误是由于使用不正确的语法(即“--requirements\u file./requirements.txt”)指定了requirements\u file标志。我想我已经修复了那里的语法,但是现在我得到了一个不同的错误。这是我们试图运行的代码的基本版本——我们有一个管道,可以从谷歌云的存储桶中读取文件。我们有一个带有单元格的datalab笔记本,其中包含以下Python代码:

import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import StandardOptions
import google.datalab.storage as storage

bucket = "BUCKET_NAME"
shared_bucket = storage.Bucket(bucket)

# Create and set PipelineOptions. 
options = PipelineOptions(flags = ["--requirements_file", "./requirements.txt"])
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "PROJECT_NAME"
google_cloud_options.job_name = 'test-pipeline-requirements'
google_cloud_options.staging_location = 'gs://BUCKET_NAME/binaries'
google_cloud_options.temp_location = 'gs://BUCKET_NAME/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

def read_file(input_tuple):
  filepath = input_tuple[0]
  shared_object = shared_bucket.object(filepath)
  f = shared_object.read_stream()
  # More processing of f's contents
  return input_tuple

# File paths relative to the bucket
input_tuples = [("FILEPATH_1", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

同时,在笔记本的同一目录中有一个名为“requirements.txt”的文件,只有一行

datalab==1.0.1

如果我使用DirectRunner,这段代码可以正常工作。但是,当我使用DataflowRunner时,我在“p.run()”处得到一个被调用的进程错误,堆栈跟踪以以下内容结束:

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc_populate_requirements_cache(requirements_file,cache_dir)
224'-no-二进制',': all:']
225logging.info('执行命令:%s',cmd_args)
-

/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyccheck_call(*args,**kwargs)
38如果force_shell:
39 kwargs['shell']=True
---

/usr/lib/python2。7/子流程。pyc in check_call(*popenargs,**kwargs)
538如果cmd为None:
539 cmd=popenargs[0]
--

调用的进程错误:命令“['/usr/bin/python'、'-m'、'pip'、'install'、'-download'、'/tmp/dataflow requirements cache'、'-r'、'./requirements.txt'、'-no binary'、':all:']'返回非零退出状态1

对于pip,“--download”选项似乎已被弃用,但这是apache_beam代码的一部分。我也尝试过用不同的方法来指定“requirements.txt”,有没有“-save\u main\u session”标志,有没有“-setup\u file”标志,但没有骰子。

共有2个答案

华福
2023-03-14

最可能的问题是您需要让Dataflow安装datalab pypi模块。

通常,您可以通过在上传到数据流的requirements.txt文件中列出“数据实验室”来做到这一点。看到https://cloud.google.com/dataflow/pipelines/dependencies-python

呼延承平
2023-03-14

如果pydatalab的唯一用法是从GCS读取,那么我建议使用Dataflow的gcsio。代码示例

def read_file(input_tuple):
  filepath = input_tuple[0]
  with beam.io.gcp.gcsio.GcsIO().open(filepath, 'r') as f:
    # process f content
    pass

# File paths relative to the bucket
input_tuples = [("gs://bucket/file.jpg", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

pydatalab相当沉重,因为它更像是与Datalab或Jupyter一起使用的数据探索库。另一方面,Dataflow的GCSIO在管道中本机支持。

 类似资料:
  • 从Google Datalab中的ipython笔记本访问Google Drive中的数据返回: 异常:accessDenied:Access Denied:BigQuery BigQuery:未找到具有Google驱动器作用域的OAuth令牌。 尝试在bq cmd查询Google Sheet Table中运行gCloud配置解决方案”拒绝访问:BigQuery BigQuery:未找到带有Goo

  • 我正在尝试使用DataFlow(Java)将数据从云存储插入到Big Query中。我可以批量上传数据;但是,我想要设置一个流式上传代替。因此,当新对象添加到我的bucket时,它们将被推送到BigQuery。 我已经将PipelineOptions设置为流,并且在GCP控制台UI中显示dataflow管道是流类型的。bucket中的初始文件/对象集被推送到BigQuery。 但是当我向桶中添加新

  • 但是当我运行代码时,我会遇到以下异常: 你有什么想法会导致这种情况吗?

  • 有没有办法从云函数(最好是基于Python的)运行谷歌数据融合管道? 核心要求是,只要GCS存储桶中有新文件,就会执行基于事件的云功能。云函数反过来需要调用一个数据融合管道,将GCS bucket文件加载到BigQuery中。 要执行云功能,我们可以使用以下方法: gcloud函数部署hello_gcs_generic--runtime python37--trigger resource YOU

  • 想知道是否有某种“钩子”来放置apache beam管道关闭时将执行的一段代码(无论出于何种原因-崩溃、取消) 每次数据流停止时,我都需要删除pubsub主题的订阅。

  • tl;dr Apache Beam管道步骤涉及构建docker图像;如何使用谷歌数据流运行这个管道?存在哪些替代方案? 我目前正在尝试使用谷歌的数据流服务和apache梁(python)迈出第一步。 简单的例子很简单,但当外部软件依赖性开始发挥作用时,事情就会让我感到困惑。似乎可以使用自定义docker容器来设置自己的环境[1][2]。虽然这对大多数依赖项来说都很好,但如果依赖项是docker本身