当前位置: 首页 > 知识库问答 >
问题:

火花指数移动平均线

丁嘉庆
2023-03-14
EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)
multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now
var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))

我也看过Pyspark中的加权移动平均线,但我需要一个Spark/Scala的方法,以及10天或30天的均线。

有什么想法吗?

共有1个答案

潘彦
2023-03-14

最后,我分析了指数移动平均是如何在pandas数据中实现的。除了我上面描述的递归公式之外,它很难在任何sql或窗口函数中实现(因为它是递归的),还有另一个公式,在它们的问题跟踪器上有详细说明:

y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
       ((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).

考虑到这一点,并在这里提供了额外的spark实现帮助,我最终得到了下面的实现,这与执行pandas_dataframe.ewm(Span=window_size).mean()大致相同。

def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
  val window = Window.partitionBy(partitionColumn)
  val exponentialMovingAveragePrefix = "_EMA_"

  val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
    val alpha = 2.0 / (windowSize + 1)
    val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
      accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
    }
    (adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
  })
  dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
    .withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
    .drop("row_nr")
}

(我假定需要计算指数移动平均线的列的类型是双倍的。)

 类似资料:
  • 公式链接:https://sciencing.com/calculate-exponential-moving-averages-8221813.html

  • Python是否有一个SciPy函数或NumPy函数或模块来计算给定特定窗口的一维数组的运行平均值?

  • 我试图计算按名称分组的列的季度移动平均线,我定义了一个火花窗口函数规范为 我的数据frame如下所示:

  • 问题内容: 我基本上有一个像这样的值数组: 上面的数组过于简化,我在实际代码中每毫秒收集1个值,我需要使用编写的算法处理输出,以找到某个时间点之前最接近的峰值。我的逻辑失败了,因为在上面的示例中,它是真正的峰值,但是我的算法会向后看,并看到最后一个数字是峰值,因为之前的数值减少了。 目标是获取这些值,并对它们应用一种算法,该算法将使它们“平滑”一些,以便获得更多的线性值。(即:我希望自己的成绩是弯

  • 问题内容: 我有一个日期范围,并且每个日期都有一个度量值。我想计算每个日期的指数移动平均值。有人知道怎么做这个吗? 我是python的新手。似乎没有将平均值内置到标准python库中,这让我感到有些奇怪。也许我找的地方不对。 因此,给定以下代码,如何计算日历日期的IQ点的移动加权平均值? (可能是一种更好的数据结构方式,任何建议将不胜感激) 问题答案: 编辑:看来SciKits(补充SciPy的附