当前位置: 首页 > 知识库问答 >
问题:

是否将同一个RDD上的多个减速器编译成单个扫描?

孙帅
2023-03-14

假设我有一个RDD(50M记录/day redu),我想用几种不同的方式来总结。RDD记录是4元组:(保持、foo、bar、baz)

  • 保持-布尔
  • foo条形baz-0/1整数

我想数一下每个< code>foo有多少个

rdd.filter(lambda keep, foo, bar, baz: foo == 1)
   .map(lambda keep, foo, bar, baz: keep, 1)
   .reduceByKey(operator.add)

这将返回(在收集之后)一个列表,如[(True,40000000),(False,10000000)]

问题是:有没有一种简单的方法可以避免扫描< code>rdd三次(对< code>foo 、< code>bar 、< code>baz各扫描一次)?

我的意思不是重写上面的代码来处理所有3个字段的方法,而是告诉Spark在一次传递中处理所有3个管道。

共有2个答案

韩善
2023-03-14

如果我没看错你的问题你想要RDD.aggregate.

val zeroValue = (0L, 0L, 0L, 0L, 0L, 0L) // tfoo, tbar, tbaz, ffoo, fbar, fbaz
rdd.aggregate(zeroValue)(
  (prior, current) => if (current._1) {
    (prior._1 + current._2, prior._2 + current._3, prior._3 + current._4,
      prior._4, prior._5, prior._6)
  } else {
    (prior._1, prior._2, prior._3,
      prior._4 + current._2, prior._5 + current._3, prior._6 + current._4)
  },
  (left, right) =>
    (left._1 + right._1,
      left._2 + right._2,
      left._3 + right._3,
      left._4 + right._4,
      left._5 + right._5,
      left._6 + right._6)
)

聚合在概念上类似于列表上的概念化简函数,但 RDD 不是列表,它们是分布式的,因此您提供了两个函数参数,一个用于对每个分区进行操作,另一个用于组合处理分区的结果。

常乐
2023-03-14

通过使用不同的线程提交作业,可以并行执行三条管道,但这将通过RDD三次,并且需要集群上最多3倍的资源。

通过重写作业以一次处理所有计数,可以一次性完成工作 - 关于聚合的答案是一个选项。将数据成对拆分(keep,foo)(keep,bar),(keep,baz)将是另一个。

不修改任何代码,一次完成任务是不可能的,因为Spark无法知道这些任务与同一个数据集相关。在第一个作业之后,最多可以通过在< code >之前使用< code>rdd.cache对初始rdd进行< code >缓存来提高后续作业的速度。过滤器()。地图()。reduce()步骤;这仍将通过RDD 3次,但如果所有数据都适合群集的内存,第二次和第三次可能会快得多:

rdd.cache
// first reduceByKey action will trigger the cache and rdd data will be kept in memory
val foo = rdd.filter(fooFilter).map(fooMap).reduceByKey(???)
// subsequent operations will execute faster as the rdd is now available in mem
val bar = rdd.filter(barFilter).map(barMap).reduceByKey(???)
val baz = rdd.filter(bazFilter).map(bazMap).reduceByKey(???)

若我这样做,我会创建相关数据对,并在一次通过中对它们进行计数:

// We split the initial tuple into pairs keyed by the data type ("foo", "bar", "baz") and the keep information. dataPairs will contain data like: (("bar",true),1), (("foo",false),1)

val dataPairs = rdd.flatmap{case (keep, foo, bar, baz) => 
    def condPair(name:String, x:Int):Option[((String,Boolean), Int)] = if (x==1) Some(((name,keep),x)) else None
    Seq(condPair("foo",foo), condPair("bar",bar), condPair("baz",baz)).flatten
}
val totals = dataPairs.reduceByKey(_ + _)

这很简单,只会传递一次数据,但需要重写代码。我认为它在回答这个问题时得分为66,66%。

 类似资料:
  • Redux正在扔给我: 错误:预计减速机是一个功能。在线路上 从索引: 从配置: index.js 配置存储。js 减根剂 模拟减速器 找不到bug。我第一次用devtools插件支持创建redux商店

  • 问题内容: 哪个类首先编译,或者? 需要相同的类作为返回类型并扩展该类。 问题答案: 有很多方法可以实现编译器。但是,基本方法是解析源并构建符号表。然后使用该符号表将源代码转换为目标代码。 单遍编译器只能使用它已经遇到的符号。是旨在使用单遍编译器的语言示例。如果不是不可能的话,使用单遍编译器将很难实现。 但是,大多数语言都使用编译器,因为单遍编译器的优点不再重要,在这种情况下,使用尚未定义的符号变

  • 我正在用MapReduce框架用Java制作一个Hadoop应用程序。 对于输入和输出,我只使用文本键和值。在减少到最终输出之前,我使用一个合并器来做额外的计算。 但我有一个问题,钥匙不去同一个减速器。我在组合器中创建和添加了这样的键/值对: 基本上,我创建的工作如下: 减速机打印的标准输出如下: 这是没有意义的,因为键是相同的,因此它应该是2个还原器,其中3个值是相同的 希望你能帮我弄清这件事:

  • 在调试和故障处理的时候,我们通常有必要知道 RDD 有多少个分区。这里有几个方法可以找到这些信息: 使用 UI 查看在分区上执行的任务数 当 stage 执行的时候,你可以在 Spark UI 上看到这个 stage 上的分区数。 下面的例子中的简单任务在 4 个分区上创建了共 100 个元素的 RDD ,然后在这些元素被收集到 driver 之前分发一个 map 任务: scala> val s

  • 我将其插入到Godbolt中,并惊喜地发现这两个函数调用和在之外的任何情况下都是等价的(使用大多数主要编译器): 锁紧螺栓输出为: 我不是组装专家,但我想知道这是否是真的,他们在做相同的工作。我甚至看不出在这个程序集中哪里有对的调用,或者的“常量”(?)他在里面干什么。

  • 使用OpenGL(核心配置文件,4.4),将同一个采样器对象同时绑定到2个不同的纹理单元是否“合法”? 我的测试表明它是有效的,但我不知道我的司机是否原谅了我,或者这是一项功能。