当前位置: 首页 > 面试题库 >

Pyspark的加权移动平均线

葛子昂
2023-03-14
问题内容

我正在为Pyspark中的时间序列编写异常检测算法。我想计算(-3,3)或(-4,4)窗口的加权移动平均值。现在,我正在使用滞后和超前窗口功能,并将它们乘以一组权重。我的窗口当前是(-2,2)。

我想知道是否有另一种方法可以计算Pyspark中的加权移动平均值。

我正在使用的当前代码是:

data_frame_1 = spark_data_frame.withColumn("weighted_score_predicted", (weights[0] * lag(column_metric, 1).over(w) + weights[1] * lag(column_metric, 2).over(w) + weights[2] * lead(column_metric, 1).over(w) + weights[3] * lead(column_metric, 2).over(w)) / 2).na.drop()

问题答案:

您可以概括当前的代码:

from pyspark.sql.functions import coalesce, lit, col, lead, lag
from operator import add
from functools import reduce

def weighted_average(c, window, offsets, weights):
    assert len(weights) == len(offsets)

    def value(i):
        if i < 0: return lag(c, -i).over(window)
        if i > 0: return lead(c, i).over(window)
        return c

    # Create a list of Columns
    # - `value_i * weight_i` if `value_i IS NOT NULL` 
    # - literal 0 otherwise
    values = [coalesce(value(i) * w, lit(0)) for i, w in zip(offsets, weights)]

    # or sum(values, lit(0))
    return reduce(add, values, lit(0))

它可以用作:

from pyspark.sql.window import Window

df = spark.createDataFrame([
    ("a", 1, 1.4), ("a", 2, 8.0), ("a", 3, -1.0), ("a", 4, 2.4),
    ("a", 5, 99.0), ("a", 6, 3.0), ("a", 7, -1.0), ("a", 8, 0.0)
]).toDF("id", "time", "value")

w = Window.partitionBy("id").orderBy("time")
offsets, delays =  [-2, -1, 0, 1, 2], [0.1, 0.20, 0.4, 0.20, 0.1]

result = df.withColumn("avg", weighted_average(
    col("value"), w, offsets, delays
))
result.show()

## +---+----+-----+-------------------+ 
## | id|time|value|                avg|
## +---+----+-----+-------------------+
## |  a|   1|  1.4|               2.06|
## |  a|   2|  8.0| 3.5199999999999996|
## |  a|   3| -1.0|              11.72|
## |  a|   4|  2.4|              21.66|
## |  a|   5| 99.0| 40.480000000000004|
## |  a|   6|  3.0|              21.04|
## |  a|   7| -1.0|               10.1|
## |  a|   8|  0.0|0.10000000000000003|
## +---+----+-----+-------------------+

注意事项

您可能会考虑将滞后缺失的帧的结果标准化:

 result.withColumn(
     "normalization_factor",
     weighted_average(lit(1), w, offsets, delays)
 ).withColumn(
     "normalized_avg",
      col("avg") / col("normalization_factor")
).show()

## +---+----+-----+-------------------+--------------------+------------------+ 
## | id|time|value|                avg|normalization_factor|    normalized_avg|
## +---+----+-----+-------------------+--------------------+------------------+
## |  a|   1|  1.4|               2.06|  0.7000000000000001|2.9428571428571426|
## |  a|   2|  8.0| 3.5199999999999996|                 0.9|3.9111111111111105|
## |  a|   3| -1.0|              11.72|  1.0000000000000002|11.719999999999999|
## |  a|   4|  2.4|              21.66|  1.0000000000000002|21.659999999999997|
## |  a|   5| 99.0| 40.480000000000004|  1.0000000000000002|             40.48|
## |  a|   6|  3.0|              21.04|  1.0000000000000002|21.039999999999996|
## |  a|   7| -1.0|               10.1|  0.9000000000000001| 11.22222222222222|
## |  a|   8|  0.0|0.10000000000000003|  0.7000000000000001|0.1428571428571429|
## +---+----+-----+-------------------+--------------------+------------------+


 类似资料:
  • 问题内容: 我正在写一个使用numpy中的卷积函数的移动平均函数,它应该等效于(加权移动平均)。当我的权重全部相等时(如简单的算术平均值),它可以正常工作: 给 但是,当我尝试使用加权平均值时 而不是(对于相同的数据)3.667,4.667,5.667,6.667,…我希望,我得到 如果删除“有效”标志,则什至看不到正确的值。我真的很想对WMA和MA使用convolve,因为它可以使代码更整洁(相

  • 我正在写一个移动平均函数,它使用numpy中的卷积函数,它应该相当于一个(加权移动平均)。当我的权重都相等时(就像在一个简单的算术平均值中一样),它工作得很好: 给予 对这种行为有什么看法吗?

  • 公式链接:https://sciencing.com/calculate-exponential-moving-averages-8221813.html

  • 问题内容: 像下面的熊猫一样,如何在NumPy中获得指数加权移动平均值? 我用NumPy尝试了以下 但是结果却与大熊猫不同。 是否有更好的方法直接在NumPy中计算指数加权移动平均值并获得与完全相同的结果? 在对熊猫解决方案提出60,000个请求时,我得到了大约230秒。我敢肯定,使用纯NumPy可以大大减少这种情况。 问题答案: 更新于08/06/2019 大型输入的纯,快速和保护的解决方案 用

  • Python是否有一个SciPy函数或NumPy函数或模块来计算给定特定窗口的一维数组的运行平均值?