当前位置: 首页 > 知识库问答 >
问题:

如何从python操作符创建数据流作业?

赏成益
2023-03-14

当我通过命令行运行Beam管道时,使用direct runner或dataflow runner,它工作得很好。。。

例子:

$ python my_pipeline.py --key /path/to/gcp/service/key.json --project gcp_project_name

但是当我尝试使用空气流时,我有两个选项,bash操作符或python操作符。

使用bash操作符会成功,但会限制我使用气流功能的能力。

但是我想做的是作为python操作员运行它。所以我将模块导入到airflow dg文件中,然后作为python操作符运行它。

如果我使用本地运行程序,它也可以正常工作,但是当我将其更改为数据流运行程序时,它在GCP数据流上创建作业后失败,并出现此错误

ImportError:没有名为airflow的模块。箱子cli

让气流从python操作员创建数据流作业,我错过了什么?

共有1个答案

司徒英卓
2023-03-14

好的,这不是一个完美的解决方案,但是你可以使用

DataFlowPythonOperator()

它将运行我们之前提到的完全相同的bash命令。它是一个变通方法,不等于Pythonoperator,但更像是运行一个Bashoperator...在目前的情况下(像xcom)仍然不能使用气流功能的强度...文档

 类似资料:
  • create 当你刚起步或者只是想要测试一些东西时,倾向于从 create() 操作符入手。它接收一个有 observer 参数的函数。在前面的一些章节中已提及过,比如 Observable 包装章节。函数签名如下: Rx.Observable.create([fn]) 示例如下: Rx.Observable.create(observer => { observer.next( 1 );

  • 创建 RxJS 的操作符可以有多种方式。在这个版本的 RxJS 中,性能是首要考虑因素,因此,在此库中将创建的操作符依附到现有结构中的这种方式可能不那么直截了当。这是一个尝试性的文档,以告诉你如何创建一个操作符,这个操作符可以是你自己使用,也可以是用来添加到库中。 想了解如何为本库开发一个自定义操作符,请参见下面。 自动动手来为终端用户提供自定义的操作符 指南 在大多数情况下,用户可能想要创建一个

  • 主要内容:RxJava 创建操作符 介绍,RxJava 创建操作符 示例RxJava 创建操作符 介绍 以下是用于创建 Observable 的运算符。 运算符 描述 Create 从头开始创建一个 Observable 并允许以编程方式调用观察者方法。 Defer 在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的 observable。 Empty/Never/Throw 创建一个行为受限的 Observable。 From 将对象/数据结构

  • 最大字符串为512M,但是大字符串非常不建议。

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

  • 本章给出关于 Numpy 概述,Numpy 是 Python 中高效数值计算的核心工具。 1.3.1 Numpy 数组对象 1.3.1.1 什么是Numpy以及Numpy数组? 1.3.1.1.1 Numpy数组 Python对象: 高级数值对象:整数、浮点 容器:列表(无成本插入和附加),字典(快速查找) Numpy提供: 对于多维度数组的Python扩展包 更贴近硬件(高效) 为科学计算设计(