当前位置: 首页 > 知识库问答 >
问题:

Google数据流:导入自定义Python模块

淳于健
2023-03-14

我尝试在Google Cloud数据流中运行Apache Beam管道(Python),由Google Cloud Coomposer中的DAG触发。

我的dags文件夹在各自的GCS桶中的结构如下:

/dags/
  dataflow.py <- DAG
  dataflow/
    pipeline.py <- pipeline
    setup.py
    my_modules/
      __init__.py
      commons.py <- the module I want to import in the pipeline

setup.py是非常基本的,但是根据Apache Beam文档和SO上的答案:

import setuptools

setuptools.setup(setuptools.find_packages())

在DAG文件(dataflow.py)中,我设置了setup_file选项并将其传递给Dataflow:

default_dag_args = {
    ... ,
    'dataflow_default_options': {
        ... ,
        'runner': 'DataflowRunner',
        'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
    }
}

在管道文件(pipeline.py)中,我尝试使用

from my_modules import commons

但这失败了。Google Cloud Composer(Apache Airflow)中的html" target="_blank">日志显示:

gcp_dataflow_hook.py:132} WARNING - b'  File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n    from my_modules import commons\n           ^\nSyntaxError: invalid syntax'

setup.py文件背后的基本思想记录在这里

此外,在SO上也有类似的问题帮助了我:

Google Dataflow-导入自定义python模块失败

Dataflow/apachebeam:管理自定义模块依赖项

我实际上想知道为什么我的管道会出现语法错误,而不是模块找不到类型的错误...

共有1个答案

濮翰学
2023-03-14

我试图重现您的问题,然后尝试解决它,所以我创建了与您已经拥有的相同的文件夹结构:

/dags/
  dataflow.py
  dataflow/
     pipeline.py -> pipeline
     setup.py
     my_modules/
        __init__.py
        common.py

因此,为了使其正常工作,我所做的更改是将这些文件夹复制到实例正在运行的位置,以便代码能够找到它,例如在实例的/tmp/文件夹中。

所以,我的DAG应该是这样的:

1-我首先声明我的论点:

default_args = {
   'start_date': datetime(xxxx, x, x),
   'retries': 1,
   'retry_delay': timedelta(minutes=5),
   'dataflow_default_options': {
       'project': '<project>',
       'region': '<region>',
       'stagingLocation': 'gs://<bucket>/stage',
       'tempLocation': 'gs://<bucket>/temp',
       'setup_file': <setup.py>,
       'runner': 'DataflowRunner'
   }
} 

2-在此之后,我创建了DAG,在运行数据流任务之前,我将上面创建的整个文件夹目录复制到实例任务t1/tmp/文件夹中,然后,我从/tmp/目录任务t2运行管道:

with DAG(
    'composer_df',
     default_args=default_args,
     description='datflow dag',
     schedule_interval="xxxx") as dag:

     def copy_dependencies():
          process = subprocess.Popen(['gsutil','cp', '-r' ,'gs://<bucket>/dags/*', 
          '/tmp/'])
          process.communicate()


     t1 = python_operator.PythonOperator(
        task_id='copy_dependencies',
        python_callable=copy_dependencies,
        provide_context=False
     )


     t2 = DataFlowPythonOperator(task_id="composer_dataflow", 
          py_file='/tmp/dataflow/pipeline.py', job_name='job_composer')

     t1 >> t2

这就是我创建DAG文件数据流的方式。py,然后,在管道中。py要导入的包如下所示:

from my_modules import commons

它应该可以正常工作,因为VM可以理解文件夹目录。

 类似资料:
  • 问题内容: 我有一个名为imutils.py的文件,只有一个定义,即abc(),它返回2个整数的总和。 现在,我想在单独的collab文件中使用此定义,但是我无法使用。 我使用的方法是先将文件imutils.py上传到驱动器,然后将其导入并使用定义。错误提示模块’imutils’没有属性’abc’ 要上传,我首先使用2种方法:首先,我使用驱动器GUI上传,然后我也使用代码尝试了上述方法。两种情况均

  • 我想在任何目录的任何脚本中导入自定义编写的函数,就像在任何脚本中导入请求模块一样。我正在运行Ubuntu和Python 3.9 编辑:我按照本教程完成了我的要求-https://packaging.python.org/tutorials/packaging-projects/

  • 主要内容:自定义模块编写说明文档到目前为止,读者已经掌握了导入 Python 标准库并使用其成员(主要是函数)的方法,接下来要解决的问题是,怎样自定义一个模块呢? 前面章节中讲过,Python 模块就是 Python 程序,换句话说,只要是 Python 程序,都可以作为模块导入。例如,下面定义了一个简单的模块(编写在 demo.py 文件中): 可以看到,我们在 demo.py 文件中放置了变量(name 和 add)、函数(

  • 我想知道是否有可能在Google数据流中运行一个定制的Apache Beam Python版本。在公共存储库中不可用的版本(撰写本文时为0.6.0和2.0.0)。例如,ApacheBeam的官方存储库中的HEAD版本,或该问题的特定标记。 我知道可以按照官方文件中的说明包装定制包装(例如,私人本地包装)。这里有一些关于如何为其他脚本执行此操作的问题。甚至有一个要点指导这一点。 但是,我还没有获得当

  • 我正在使用Ubuntu14.04计算机,试图将google.protobuf模块导入到Python2.7中。 我已经试过了 和 没有成功。 在python内部,我得到一个错误,即: “导入错误:没有名为google.protobuf的模块” 编辑1: 对不起,谢谢大家的评论。我还是Ubuntu和StackOverflow的新手。 具体来说,我输入到命令行 并获得反馈 Traceback(最近的调用