我尝试在Google Cloud数据流中运行Apache Beam管道(Python),由Google Cloud Coomposer中的DAG触发。
我的dags文件夹在各自的GCS桶中的结构如下:
/dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline
setup.py是非常基本的,但是根据Apache Beam文档和SO上的答案:
import setuptools
setuptools.setup(setuptools.find_packages())
在DAG文件(dataflow.py)中,我设置了setup_file
选项并将其传递给Dataflow:
default_dag_args = {
... ,
'dataflow_default_options': {
... ,
'runner': 'DataflowRunner',
'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
}
}
在管道文件(pipeline.py)中,我尝试使用
from my_modules import commons
但这失败了。Google Cloud Composer(Apache Airflow)中的html" target="_blank">日志显示:
gcp_dataflow_hook.py:132} WARNING - b' File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n from my_modules import commons\n ^\nSyntaxError: invalid syntax'
setup.py文件背后的基本思想记录在这里
此外,在SO上也有类似的问题帮助了我:
Google Dataflow-导入自定义python模块失败
Dataflow/apachebeam:管理自定义模块依赖项
我实际上想知道为什么我的管道会出现语法错误
,而不是模块找不到
类型的错误...
我试图重现您的问题,然后尝试解决它,所以我创建了与您已经拥有的相同的文件夹结构:
/dags/
dataflow.py
dataflow/
pipeline.py -> pipeline
setup.py
my_modules/
__init__.py
common.py
因此,为了使其正常工作,我所做的更改是将这些文件夹复制到实例正在运行的位置,以便代码能够找到它,例如在实例的/tmp/
文件夹中。
所以,我的DAG应该是这样的:
1-我首先声明我的论点:
default_args = {
'start_date': datetime(xxxx, x, x),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'dataflow_default_options': {
'project': '<project>',
'region': '<region>',
'stagingLocation': 'gs://<bucket>/stage',
'tempLocation': 'gs://<bucket>/temp',
'setup_file': <setup.py>,
'runner': 'DataflowRunner'
}
}
2-在此之后,我创建了DAG,在运行数据流任务之前,我将上面创建的整个文件夹目录复制到实例任务t1
的/tmp/
文件夹中,然后,我从/tmp/目录任务t2
运行管道:
with DAG(
'composer_df',
default_args=default_args,
description='datflow dag',
schedule_interval="xxxx") as dag:
def copy_dependencies():
process = subprocess.Popen(['gsutil','cp', '-r' ,'gs://<bucket>/dags/*',
'/tmp/'])
process.communicate()
t1 = python_operator.PythonOperator(
task_id='copy_dependencies',
python_callable=copy_dependencies,
provide_context=False
)
t2 = DataFlowPythonOperator(task_id="composer_dataflow",
py_file='/tmp/dataflow/pipeline.py', job_name='job_composer')
t1 >> t2
这就是我创建DAG文件数据流的方式。py
,然后,在管道中。py要导入的包如下所示:
from my_modules import commons
它应该可以正常工作,因为VM可以理解文件夹目录。
问题内容: 我有一个名为imutils.py的文件,只有一个定义,即abc(),它返回2个整数的总和。 现在,我想在单独的collab文件中使用此定义,但是我无法使用。 我使用的方法是先将文件imutils.py上传到驱动器,然后将其导入并使用定义。错误提示模块’imutils’没有属性’abc’ 要上传,我首先使用2种方法:首先,我使用驱动器GUI上传,然后我也使用代码尝试了上述方法。两种情况均
我想在任何目录的任何脚本中导入自定义编写的函数,就像在任何脚本中导入请求模块一样。我正在运行Ubuntu和Python 3.9 编辑:我按照本教程完成了我的要求-https://packaging.python.org/tutorials/packaging-projects/
主要内容:自定义模块编写说明文档到目前为止,读者已经掌握了导入 Python 标准库并使用其成员(主要是函数)的方法,接下来要解决的问题是,怎样自定义一个模块呢? 前面章节中讲过,Python 模块就是 Python 程序,换句话说,只要是 Python 程序,都可以作为模块导入。例如,下面定义了一个简单的模块(编写在 demo.py 文件中): 可以看到,我们在 demo.py 文件中放置了变量(name 和 add)、函数(
我想知道是否有可能在Google数据流中运行一个定制的Apache Beam Python版本。在公共存储库中不可用的版本(撰写本文时为0.6.0和2.0.0)。例如,ApacheBeam的官方存储库中的HEAD版本,或该问题的特定标记。 我知道可以按照官方文件中的说明包装定制包装(例如,私人本地包装)。这里有一些关于如何为其他脚本执行此操作的问题。甚至有一个要点指导这一点。 但是,我还没有获得当
我正在使用Ubuntu14.04计算机,试图将google.protobuf模块导入到Python2.7中。 我已经试过了 和 没有成功。 在python内部,我得到一个错误,即: “导入错误:没有名为google.protobuf的模块” 编辑1: 对不起,谢谢大家的评论。我还是Ubuntu和StackOverflow的新手。 具体来说,我输入到命令行 并获得反馈 Traceback(最近的调用