当前位置: 首页 > 知识库问答 >
问题:

使用分区通过分割和有效地计算RDD组的关键

越扬
2023-03-14

我已经实现了一个解决方案,通过键分组RDD[K, V],并根据每个组(K, RDD[V])计算数据,使用分区按分区器。然而,我不确定它是否真的有效,我想听听你的观点。

下面是一个示例案例:根据[K:Int,V:Int]列表,计算K每组的Vs平均值,知道它应该分布,并且V值可能非常大。这应该是:

List[K, V] => (K, mean(V))

简单的分区器类:

class MyPartitioner(maxKey: Int) extends Partitioner {

    def numPartitions = maxKey

    def getPartition(key: Any): Int = key match {
      case i: Int if i < maxKey => i
    }
  }

分区代码:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))

      val rdd = sc.parallelize(l)
      val p =  rdd.partitionBy(new MyPartitioner(4)).cache()

      p.foreachPartition(x => {
        try {
          val r = sc.parallelize(x.toList)
          val id = r.first() //get the K partition id
          val v = r.map(x => x._2)
          println(id._1 + "->" + mean(v))
        } catch {
          case e: UnsupportedOperationException => 0
        }
      })

输出为:

1-

我的问题是:

  1. 调用分区时到底发生了什么?(抱歉,我没有找到足够的规格)
  2. 分区映射真的有效吗,知道在我的生产案例中,不会有太多的键(50个样本)和很多值(100万个样本)
  3. 什么是paralellize(x.toList)的成本?这样做是一致的吗?(我需要一个RDD在输入的均值()
  4. 你自己会怎么做?

问候


共有1个答案

朱祺
2023-03-14

您的代码不应该工作。不能将SparkContext对象传递给执行程序。(它不是Serializable。)我也不明白你为什么需要。

要计算平均值,你需要计算总和和和计数,并取它们的比率。默认的分区程序就可以了。

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
  case class SumCount(sum: Double, count: Double)
  val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
  sumCounts.map(sc => sc.sum / sc.count)
}

这是一个有效的单程计算,具有很好的通用性。

 类似资料:
  • 我需要找到一种算法来解决以下问题: 给出了一个区间列表(leftBound、RightBound),这是在此行为中对区间进行分组的最有效算法: 间隔:(1,4)、(6,9)、(1,3)、(4,8)、(6,9)、(2,7)、(10,15) 需要的解决方案: 组(2,3)包含(1,3), (1,4), (2,7) 组(6,8)包含(4,8), (6,9) 组(10,15)包含(10,15) 当然,有不

  • 我如何使用胶水/火花转换成拼花,这也是分区的日期和分裂在n个文件每天?。这些示例不包括分区、拆分或供应(多少节点和多大节点)。每天包含几百GBS。 因为源CSV不一定在正确的分区中(错误的日期),并且大小不一致,所以我希望用正确的分区和更一致的大小写到分区的parquet。

  • 问题内容: Redshift在其窗口函数中不支持聚合。 AWS文档的状态为this ,任何窗口功能均不支持。 我的用例:在不同的时间间隔和流量渠道上统计客户 我希望获得当年的月度和年初至今 唯一 客户数,并希望按流量渠道以及所有渠道的总数进行划分。由于一个客户可以拜访不止一次,因此我只需要计算不同的客户,因此Redshift窗口汇总将无济于事。 我可以使用来计算不同的客户,但这只会给我四个所需结果

  • 我有两个长字符串映射,我想通过比较一个映射值和另一个映射值来计算百分比。 例如: 我想通过比较map2值和map1值来计算test1和test2值的平均值。 期望的结果应该是

  • 使用Spark 2.4.0。我的生产数据非常歪斜,因此其中一项任务的时间是其他任务的7倍。我尝试了不同的策略来规范数据,以便所有执行者都能平等工作- spark.default。并行性 reduceByKey(numPartitions) 重新分区(numPartitions) 我的期望是这三个选项应该均匀分区,但是在Spark Local/Standalone上使用一些虚拟的非生产数据表明,选项

  • 因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我