当前位置: 首页 > 知识库问答 >
问题:

使用Apache Beam笔记本启动数据流作业时处理名称错误

欧阳高昂
2023-03-14

当我运行Google Cloud Platform网站上可用的示例笔记本Dataflow_Word_count.ipynb时,我可以使用Apache Beam笔记本启动数据流作业,并且该作业成功完成。管道的定义如下。

class ReadWordsFromText(beam.PTransform):
    
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern
    
    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
    
p = beam.Pipeline(InteractiveRunner())

words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')

counts = (words 
          | 'count' >> beam.combiners.Count.PerElement())

lower_counts = (words
                | "lower" >> beam.Map(lambda word: word.lower())
                | "lower_count" >> beam.combiners.Count.PerElement()

如果我使用一个新函数重构提取单词的部分,如下所示

def extract_words(line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

class ReadWordsFromText(beam.PTransform):
    
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern
    
    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: extract_words(line)))

并运行笔记本我得到以下错误消息:

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined [while running '[3]: read/FlatMap(<lambda at <ipython-input-3-d48b3d7d5e4f>:12>)']

Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.

为了处理名称错误,我按照说明添加了以下行

options.view_as(SetupOptions).save_main_session=True

但是当我运行笔记本时,出现了以下错误

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 760, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 501, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'

有没有简单的方法来解决这个问题?

共有2个答案

邢良才
2023-03-14

修改管道的一种解决方法是将函数定义作为类方法放在DoFn定义中。

class ReadWordsFromText(beam.PTransform):

  def extract_words(self, line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

  def __init__(self, file_pattern):
    self._file_pattern = file_pattern

  def expand(self, pcoll):
    return (pcoll.pipeline
            | beam.io.ReadFromText(self._file_pattern)
            | beam.FlatMap(lambda line: self.extract_words(line)))
苏星宇
2023-03-14

不要使用save_main_会话,而是在readwords fromtext复合转换之外解压提取的单词。以下是一个例子:

def extract_words(line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

class ReadWordsFromText(beam.PTransform):
    
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern
    
    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)    
                )
    
words = (p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
           | 'Extract' >> beam.FlatMap(extract_words)
        )
counts = (words 
          | 'count' >> beam.combiners.Count.PerElement())
 类似资料:
  • 我们的Google Cloud数据流管道程序调用了一些动态链接到*的库。所以要运行它,我需要设置linux环境变量LD_LIBRARY_PATH。有一种方法可以做到这一点:https://groups.google.com/forum/#!主题/综合。java。程序员/LOu18 OWAVM,但我想知道是否有一种方法可以在执行管道之前使用一些运行shell脚本的作业来实现这一点?

  • 我正在AWS Sagemaker实例上工作Jupyter笔记本。为了方便起见,我编写了一个.py文件,其中包含了定义的几个函数; 这两个函数都存储在名为file1.py的.py文件中。然后在我的笔记本里找到它们,我打了字; 此命令导入了这两个函数。但当我尝试运行函数时; 我得到一个名字错误; 请注意,我在我的jupyter笔记本中已经导入了熊猫作为pd。我知道使用选项 但这迫使我使用该函数编写一个

  • 我有一个简单的控制器,它接受文件路径的JSON字符串,并对这些文件运行spring批处理作业。为了实现spring batch,我遵循了一个教程,该教程最终将在https://github.com/michaelhoffmantech/patter-batch-loader中生成代码。 继续下去,直到它抛出StackOverflowError。 任何关于改变什么来修复此问题的建议或帮助都将不胜感激

  • 我一直在运行基于12月创建的模板的数据流作业,该模板在运行时传递一些参数,没有任何问题。我现在不得不对模板做了一些修改,我似乎在生成一个工作模板时遇到了问题,即使使用和以前一样的beam代码/版本。我的工作只是无限期地挂起-尝试离开一个,大约一个小时后超时。 当然有一个问题,因为即使是我创建空PCollection的第一步也没有成功,它只是说运行。 我已经从函数中抽象出来,以解决问题可能是什么,因

  • 我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中

  • 我正在尝试使用airflow的DataflowPythonOperator计划数据流作业。这是我的dag操作员: gcp_conn_id已设置,可以正常工作。错误显示数据流失败,返回代码为1。完整日志如下所示。 gcp_dataflow_hook.py似乎有问题,除了这个没有更多的信息。有没有办法解决这个问题,有没有DataflowPython算子的任何例子?)到目前为止,我找不到任何使用案例)