当前位置: 首页 > 知识库问答 >
问题:

Apache Beam流水线中在一个PCollection上同时应用多个pTransform

晋安国
2023-03-14

我试图创建一个beam管道,在一个PCollection上同时应用多个ParDo转换,并在列表中收集和打印所有结果。到目前为止,我经历了顺序的过程,就像第一个帕尔多,然后第二个帕尔多。下面是我为我的问题准备的一个例子:

import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions())

class Tr1(beam.DoFn):
  def process(self, number):
    number = number + 1
    yield number

class Tr2(beam.DoFn):
  def process(self, number):
    number = number + 2
    yield number

def pipeline_test():

  numbers =  p | "Create" >> beam.Create([1])
  tr1 = numbers  | "Tr1" >> beam.ParDo(Tr1())
  tr2 = numbers  | "Tr2" >> beam.ParDo(Tr2())

  tr1 | "Print1" >> beam.Map(print)
  tr2 | "Print2" >> beam.Map(print) 

def main(argv):
  del argv

  pipeline_test()

  result = p.run()
  result.wait_until_finish()
if __name__ == '__main__':
  app.run(main)

共有1个答案

锺离玮
2023-03-14

转换和元素的调度由用于运行流水线的运行器管理。

运行者通常试图优化图形,并可能按顺序或并行运行某些任务。

在您的示例中,Tr1和Tr2都是无状态的,并应用于相同的输入。在这种情况下,runner通常在同一机器上对同一元素按顺序运行它们。注意,runner仍将并行运行不同的元素。

 类似资料:
  • 我已经用Python SDK(Apache Beam Python 3.7 SDK 2.19.0)构建了一个窗口流数据流管道。初始数据的表示如下: 其思想是找出给定窗口中每行号码的平均通话长度。数据作为CSV的行从pub/sub中读取,我向所有行添加一个与该数字的平均调用长度相对应的值: 我使用以下管道: 有什么想法吗?

  • a)从有界源读取,在数据流中运行时,PCollection的大小可以有多大?b)当处理大数据时,假设PCollection的大约5000万个数据试图查找另一个PCollection的大约1000万个数据。这能做到吗?beam/dataflow的性能有多好?在一个ParDo函数中,假设我们只能传递一个输入并返回一个输出,如何基于两个输入数据集执行查找?我试图查看Dataflow/beam,类似于任何

  • 问题内容: 我是redis的新手,应该使用流水线操作时还是有些困惑,或者应该在发送多个命令时始终使用它? 例如,如果我想一次向Redis服务器发送10条SET命令,我是否应该简单地一个接一个地运行这10条命令,还是应该对它们进行流水线处理? 用管道传输10条SET命令而不是一一发送它们有什么缺点吗? 非常感谢。 问题答案: 当我应该使用流水线 当需要向Redis发送许多命令时,管道用于减少RTT,

  • Flink社区! 我有一个关于在Flink中连接相同键上的多个流的问题(等连接)。我还是一个新手,正在为我的团队评估Flink,将我们的Spark批处理应用程序迁移到流处理。 注意:我看了FabianHüske的这篇关于加入处理的文章:窥视Apache Flink的引擎室。 为了简化问题,假设您有3个流,每个流都有唯一的记录,可以通过id字段进行键控。对于流中的每条记录,您将在其他流中找到相应的记

  • 面试问题 比如说,我们有一个在Employee表中有200万条记录的表,我们需要削减每个员工10%的工资(需要做一些处理),然后将其保存回collection。你怎样才能有效地做到这一点。 我问他,我们可以使用executor框架来创建多个线程,这些线程可以从表中获取值,然后我们可以处理并将其保存到列表中。 然后他问我,你将如何检查一个记录是否已经被处理,我不知道(如何做)。 甚至我也不确定我是否

  • 问题内容: 线程都是可运行的,并且它们拥有相同的锁。两个线程都可以运行时,它们可以锁定相同的地址吗?那是JRE错误吗? 问题答案: 该问题仅存在于线程转储中。实际上,在任何时间点,锁都仅由一个线程持有。但是,线程转储显示两个具有相同锁的不同线程,因为它不是原子的。 可以使用以下程序轻松重现该行为: