我已经实现了一个解决方案,通过键分组RDD[K, V]
,并根据每个组(K, RDD[V])
计算数据,使用分区按
和分区器
。然而,我不确定它是否真的有效,我想听听你的观点。
下面是一个示例案例:根据[K:Int,V:Int]
列表,计算K
每组的V
s平均值,知道它应该分布,并且V
值可能非常大。这应该是:
List[K, V] => (K, mean(V))
简单的分区器类:
class MyPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
分区代码:
val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))
val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()
p.foreachPartition(x => {
try {
val r = sc.parallelize(x.toList)
val id = r.first() //get the K partition id
val v = r.map(x => x._2)
println(id._1 + "->" + mean(v))
} catch {
case e: UnsupportedOperationException => 0
}
})
输出为:
1-
我的问题是:
调用分区时到底发生了什么?(抱歉,我没有找到足够的规格)
分区映射真的有效吗,知道在我的生产案例中,不会有太多的键(50个样本)和很多值(100万个样本)
- 什么是
paralellize(x.toList)
的成本?这样做是一致的吗?(我需要一个RDD
在输入的均值()
) - 你自己会怎么做?
问候
您的代码不应该工作。不能将SparkContext
对象传递给执行程序。(它不是Serializable
。)我也不明白你为什么需要。
要计算平均值,你需要计算总和和和计数,并取它们的比率。默认的分区程序就可以了。
def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
case class SumCount(sum: Double, count: Double)
val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
(sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
(sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
sumCounts.map(sc => sc.sum / sc.count)
}
这是一个有效的单程计算,具有很好的通用性。
我需要找到一种算法来解决以下问题: 给出了一个区间列表(leftBound、RightBound),这是在此行为中对区间进行分组的最有效算法: 间隔:(1,4)、(6,9)、(1,3)、(4,8)、(6,9)、(2,7)、(10,15) 需要的解决方案: 组(2,3)包含(1,3), (1,4), (2,7) 组(6,8)包含(4,8), (6,9) 组(10,15)包含(10,15) 当然,有不
我如何使用胶水/火花转换成拼花,这也是分区的日期和分裂在n个文件每天?。这些示例不包括分区、拆分或供应(多少节点和多大节点)。每天包含几百GBS。 因为源CSV不一定在正确的分区中(错误的日期),并且大小不一致,所以我希望用正确的分区和更一致的大小写到分区的parquet。
问题内容: Redshift在其窗口函数中不支持聚合。 AWS文档的状态为this ,任何窗口功能均不支持。 我的用例:在不同的时间间隔和流量渠道上统计客户 我希望获得当年的月度和年初至今 唯一 客户数,并希望按流量渠道以及所有渠道的总数进行划分。由于一个客户可以拜访不止一次,因此我只需要计算不同的客户,因此Redshift窗口汇总将无济于事。 我可以使用来计算不同的客户,但这只会给我四个所需结果
我有两个长字符串映射,我想通过比较一个映射值和另一个映射值来计算百分比。 例如: 我想通过比较map2值和map1值来计算test1和test2值的平均值。 期望的结果应该是
使用Spark 2.4.0。我的生产数据非常歪斜,因此其中一项任务的时间是其他任务的7倍。我尝试了不同的策略来规范数据,以便所有执行者都能平等工作- spark.default。并行性 reduceByKey(numPartitions) 重新分区(numPartitions) 我的期望是这三个选项应该均匀分区,但是在Spark Local/Standalone上使用一些虚拟的非生产数据表明,选项
因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我