当前位置: 首页 > 知识库问答 >
问题:

ModuleNotFoundError:运行GCP数据流作业时没有名为“oracledb”的模块

宰父远
2023-03-14

我们正在尝试使用GCP数据流和Python作业模板连接到Oracle数据库。当我们使用无法访问Internet的特殊子网来运行数据流作业时,我们使用setup.py.从GCS存储桶安装依赖包

下面是使用 setup.py 创建数据流模板的命令行:

< code>python3 -m

依赖项包存储在 GCP 存储桶中,并将复制到数据流工作线程,并在作业运行时安装在数据流工作线程上。对于 Oracle 数据库连接,我们使用从 https://pypi.org/project/oracledb/#files 下载的 oracledb-1.0.3-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl。

当我们尝试使用Cloud Shell和DirectRunner时,它可以成功安装和识别oracledb模块。但是,当数据流作业执行时,它会遇到以下错误:

来自worker的错误消息:Traceback(最近一次调用last):File“/usr/local/lib/python 3.9/site-packages/data flow _ worker/batch worker . py”,第772行,在run self中。_ load _ main _ session(self . local _ staging _ directory)文件"/usr/local/lib/python 3.9/site-packages/data flow _ worker/batch worker . py ",第509行,in _ load _ main _ session pickler . load _ session(session _ File)文件"/usr/local/lib/python 3.9/site-packages/Apache _ beam/internal/pickler . py ",第65行,in load_session返回desired _ pickle _ lib . load _ session(File _ path)文件"/usr

非常感谢您的建议。

setup.py

import os
import logging
import subprocess
import pickle
import setuptools
import distutils

from setuptools.command.install import install as _install

class install(_install):  # pylint: disable=invalid-name
    def run(self):
        self.run_command('CustomCommands')
        _install.run(self)


WHEEL_PACKAGES = [
    'wheel-0.37.1-py2.py3-none-any.whl',
    'oracledb-1.0.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl'
    ]
CUSTOM_COMMANDS = [
    ['sudo', 'apt-get', 'update']
]

class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def run_command(self, command):
        import subprocess
        import logging
        logging.getLogger().setLevel(logging.INFO) 
        status = -9999
        try:
            logging.info('CUSTOM_DATAFLOW_JOB_LOG: started running [{}]'.format(command))
            status = subprocess.call(command)
            if status == 0:
                logging.info('CUSTOM_DATAFLOW_JOB_LOG: [{}] completed successfully'.format(command))
            else:
                logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] failed with signal {}'.format(command, status))
        except Exception as e:
            logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] caught exception: {}'.format(command, e))
        return status        

    def install_cmd(self):
        result = []
        for p in WHEEL_PACKAGES:
                result.append(['gsutil', 'cp', 'gs://dataflow-execution/python_dependencies/{}'.format(p), '.'])
                result.append(['pip', 'install', '{}'.format(p)])
        return result

    def run(self):
        import logging
        logging.getLogger().setLevel(logging.INFO) 
        try:
            install_cmd = self.install_cmd()
            for command in CUSTOM_COMMANDS:
                status = self.run_command(command)
                if status == 0:
                    logging.info('CUSTOM_DATAFLOW_JOB_LOG: [{}] finished successfully'.format(command))
                else:
                    logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] failed with status code {}'.format(command, status))
            for command in install_cmd:
                status = self.run_command(command)
                if status == 0:
                    logging.info('CUSTOM_DATAFLOW_JOB_LOG: [{}] finished successfully'.format(command))
                else:
                    logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] failed with status code {}'.format(command, status))
        except Exception as e:
            logging.error('CUSTOM_DATAFLOW_JOB_LOG: [{}] caught exception: {}'.format(command, e))


REQUIRED_PACKAGES = [
]

print("======\nRunning setup.py\n==========")
setuptools.setup(
    name='main_setup',
    version='1.0.0',
    description='DataFlow worker',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'install': install,
        'CustomCommands': CustomCommands,
        }
    )```

共有2个答案

杨乐意
2023-03-14

为什么子网没有接入互联网?你可以在Google Cloud上创建一个路由器和一个网关(Cloud NAT gateways ),这样就不会将(数据流)VM IP对外暴露给互联网。

路由器是为 VPC 网络创建的(您的子网位于此 VPC 中):

并且NAT网关是用以前的路由器创建的。

然后从PyPi和setup.py文件下载您的包会非常容易。安装文件中PyPi中的oracledb包示例:

from glob import glob

from setuptools import find_packages, setup

setup(
    name="lib",
    version="0.0.1",
    install_requires=['oracledb==1.0.3'],
    packages=find_packages(),
)

然后< code>Dataflow将在workers中安装该包,不会出现问题。

邹英发
2023-03-14

您是否已验证数据流工作线程肯定有权访问该gcs存储桶?这可能会在这里造成问题。

一般来说,我相信这种事情的推荐路径是使用--extra_package标志 - https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi - 你可能会有更多的运气这样做。

 类似资料:
  • 我试图保存熊猫数据框的内容到Excel文件在windows/azure数据库 df=pd。数据帧({'Data':[10,20,30,20,15,30,45]}) 作者=pd.ExcelWriter('pandas_simple.xlsx',引擎='xlsxWriter') df。到excel(编写器,工作表\u name='Sheet1') 作家保存() 错误 ModuleNotFoundErr

  • 问题内容: 我正在尝试导入“火炬”包。同样,我尝试使用如下pip命令安装它,安装甚至开始,但几秒钟后出现错误 下面是我执行的命令 我得到的错误: 操作系统: Windows。 IDE :pyCharm 我获得了与此问题相关的唯一链接,但无法解释它。 https://www.gitmemory.com/torch 问题答案: 任何正在寻找解决方案的人请参考以下内容: 似乎安装割炬的命令无法正常工作,

  • 问题内容: 完成我的一个Flask项目后,我像其他人一样将其上传到了github。在2-3个月的时间后,我将整个githube存储库下载到另一台计算机上以运行它。但是,该应用程序无法运行,因为未找到给出以下消息的软件包 ModuleNotFoundError:没有名为“ Flask”的模块 因此,我最终下载了从Flask,SQLalchemy等所有软件包。但我被困在: 有人可以帮忙解决这个问题吗?

  • 我安装了Crypto模块和SHA256,但显示ModuleNotFoundError:- 回溯(最后一次调用):文件“Digitalsig.py”,第1行,来自加密。哈希导入SHA256 ModuleNotFoundError:没有名为“Crypto”的模块 这是参考代码

  • 我目前试图安装一个名为pyvjoy的python模块(https://github.com/tidzo/pyvjoy),但我在导入后遇到一个错误 以下是错误: 回溯(最后一次调用):导入pyvjoy模块中导入PlayHelper文件“C:\Users\Slay\Desktop\RLBot master\PlayHelper.py”第1行第5行的文件“runner.py”错误:没有名为“pyvjoy

  • 我不能导入在PyCharm IDE在Mac上。我尝试过使用、、和简易安装来安装和卸载Keras,但都没有成功。我尝试过更换解释器(Python 2.7和3.6),但都不起作用。 在终端中,当我运行时: pip3列表|grep-i keras 我得到: Keras 2.2.2 Keras-应用程序1.0.4 Keras-预处理1.0.2 我认为这意味着我的Keras安装是成功的。我还通过以下方式检查