移动平均线对于Spark和任何分布式系统来说都是一个棘手的问题。当数据分散在多台机器上时,会有一些跨分区的时间窗口。我们必须在分区开始时复制数据,这样计算每个分区的移动平均值就可以得到完全的覆盖。
这里有一种在Spark中做到这一点的方法。示例数据:
val ts = sc.parallelize(0 to 100, 10)
val window = 3
一个简单的分区器,它将每一行放在我们通过键指定的分区中:
class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
def numPartitions = p
def getPartition(key: Any) = key.asInstanceOf[Int]
}
val partitioned = ts.mapPartitionsWithIndex((i, p) => {
val overlap = p.take(window - 1).toArray
val spill = overlap.iterator.map((i - 1, _))
val keep = (overlap.iterator ++ p).map((i, _))
if (i == 0) keep else keep ++ spill
}).partitionBy(new StraightPartitioner(ts.partitions.length)).values
val movingAverage = partitioned.mapPartitions(p => {
val sorted = p.toSeq.sorted
val olds = sorted.iterator
val news = sorted.iterator
var sum = news.take(window - 1).sum
(olds zip news).map({ case (o, n) => {
sum += n
val v = sum
sum -= o
v
}})
})
scala> movingAverage.collect.sameElements(3 to 297 by 3)
res0: Boolean = true
我处理了像这样存储的双精度列表: 我想计算这个列表的平均值。根据文档,: MLlib的所有方法都使用Java友好类型,因此您可以像在Scala中一样导入和调用它们。唯一的警告是,这些方法采用Scala RDD对象,而Spark Java API使用单独的JavaRDD类。您可以通过对JavaRDD对象调用.RDD()将JavaRDD转换为Scala RDD。 在同一页面上,我看到以下代码: 根据我
我也看过Pyspark中的加权移动平均线,但我需要一个Spark/Scala的方法,以及10天或30天的均线。 有什么想法吗?
我在尝试将spark数据帧的一列从十六进制字符串转换为双精度字符串时遇到了一个问题。我有以下代码: 我无法共享txs数据帧的内容,但以下是元数据: 但当我运行这个程序时,我得到了一个错误: 错误:类型不匹配;找到:MsgRow需要:org.apache.spark.sql.行MsgRow(row.getLong(0),row.getString(1),row.getString(2),hex2in
目前我正在研究Apache spark和Apache ignite框架。 这篇文章介绍了它们之间的一些原则差异,但我意识到我仍然不理解它们的目的。 我的意思是,哪一个问题更容易产生火花而不是点燃,反之亦然?
阅读 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 这种实现的文字是谷歌Word2Vec的一个端口 https://code.google.com/archive/p/word2vec/ 这是“向量空间中单词表示的有效估计”
Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据? 在实时数据管道方面,我觉得两者做的工作是一样的。如何在数据管道上同时使用这两种技术?