我试图创建一个beam管道,在一个PCollection上同时应用多个ParDo转换,并在列表中收集和打印所有结果。到目前为止,我经历了顺序的过程,就像第一个帕尔多,然后第二个帕尔多。下面是我为我的问题准备的一个例子:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
class Tr1(beam.DoFn):
def process(self, number):
number = number + 1
yield number
class Tr2(beam.DoFn):
def process(self, number):
number = number + 2
yield number
def pipeline_test():
numbers = p | "Create" >> beam.Create([1])
tr1 = numbers | "Tr1" >> beam.ParDo(Tr1())
tr2 = numbers | "Tr2" >> beam.ParDo(Tr2())
tr1 | "Print1" >> beam.Map(print)
tr2 | "Print2" >> beam.Map(print)
def main(argv):
del argv
pipeline_test()
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
app.run(main)
转换和元素的调度由用于运行流水线的运行器管理。
运行者通常试图优化图形,并可能按顺序或并行运行某些任务。
在您的示例中,Tr1和Tr2都是无状态的,并应用于相同的输入。在这种情况下,runner通常在同一机器上对同一元素按顺序运行它们。注意,runner仍将并行运行不同的元素。
我已经用Python SDK(Apache Beam Python 3.7 SDK 2.19.0)构建了一个窗口流数据流管道。初始数据的表示如下: 其思想是找出给定窗口中每行号码的平均通话长度。数据作为CSV的行从pub/sub中读取,我向所有行添加一个与该数字的平均调用长度相对应的值: 我使用以下管道: 有什么想法吗?
a)从有界源读取,在数据流中运行时,PCollection的大小可以有多大?b)当处理大数据时,假设PCollection的大约5000万个数据试图查找另一个PCollection的大约1000万个数据。这能做到吗?beam/dataflow的性能有多好?在一个ParDo函数中,假设我们只能传递一个输入并返回一个输出,如何基于两个输入数据集执行查找?我试图查看Dataflow/beam,类似于任何
问题内容: 我是redis的新手,应该使用流水线操作时还是有些困惑,或者应该在发送多个命令时始终使用它? 例如,如果我想一次向Redis服务器发送10条SET命令,我是否应该简单地一个接一个地运行这10条命令,还是应该对它们进行流水线处理? 用管道传输10条SET命令而不是一一发送它们有什么缺点吗? 非常感谢。 问题答案: 当我应该使用流水线 当需要向Redis发送许多命令时,管道用于减少RTT,
Flink社区! 我有一个关于在Flink中连接相同键上的多个流的问题(等连接)。我还是一个新手,正在为我的团队评估Flink,将我们的Spark批处理应用程序迁移到流处理。 注意:我看了FabianHüske的这篇关于加入处理的文章:窥视Apache Flink的引擎室。 为了简化问题,假设您有3个流,每个流都有唯一的记录,可以通过id字段进行键控。对于流中的每条记录,您将在其他流中找到相应的记
面试问题 比如说,我们有一个在Employee表中有200万条记录的表,我们需要削减每个员工10%的工资(需要做一些处理),然后将其保存回collection。你怎样才能有效地做到这一点。 我问他,我们可以使用executor框架来创建多个线程,这些线程可以从表中获取值,然后我们可以处理并将其保存到列表中。 然后他问我,你将如何检查一个记录是否已经被处理,我不知道(如何做)。 甚至我也不确定我是否
问题内容: 线程都是可运行的,并且它们拥有相同的锁。两个线程都可以运行时,它们可以锁定相同的地址吗?那是JRE错误吗? 问题答案: 该问题仅存在于线程转储中。实际上,在任何时间点,锁都仅由一个线程持有。但是,线程转储显示两个具有相同锁的不同线程,因为它不是原子的。 可以使用以下程序轻松重现该行为: