当前位置: 首页 > 知识库问答 >
问题:

在Scala中迭代RDD迭代

华飞驰
2023-03-14
case class ID: Int,  Var1: Int, Var2: Int extends Serializable
  def foo(rdds: RDD[(ID, Iterable[(Var1, Var2)])]): RDD[(Var1, AvgVar2)] = {

    def average(as: Array[Var2]): AvgVar2 = {
       var sum = 0.0
       var i = 0.0
       while (i < as.length) {
           sum += Var2.val
           i += 1
      }
      sum/i
    }

    //My attempt at Scala
    rdds.map(x=> ((x._1),x._2)).groupByKey().map(x=>average(x._1)).collect()
}
    null
//RDD[(ID,Iterable[(Var1,Var2)...])]
RDD[(1,[(1,3),(1,12),(1,6)])],
RDD[(2,[(2,5),(2,7)])]

一些示例输出数据:

//RDD[(Var1, AvgVar2)]
RDD[(1,7),(2,6)]

*编辑:工作的scala代码行:

rdd.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], average(x._2.map(it => it._2).toArray)))

共有1个答案

吴伟志
2023-03-14

考虑到id=var1,简单的.map()将解决这个问题:

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {

  def average(as: Iterable[(Int, Int)]): Double = {
    as.map(_._2).reduce(_+_)/as.size.toDouble
  }

  rdds.map(x => (x._1, average(x._2)))
}

产出:

val input = sc.parallelize(List((1,Iterable((1,3),(1,12),(1,6))), (2, Iterable((2,5),(2,7)))))

scala> foo(input).collect
res0: Array[(Int, Double)] = Array((1,7.0), (2,6.0))

编辑:(averary(),签名相同):

def foo(rdds: RDD[(Int, Iterable[(Int, Int)])]): RDD[(Int, Double)] = {

  def average(as: Array[Int]): Double = {
    as.reduce(_+_)/as.size.toDouble
  }

  rdds.map(x => (x._1, average(x._2.map(tuple => tuple._2).toArray)))
}
 类似资料:
  • 迭代器不是集合,而是逐个访问集合元素的方法。 iterator it上的两个基本操作是next和hasNext 。 对it.next()调用将返回迭代器的下一个元素并提升迭代器的状态。 您可以使用Iterator的it.hasNext方法找出是否有更多元素要返回。 “逐步”迭代器返回的所有元素的最简单方法是使用while循环。 让我们按照以下示例程序进行操作。 例子 (Example) objec

  • 主要内容:实例,查找最大与最小元素,实例,获取迭代器的长度,实例,Scala Iterator 常用方法Scala 集合 Scala Iterator(迭代器)不是一个集合,它是一种用于访问集合的方法。 迭代器 it 的两个基本操作是 next 和 hasNext。 调用 it.next() 会返回迭代器的下一个元素,并且更新迭代器的状态。 调用 it.hasNext() 用于检测集合中是否还有元素。 让迭代器 it 逐个返回所有元素最简单的方法是使用 while 循环: 实例 object T

  • Scala 集合 Scala Iterator(迭代器)不是一个集合,它是一种用于访问集合的方法。 迭代器 it 的两个基本操作是 next 和 hasNext。 调用 it.next() 会返回迭代器的下一个元素,并且更新迭代器的状态。 调用 it.hasNext() 用于检测集合中是否还有元素。 让迭代器 it 逐个返回所有元素最简单的方法是使用 while 循环: object Test

  • 我有一个顺序数据源,表示为简单迭代器(或流)。数据相当大,不适合内存。此外,源代码可以遍历一次,并且获取成本很高。该源用于一些重过程(黑盒),该过程将迭代器(或流)作为其参数来使用线性数据。好的,很简单。但如果我有两种不同的消费程序,我该怎么办??正如我所说的,我不想将输入数据吸入类似列表的集合中。我也可以从一开始就重读源代码两次来完成我的任务,但我不喜欢这样,因为这样做没有效果。如果事实上我需要

  • 问题内容: 您能想到一种很好的方法(也许使用itertools)将迭代器拆分为给定大小的块吗? 因此,with成为迭代器 我可以想到一个小程序来做到这一点,但是使用itertools并不是一个好方法。 问题答案: 在 从配方文件的食谱来靠近你想要什么: 但是,它将使用填充值填充最后一个块。 较不通用的解决方案仅适用于序列,但可以根据需要处理最后一个块 最后,一种可在一般迭代器上运行且其行为符合预期

  • 你竟任着刚硬不悔改的心,为自己积蓄忿怒,以致神震怒,显他公义审判的日子来到。他必照各人的行为报应各人。凡恒心行善,寻求荣耀、尊贵和不能朽坏之福的,就以永生报应他们;惟有结党不顺从真理,反顺从不义的,就以忿怒、恼恨报应他们。(ROMANS 2:7-8) 迭代 Bill正在介绍他的项目,嘴里不断蹦出“loop、iterate、traversal、recursion”这些单词,夹杂在汉语汇总。旁边的小白