当前位置: 首页 > 知识库问答 >
问题:

为什么Apache Beam python中GroupByKey之后的FlatMap这么慢?

端木渝
2023-03-14

通过一组转换进行筛选和按键分组:

p | 'Group' >> beam.GroupByKey()
  | 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50)
  | 'Unwind' >> beam.FlatMap(lambda (key, values): values)

关于如何让它更好的性能,有什么想法吗?谢谢你的帮助!

共有1个答案

常元章
2023-03-14

这是一个管道的有趣的角落案例。我认为这里的问题在于读取来自groupbykey的数据的方式。让我给你一个关于GBK如何工作的快速总结。

所有大数据系统都实现了在同一关键字的多个元素上实现操作的方法。这在MapReduce中被称为reduce,在其他大数据系统中被称为按键分组或组合。

当您执行GroupByKey转换时,Dataflow需要将单个键的所有元素聚集到同一台机器中。由于同一键的不同元素可能在不同的机器中处理,因此需要以某种方式对数据进行序列化。

def unwind_and_filter((key, values)):
  # This consumes all the data from shuffle
  value_list = list(values)
  if len(value_list) > 50:
    yield value_list

p | 'Group' >> beam.GroupByKey()
  | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter)
 类似资料:
  • 我希望能够像这样使用Stream::FlatMap 但我得到以下编译器错误 test.java:25:错误:不兼容类型:无法推断类型变量R ListofStrings.Stream().FlatMap(str->duplicate(str)).Collect(Collectors.ToList()); 为什么这不是Java中flatMap的有效用法?

  • 有人能给我解释一下map和flatMap之间的区别,以及什么是各自的好用例吗? “结果扁平化”是什么意思?它有什么好处?

  • 问题内容: 因此,我在闲逛时使用了递归,我发现使用递归的循环比常规的while循环要慢得多,我想知道是否有人知道为什么。我已经包括了我下面所做的测试: 但是,在上一次测试中,我注意到如果删除该语句,则表明速度略有提高,因此我想知道if语句是否是造成循环速度差异的原因? 问题答案: 您已将函数编写为尾递归。在许多命令式和函数式语言中,这将触发尾部递归消除,在这种情况下,编译器用简单的JUMP替换了C

  • 问题内容: 这是所有编程语言所共有的吗?在进行多次打印后再执行println似乎更快,但是将所有内容移动到字符串中并仅进行打印似乎最快。为什么? 编辑:例如,Java可以在不到一秒钟的时间内找到所有高达100万的质数- 但要进行打印,然后在自己的println中将它们全部输出可能需要几分钟!最多可打印100亿小时! 例如: 问题答案: 速度并不慢,而是由主机操作系统提供的与控制台连接的基础。 您可