我对斯卡拉和Spark都很陌生,所以如果我做错了,请原谅我。在接收csv文件,过滤和映射之后;我有一个RDD,它是一堆(字符串,双)对。
(b2aff711,-0.00510)
(ae095138,0.20321)
(etc.)
当我在RDD上使用.groupByKey()时,
val grouped = rdd1.groupByKey()
得到一个有一堆(String,[Double])对的RDD。(我不知道CompactBuffer是什么意思,可能会导致我的问题?)
(32540b03,CompactBuffer(-0.00699, 0.256023))
(a93dec11,CompactBuffer(0.00624))
(32cc6532,CompactBuffer(0.02337, -0.05223, -0.03591))
(etc.)
一旦他们被分组,我将尝试取平均值和标准偏差。我只想使用.mean()和.sampleStdev()。当我尝试创建一个新的方法RDD时,
val mean = grouped.mean()
返回错误
错误:(51, 22) 值平均值不是组织的成员。
grouped.mean
我已导入org.apache.spark.ParkContext_
我还尝试使用sampleStdev()、.sum()和.stats(),结果相同。无论问题是什么,它似乎都会影响所有的数字RDD操作。
这是一个没有自定义功能的完整程序:
val conf = new SparkConf().setAppName("means").setMaster("local[*]")
val sc = new SparkContext(conf)
val data = List(("Lily", 23), ("Lily", 50),
("Tom", 66), ("Tom", 21), ("Tom", 69),
("Max", 11), ("Max", 24))
val RDD = sc.parallelize(data)
val counts = RDD.map(item => (item._1, (1, item._2.toDouble)) )
val countSums = counts.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2) )
val keyMeans = countSums.mapValues(avgCount => avgCount._2 / avgCount._1)
for ((key, mean) <- keyMeans.collect()) println(key + " " + mean)
主要方法是使用avier eByKey
而不是groupByKey
。
val result = sc.parallelize(data)
.map { case (key, value) => (key, (value, 1)) }
.reduceByKey { case ((value1, count1), (value2, count2))
=> (value1 + value2, count1 + count2)}
.mapValues {case (value, count) => value.toDouble / count.toDouble}
另一方面,您的解决方案中的问题是将
分组为(String, Iterable[Double])
形式的对象的RDD(就像在错误中一样)。例如,您可以计算Ints或Double ble的RDD平均值,但rdd对的平均值是多少。
让我们考虑以下几点:
val data = List(("32540b03",-0.00699), ("a93dec11",0.00624),
("32cc6532",0.02337) , ("32540b03",0.256023),
("32cc6532",-0.03591),("32cc6532",-0.03591))
val rdd = sc.parallelize(data.toSeq).groupByKey().sortByKey()
计算每对平均值的一种方法如下:
您需要定义一个平均方法:
def average[T]( ts: Iterable[T] )( implicit num: Numeric[T] ) = {
num.toDouble( ts.sum ) / ts.size
}
您可以在rdd上应用您的方法,如下所示:
val avgs = rdd.map(x => (x._1, average(x._2)))
您可以检查:
avgs.take(3)
结果是:
res4: Array[(String, Double)] = Array((32540b03,0.1245165), (32cc6532,-0.016149999999999998), (a93dec11,0.00624))
问题内容: 我想与Python解决方案共享这个特定的Apache Spark,因为它的文档非常有限。 我想通过KEY计算K / V对的平均值(存储在Pairwise RDD中)。示例数据如下所示: 现在,以下代码序列 并不是达到最佳效果的 方法,但它确实有效。这是我在寻找更好的解决方案之前所做的事情。这并不可怕,但是-如您将在答案部分中看到的- 有一种更简洁,有效的方法。 问题答案: 现在,更好的
问题内容: 已关闭 。这个问题需要细节或说明。它当前不接受答案。 想改善这个问题吗? 添加详细信息并通过编辑此帖子来澄清问题。 11个月前关闭。 改善这个问题 我有一个清单: 我想要另一个具有三个值均值的列表,因此新列表为: 新列表中只有6个值,因为第一个元素中只有18个元素。 我正在寻找一种精巧的方法来完成此操作,并为大量列表提供最少的步骤。 问题答案: 您可以在3个间隔中迭代使用for循环
我在添加数组的所有元素以及求取它们的平均值时遇到了问题。我将如何做到这一点并用我当前拥有的代码实现它?这些元素应该定义如下。
我花了一段时间才弄明白这一点,我想分享我的解决方案。当然欢迎改进。 参考:在RDD中展平Scala映射,通过反转groupby(即,为其中的每个序列重复标题)来展平序列 我有一个RDD的形式:RDD[(Int,List[(String,List[(String,Int,Float)])]] 关键字:Int 值:List[(String,List[(String,Int,Float)])] 目标是将
问题内容: 任何人都知道如何计算这些列之一的平均值(在Linux上)? 例如:mean(第2栏) 问题答案: Awk: 读为: 对于每一行,将第2列添加到变量“总计”中。 在文件末尾,打印“总计”除以记录数。
问题内容: 我有以下JSON数组,我想创建对象表单状态键计数 要计算状态键值并创建以下对象 问题答案: 使用 方法 虽然可以使用 具有相同代码的方法。