当我运行Google Cloud Platform网站上可用的示例笔记本Dataflow_Word_count.ipynb时,我可以使用Apache Beam笔记本启动数据流作业,并且该作业成功完成。管道的定义如下。
class ReadWordsFromText(beam.PTransform):
def __init__(self, file_pattern):
self._file_pattern = file_pattern
def expand(self, pcoll):
return (pcoll.pipeline
| beam.io.ReadFromText(self._file_pattern)
| beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
p = beam.Pipeline(InteractiveRunner())
words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
counts = (words
| 'count' >> beam.combiners.Count.PerElement())
lower_counts = (words
| "lower" >> beam.Map(lambda word: word.lower())
| "lower_count" >> beam.combiners.Count.PerElement()
如果我使用一个新函数重构提取单词的部分,如下所示
def extract_words(line):
return re.findall(r'[\w\']+', line.strip(), re.UNICODE)
class ReadWordsFromText(beam.PTransform):
def __init__(self, file_pattern):
self._file_pattern = file_pattern
def expand(self, pcoll):
return (pcoll.pipeline
| beam.io.ReadFromText(self._file_pattern)
| beam.FlatMap(lambda line: extract_words(line)))
并运行笔记本我得到以下错误消息:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined [while running '[3]: read/FlatMap(<lambda at <ipython-input-3-d48b3d7d5e4f>:12>)']
Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.
为了处理名称错误,我按照说明添加了以下行
options.view_as(SetupOptions).save_main_session=True
但是当我运行笔记本时,出现了以下错误
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 760, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 501, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
module = unpickler.load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'
有没有简单的方法来解决这个问题?
不修改管道的一种解决方法是将函数定义作为类方法放在DoFn定义中。
class ReadWordsFromText(beam.PTransform):
def extract_words(self, line):
return re.findall(r'[\w\']+', line.strip(), re.UNICODE)
def __init__(self, file_pattern):
self._file_pattern = file_pattern
def expand(self, pcoll):
return (pcoll.pipeline
| beam.io.ReadFromText(self._file_pattern)
| beam.FlatMap(lambda line: self.extract_words(line)))
不要使用save_main_会话,而是在readwords fromtext复合转换之外解压提取的单词。以下是一个例子:
def extract_words(line):
return re.findall(r'[\w\']+', line.strip(), re.UNICODE)
class ReadWordsFromText(beam.PTransform):
def __init__(self, file_pattern):
self._file_pattern = file_pattern
def expand(self, pcoll):
return (pcoll.pipeline
| beam.io.ReadFromText(self._file_pattern)
)
words = (p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
| 'Extract' >> beam.FlatMap(extract_words)
)
counts = (words
| 'count' >> beam.combiners.Count.PerElement())
我们的Google Cloud数据流管道程序调用了一些动态链接到*的库。所以要运行它,我需要设置linux环境变量LD_LIBRARY_PATH。有一种方法可以做到这一点:https://groups.google.com/forum/#!主题/综合。java。程序员/LOu18 OWAVM,但我想知道是否有一种方法可以在执行管道之前使用一些运行shell脚本的作业来实现这一点?
我正在AWS Sagemaker实例上工作Jupyter笔记本。为了方便起见,我编写了一个.py文件,其中包含了定义的几个函数; 这两个函数都存储在名为file1.py的.py文件中。然后在我的笔记本里找到它们,我打了字; 此命令导入了这两个函数。但当我尝试运行函数时; 我得到一个名字错误; 请注意,我在我的jupyter笔记本中已经导入了熊猫作为pd。我知道使用选项 但这迫使我使用该函数编写一个
我有一个简单的控制器,它接受文件路径的JSON字符串,并对这些文件运行spring批处理作业。为了实现spring batch,我遵循了一个教程,该教程最终将在https://github.com/michaelhoffmantech/patter-batch-loader中生成代码。 继续下去,直到它抛出StackOverflowError。 任何关于改变什么来修复此问题的建议或帮助都将不胜感激
我一直在运行基于12月创建的模板的数据流作业,该模板在运行时传递一些参数,没有任何问题。我现在不得不对模板做了一些修改,我似乎在生成一个工作模板时遇到了问题,即使使用和以前一样的beam代码/版本。我的工作只是无限期地挂起-尝试离开一个,大约一个小时后超时。 当然有一个问题,因为即使是我创建空PCollection的第一步也没有成功,它只是说运行。 我已经从函数中抽象出来,以解决问题可能是什么,因
我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中
我正在尝试使用airflow的DataflowPythonOperator计划数据流作业。这是我的dag操作员: gcp_conn_id已设置,可以正常工作。错误显示数据流失败,返回代码为1。完整日志如下所示。 gcp_dataflow_hook.py似乎有问题,除了这个没有更多的信息。有没有办法解决这个问题,有没有DataflowPython算子的任何例子?)到目前为止,我找不到任何使用案例)