当前位置: 首页 > 知识库问答 >
问题:

数据流中的自定义Apache Beam Python版本

越飞语
2023-03-14

我想知道是否有可能在Google数据流中运行一个定制的Apache Beam Python版本。在公共存储库中不可用的版本(撰写本文时为0.6.0和2.0.0)。例如,ApacheBeam的官方存储库中的HEAD版本,或该问题的特定标记。

我知道可以按照官方文件中的说明包装定制包装(例如,私人本地包装)。这里有一些关于如何为其他脚本执行此操作的问题。甚至有一个要点指导这一点。

但是,我还没有获得当前的ApacheBeam开发版本(或标记版本),该版本可以在其官方存储库的主分支中获得,以便打包并将脚本发送到Google数据流。例如,对于最新的可用标记,其PiP处理的链接是:githttps://github.com/apache/beam.git@v2。1.0-RC2#egg=apache#u梁[gcp]

INFO:root:Executing command: ['.../bin/python', '-m', 'pip', 'install', '--download', '/var/folders/nw/m_035l9d7f1dvdbd7rr271tcqkj80c/T/tmpJhCkp8', 'apache-beam==2.1.0', '--no-binary', ':all:', '--no-deps']
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting apache-beam==2.1.0
  Could not find a version that satisfies the requirement apache-beam==2.1.0 (from versions: 0.6.0, 2.0.0)
No matching distribution found for apache-beam==2.1.0

有什么想法吗?(我想知道这是否可能,因为Google Dataflow可能已经修复了可以运行到官方发布版本的Apache Beam版本)。


共有1个答案

农存
2023-03-14

当我在一家Apache Beam的JIRA上得到这个问题的答案时,我会回答自己。

如果您想在Google Cloud Dataflow中使用自定义Apache Beam Python版本(即,使用--runerDataflowRunner运行管道),则必须使用选项--sdk_location

例如,在撰写本文时,如果您已经签出了ApacheBeam的git存储库的HEAD版本,那么您必须首先使用cd Beam/sdks/Python导航到Python SDK来打包存储库,然后运行Python安装程序。py sdist(压缩的tar文件将在dist子目录中创建)。

此后,您可以像这样运行管道:

python your_pipeline.py [...your_options...] --sdk_location beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz

谷歌云数据流将使用提供的SDK。

 类似资料:
  • 从这个链接中,我发现Google Cloud Dataflow使用Docker容器为其工作人员:Google Cloud Dataflow实例的图像 我看到有可能找到docker容器的映像名称。 但是,有没有一种方法可以获取这个docker容器(即从哪个存储库获取它?),修改它,然后指示我的数据流作业使用这个新的docker容器? 我问的原因是我们需要在我们的docker上安装各种C和Fortra

  • 我尝试在Google Cloud数据流中运行Apache Beam管道(Python),由Google Cloud Coomposer中的DAG触发。 我的dags文件夹在各自的GCS桶中的结构如下: setup.py是非常基本的,但是根据Apache Beam文档和SO上的答案: 在DAG文件(dataflow.py)中,我设置了选项并将其传递给Dataflow: 在管道文件(pipeline.

  • 不仅仅是falcon-agent采集的数据可以push到监控系统,一些场景下,我们自定义的一些数据指标,也可以push到open-falcon中,比如: 线上某服务的qps 某业务的在线人数 某个接⼝的响应时间 某个⻚面的状态码(500、200) 某个接⼝的请求出错次数 某个业务的每分钟的收⼊统计 ...... 一个shell脚本编写的,自定义push数据到open-falcon的例子 # 注意,

  • 我正试图通过PipelineOptions自定义用于不同GCP调用的用户代理。setUserAgent。 然而,它似乎总是回到。 看看Beam代码库,用户代理似乎是由数据流转轮强制的:https://github.com/apache/beam/blob/ce9ee0b034cef66ea3845ca049770b9a354a4fd4/runners/google-cloud-dataflow-j

  • 问题内容: 我正在使用以Python编写的mapper和reducer在Hadoop中(在Amazon EMR上)运行流作业。我想知道如果我在Java中实现相同的mapper和reducer(或使用Pig),将会获得的速度提升。 特别是,我正在寻找人们从流媒体迁移到自定义jar部署和/或Pig的经验,以及包含这些选项的基准比较的文档。我找到了这个问题,但是答案对我来说不够具体。我不是要在Java和