Question:

Python Dataflow: DoFn finish_bundle runs multiple times and gives empty output

唐默
2023-03-14

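I'm running a Dataflow pipeline in which I have to collate data into a single pandas DataFrame so that it can be used in the next step. To do this I use a DoFn class and define the __init__, process and finish_bundle methods as shown below. I expect a single output that collates all entries into one DataFrame, and I pass this output as a singleton side input to the next step in the pipeline.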

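import logging

import apache_beam as beam
import pandas as pd
from apache_beam.utils.windowed_value import WindowedValue


class collate_ga_data(beam.DoFn):
    def __init__(self):
        # Accumulated rows live on the DoFn instance until finish_bundle emits them.
        self._ga_data = pd.DataFrame()
        self.window = beam.window.GlobalWindow()
        logging.info("In INITIALIZATION :   {0}".format(self.window))

    def process(self, element, window=beam.DoFn.WindowParam):
        self.window = window
        logging.info("In PROCESS :   {0}".format(self.window))
        # DataFrame.append was removed in pandas 2.0; pd.concat does the same job.
        self._ga_data = pd.concat(
            [self._ga_data, pd.DataFrame({k: [v] for k, v in element.items()})]
        )

    def finish_bundle(self):
        logging.info(" The shape of ga_dataset imported  :  {0}".format(self._ga_data.shape))
        logging.info("In FINISH BUNDLE :   {0}".format(self.window))
        # finish_bundle must emit WindowedValue objects explicitly.
        yield WindowedValue(self._ga_data, 0, windows=[self.window])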

Running the pipeline fails with the following error (traceback excerpt):

  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 178, in execute
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 612, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 824, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 808, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 806, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 398, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 401, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 959, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/sideinputs.py", line 65, in __getitem__
    _FilteringIterable(self._iterable, target_window), self._view_options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 443, in _from_runtime_iterable
    len(head), str(head[0]), str(head[1])))
ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "Empty DataFrame

The log output of the code above is:

2020-02-27T16:32:45.331291913Z  The shape of ga_dataset imported  :  (0, 0) I 
2020-02-27T16:32:45.331489801Z In FINISH BUNDLE :   GlobalWindow I 
2020-02-27T16:32:45.390583276Z  The shape of ga_dataset imported  :  (0, 0) I 
2020-02-27T16:32:45.390754222Z In FINISH BUNDLE :   GlobalWindow I 
2020-02-27T16:32:48.639126300Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.641757011Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.644909381Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.647359848Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.649686336Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.651899814Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.654145240Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.656555175Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.658823966Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.660887002Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.663397789Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.665476560Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.667604684Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.669671535Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.672025680Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.674037218Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.676348209Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.678587436Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.680708885Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.682787656Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.685523986Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.687734365Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.689816713Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.691826343Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.693920373Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.696102380Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.698341846Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.700649023Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.703155755Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.705482244Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.707590818Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.709594726Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.711608886Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.713906288Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.716273546Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.718636035Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.720866918Z In PROCESS :   GlobalWindow I 
2020-02-27T16:32:48.723044872Z  The shape of ga_dataset imported  :  (37, 8) I 
2020-02-27T16:32:48.723157405Z In FINISH BUNDLE :   GlobalWindow I 

Dataflow uses only one worker for this pipeline. Does anyone know why this happens?

1 answer

戚均
2023-03-14

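According to the Beam execution model, "the division of the collection into bundles is arbitrary and selected by the runner." That is why finish_bundle can be called multiple times: it runs once per bundle, including bundles in which no element was processed, and in your DoFn every call yields its own DataFrame.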
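To see why this leads to the singleton error, here is a toy model in plain Python. It is not Beam's actual runner; ToyCollateFn and run_in_bundles are hypothetical names, and the bundle split is an assumption chosen to mirror the logs in the question (two empty bundles, then one with 37 rows). The toy runner calls process for each element and finish_bundle once per bundle:

```python
from typing import Dict, List


class ToyCollateFn:
    """Mimics the question's DoFn: buffers rows in process, emits them in finish_bundle."""

    def __init__(self) -> None:
        self._rows: List[Dict] = []

    def process(self, element: Dict) -> None:
        self._rows.append(element)

    def finish_bundle(self) -> List[Dict]:
        # Emits whatever has been buffered so far, even if that is nothing.
        return list(self._rows)


def run_in_bundles(fn: ToyCollateFn, bundles: List[List[Dict]]) -> List[List[Dict]]:
    """Toy runner: process per element, finish_bundle once per bundle."""
    outputs = []
    for bundle in bundles:
        for element in bundle:
            fn.process(element)
        outputs.append(fn.finish_bundle())
    return outputs


rows = [{"id": i} for i in range(37)]
# Hypothetical split mirroring the logs: two empty bundles, then one with 37 rows.
outputs = run_in_bundles(ToyCollateFn(), [[], [], rows])
print(len(outputs), [len(o) for o in outputs])  # 3 [0, 0, 37]
```

Three bundles give three outputs, so the PCollection read through AsSingleton holds more than one element, which is what the ValueError reports.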

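It looks like your problem would be better solved with CombineGlobally and a CombineFn, rather than by using a DataFrame as an accumulator inside a DoFn. See the Beam Programming Guide (section 4.2.4, Combine) for instructions on how to implement it.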
