我想与Python解决方案共享这个特定的Apache Spark,因为它的文档非常有限。
我想通过KEY计算K / V对的平均值(存储在Pairwise RDD中)。示例数据如下所示:
>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
现在,以下代码序列 并不是达到最佳效果的 方法,但它确实有效。这是我在寻找更好的解决方案之前所做的事情。这并不可怕,但是-如您将在答案部分中看到的-
有一种更简洁,有效的方法。
>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
现在,更好的方法是使用该rdd.aggregateByKey()
方法。因为该方法在Apache Spark和Python文档中的记录非常少-
这就是我编写此问与答的原因 -直到最近我一直在使用上述代码序列。但是同样,它的效率较低,因此除非必要,否则 避免 这样做。
这是使用rdd.aggregateByKey()
方法( 推荐 )进行相同操作的方法…
通过KEY,同时计算SUM(我们要计算的平均值的分子)和COUNT(我们要计算的平均值的分母):
>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
关于上面每个a
和b
对的含义,以下内容是正确的(因此您可以直观地看到正在发生的事情):
First lambda expression for Within-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a SCALAR that holds the next Value
Second lambda expression for Cross-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
最后,计算每个KEY的平均值,并收集结果。
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
我希望这个问题和答案aggregateByKey()
会有所帮助。
我对斯卡拉和Spark都很陌生,所以如果我做错了,请原谅我。在接收csv文件,过滤和映射之后;我有一个RDD,它是一堆(字符串,双)对。 当我在RDD上使用.groupByKey()时, 得到一个有一堆(String,[Double])对的RDD。(我不知道CompactBuffer是什么意思,可能会导致我的问题?) 一旦他们被分组,我将尝试取平均值和标准偏差。我只想使用.mean()和.samp
该向量包含X、Y坐标,即成对的双打。我想为每个用户ID标识坐标集群,所以我在RDD上进行映射,并尝试为每个组运行k-means: 但是当我运行这个时,我从一行中得到了一个NPE: 问题是,我必须将coords转换为RDD来进行K-Means操作。
问题内容: 我正在寻找带有示例的k-means算法的Python实现来聚类和缓存我的坐标数据库。 问题答案: 更新:( 在最初回答之后十一年,可能是该进行更新的时候了。) 首先,您确定要使用k均值吗? 该页面很好地总结了一些不同的聚类算法。我建议您在图形之外,特别查看每种方法所需的参数,并确定您是否可以提供所需的参数(例如,k均值需要簇的数量,但是也许您不知道在开始之前就知道了)群集)。 以下是一
问题内容: 我有一个日期范围,并且每个日期都有一个度量值。我想计算每个日期的指数移动平均值。有人知道怎么做这个吗? 我是python的新手。似乎没有将平均值内置到标准python库中,这让我感到有些奇怪。也许我找的地方不对。 因此,给定以下代码,如何计算日历日期的IQ点的移动加权平均值? (可能是一种更好的数据结构方式,任何建议将不胜感激) 问题答案: 编辑:看来SciKits(补充SciPy的附
问题内容: 已关闭 。这个问题需要细节或说明。它当前不接受答案。 想改善这个问题吗? 添加详细信息并通过编辑此帖子来澄清问题。 11个月前关闭。 改善这个问题 我有一个清单: 我想要另一个具有三个值均值的列表,因此新列表为: 新列表中只有6个值,因为第一个元素中只有18个元素。 我正在寻找一种精巧的方法来完成此操作,并为大量列表提供最少的步骤。 问题答案: 您可以在3个间隔中迭代使用for循环
问题内容: 编辑:我已经写了平均的代码,但我不知道如何使它也使用从我的args.length而不是数组的整数 我需要编写一个Java程序,该程序可以计算:1.读入的整数数2.平均值-不必是整数! 注意!我不想从数组中计算平均值,但是要在args中计算整数。 目前我已经写了这个: 谁能指导我正确的方向?还是举个例子,以书面形式指导我塑造这段代码? 提前致谢 问题答案: 只需对您的代码进行一些小的修改