当前位置: 首页 > 知识库问答 >
问题:

在Apache Beam/Dataflow中跨越多个文件的流水线代码

喻嘉泽
2023-03-14
juliaset/__init__.py
juliaset/juliaset.py # actual code
juliaset/some_conf.py
__init__.py
juliaset_main.py
setup.py
INFO:root:2017-12-15T17:34:09.333Z: JOB_MESSAGE_ERROR: (8cdf3e226105b90a): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 706, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 446, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 363, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1133, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named package_name.juliaset.some_conf

一个完整的工作示例将非常感谢!

共有1个答案

尹钱青
2023-03-14

是否可以验证setup.py是否包含如下结构:

import setuptools

setuptools.setup(
    name='My Project',
    version='1.0',
    install_requires=[],
    packages=setuptools.find_packages(),
)

从juliaset导入模块,如。juliaset导入someclass

在调用Python脚本时,使用Python-m juliaset_main(没有。py)

 类似资料:
  • 6.1. 文件系统跨越 无论你用什么方法使用文件,你都要在某个地方指定文件名。在很多情况下,文件名会作为fopen()函数的一个参数,同时其它函数会调用它返回的句柄: <?php $handle = fopen('/path/to/myfile.txt', 'r'); ?> 当你把被污染数据作为文件名的一部分时,漏洞就产生了: <?php $handle = fopen("/path/to/{$_

  • 我有一个Java程序,需要在多个类的多个点上从控制台获取用户输入。我试着在每节课上使用一台扫描仪,但当我关闭一台扫描仪时,它会关闭系统。所以我想在整个程序中使用相同的扫描仪。我在主类中打开了扫描仪,但在其他类中如何使用相同的扫描仪?

  • 我有一个非常基本的Python Dataflow工作,从pub/sub读取一些数据,应用FixedWindow并写入Google Cloud Storage。 输出被写入--output中特定的位置,但只写入临时阶段,即。 当进一步测试时,我注意到streaming_wordcount示例也有同样的问题,但是标准wordcount示例写得很好。也许问题在于开窗,或者从PubSub阅读? Write

  • 我已经用Python SDK(Apache Beam Python 3.7 SDK 2.19.0)构建了一个窗口流数据流管道。初始数据的表示如下: 其思想是找出给定窗口中每行号码的平均通话长度。数据作为CSV的行从pub/sub中读取,我向所有行添加一个与该数字的平均调用长度相对应的值: 我使用以下管道: 有什么想法吗?

  • 我试图创建一个beam管道,在一个PCollection上同时应用多个ParDo转换,并在列表中收集和打印所有结果。到目前为止,我经历了顺序的过程,就像第一个帕尔多,然后第二个帕尔多。下面是我为我的问题准备的一个例子:

  • 但是,共享上下文还有其他很好的理由,其中之一(IMHO)是上下文必须跟踪实体的状态,如果您要获得一个实体,处理该上下文,对该实体进行一些修改,然后附加到一个新的上下文,这个新的上下文必须访问数据库,以便它能够计算出该实体的状态。同样,如果您正在处理实体(发票和它们的所有InvoiceItems)的图,那么新的上下文必须获取图中的所有实体来确定它们的状态。 但现在我用这种建筑撞上了单行道! 如果我必