我不是python开发专家,这就是为什么我只列出我的步骤。
我为apache-airflow创建了一个目录~/Desktop/airflow并制作了
export AIRFLOW_HOME=~/桌面/气流
然后我创建了venv使用
python3-m venv~/桌面/气流
结果是
然后我做了
源绑定/激活
Pip3安装apache-airflow==1.10.9
气流initdb
结果是
在我的气流.cfg文件中,我检查了 dags 和插件目录。我在 $AIRFLOW_HOME/Desktop/airflow 中创建了 dags 和插件目录
我启动了气流网络服务器和调度程序,并确保一切正常。
我发现了很多创建气流插件的方法。我尝试了所有可能的方法。我们开始吧。
第一种方法是在(first_plugin)项目中创建一个插件文件夹,然后创建一个python文件(first_operator.py)
import logging
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
log = logging.getLogger(__name__)
class FirstOperator(BaseOperator):
@apply_defaults
def __init__(self, *args, **kwargs):
super(FirstOperator, self).__init__(*args, **kwargs)
def execute(self, context):
log.info("Hello World!")
class FirstOperatorPlugin(AirflowPlugin):
name = "first_plugin"
operators = [FirstOperator]
似乎
然后我只需将我的插件文件夹(first_plugin)移动到$ air flow _ HOME/DESKTOP/air flow/plugins,重启airflow webserver和scheduler。
现在是时候使用我的自定义运算符创建自定义 dag了。如何正确导入插件是一个挑战。有很多可能的方法可以导入自定义运算符。我将展示我尝试过的内容。
在Pycharm IDE中导入时,这些方法都没有帮助我。例如
从airflow.operators.first_plugin导入第一操作员
但是我确信如果我忽略导入行并将我的自定义dag放入dages文件夹,它会正常工作。(我试过了)。此外,我决定检查气流日志(在DEBUG模式下)。
当我重新启动airflow webserver时,我会看到哪些日志
我花了2天,仍然没有任何解决方案。可能你们告诉我尝试其他方法。我试过了。
https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators/https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators/
https://pybit.es/introduction-airflow.html
所有这些都是工作方式,但没有一个解决了我的IDE导入问题。
这是适用于我的文件夹结构,请确保自定义操作符位于operator
文件夹中,对于传感器
和
hooks也是如此。初始化。py文件应为空。
plugins
├── __init__.py
├── operators
│ ├── __init__.py
│ ├── glue_crawler_operator.py
│ └── gsheet_to_redshift_operator.py
└── sensors
├── __init__.py
└── glue_crawler_sensor.py
还要检查气流中的
文件指向插件文件夹。plugins_folder
。cfg
例如,为了导入gsheet_to_redshift_operator运算符而不出错,我使用了以下语句:
从 operators.gsheet_to_redshift_operator 导入 GsheetToRSOperator
我们不需要创建文件夹< code >插件和添加< code >自定义操作符。有了Airflow 2.0,我们只需要在项目中创建一个文件夹,并将其导入dag文件中。
airflow_project
+--custom_operator
+--first_operator.py
+--dags
+--dag_1.py
在dag_1.py中,您可以将自定义运算符/钩子导入为:
from custom_operator.first_operator import FirstOperator
我绕过了插件步骤,以一种纯python式的方式完成了它。这似乎也是文档对最新版本(1.10.12)的说法。
这是我的文件夹结构:
AIRFLOW_HOME
├── dags
│ ├── __init__.py
│ └── my_dag.py
└── utils
├── __init__.py
└── custom_operator.py
确保< code>PYTHONPATH包含< code>AIRFLOW_HOME路径。然后你可以像这样导入它:
from custom_operator.hello_operator import HelloOperator
在此处阅读更多内容: https://airflow.apache.org/docs/stable/howto/custom-operator.html#creating-a-custom-operator
我正在制作一个小的原型插件,它每分钟都在增强mob,但这个插件并没有出现在服务器上。 插件作为jar文件导出,并放入服务器的plugins文件夹中。 非常感谢您的帮助。谢谢 我的3个脚本在这里: 主脚本: 听众: 插件。yml:
自定义运算符 struct Vector2D { var x = 0.0 var y = 0.0 } infix operator +++ extension Vector2D { static func +++ (left: Vector2D, right: Vector2D) -> Vector2D { return Vector2D(x: left
问题内容: 我创建了由名称的库这是在我的程序作为 并输出如下功能 当我在主程序中使用库时 尝试使用构建时出现以下错误 奇怪的是,库文件libfastget.a位于pkg文件夹中。 问题答案: 您需要使函数的名称可以大写导出: 用作: 规范中提到:“ 导出的标识符 ”: 标识符可以被导出以允许从另一个包访问它。如果同时满足以下条件,则导出标识符 标识符名称的第一个字符是Unicode大写字母(Uni
我想在任何目录的任何脚本中导入自定义编写的函数,就像在任何脚本中导入请求模块一样。我正在运行Ubuntu和Python 3.9 编辑:我按照本教程完成了我的要求-https://packaging.python.org/tutorials/packaging-projects/
问题内容: 我有一个名为imutils.py的文件,只有一个定义,即abc(),它返回2个整数的总和。 现在,我想在单独的collab文件中使用此定义,但是我无法使用。 我使用的方法是先将文件imutils.py上传到驱动器,然后将其导入并使用定义。错误提示模块’imutils’没有属性’abc’ 要上传,我首先使用2种方法:首先,我使用驱动器GUI上传,然后我也使用代码尝试了上述方法。两种情况均
我正在尝试发送电子邮件时,一个订单被移动到一个自定义状态在Woocommerce。我已经成功地创建了一个插件,它创建了一个自定义状态,订单被分配给自定义状态。 我还成功地在WooCommerce中的Settings->Emails下添加了一个自定义电子邮件模板,并编写了代码,当订单移动到自定义状态时,该代码应该发送电子邮件。 但是,代码不会被触发,电子邮件也不会被发送。 下面是我的代码。 在我编写