当前位置: 首页 > 面试题库 >

NumPy版本的“指数加权移动平均线”,等效于pandas.ewm()。mean()

葛哲彦
2023-03-14
问题内容

像下面的熊猫一样,如何在NumPy中获得指数加权移动平均值?

import pandas as pd
import pandas_datareader as pdr
from datetime import datetime

# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20

# Get PANDAS exponential weighted moving average
ewm_pd = pd.DataFrame(ibm).ewm(span=windowSize, min_periods=windowSize).mean().as_matrix()

print(ewm_pd)

我用NumPy尝试了以下

import numpy as np
import pandas_datareader as pdr
from datetime import datetime

# From this post: http://stackoverflow.com/a/40085052/3293881 by @Divakar
def strided_app(a, L, S): # Window len = L, Stride len/stepsize = S
    nrows = ((a.size - L) // S) + 1
    n = a.strides[0]
    return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n))

def numpyEWMA(price, windowSize):
    weights = np.exp(np.linspace(-1., 0., windowSize))
    weights /= weights.sum()

    a2D = strided_app(price, windowSize, 1)

    returnArray = np.empty((price.shape[0]))
    returnArray.fill(np.nan)
    for index in (range(a2D.shape[0])):
        returnArray[index + windowSize-1] = np.convolve(weights, a2D[index])[windowSize - 1:-windowSize + 1]
    return np.reshape(returnArray, (-1, 1))

# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20

# Get NumPy exponential weighted moving average
ewma_np = numpyEWMA(ibm, windowSize)

print(ewma_np)

但是结果却与大熊猫不同。

是否有更好的方法直接在NumPy中计算指数加权移动平均值并获得与完全相同的结果pandas.ewm().mean()

在对熊猫解决方案提出60,000个请求时,我得到了大约230秒。我敢肯定,使用纯NumPy可以大大减少这种情况。


问题答案:

更新于08/06/2019

大型输入的纯,快速和保护的解决方案

out 用于就地计算的 dtype参数,参数,索引order参数

此功能等效于pandas’
ewm(adjust=False).mean(),但速度更快。ewm(adjust=True).mean()(熊猫的默认设置)可以在结果开始时产生不同的值。我正在努力adjust为该解决方案添加功能。

当输入太大时,@Divakar的答案会导致浮点精度问题。这是因为,(1-alpha)**(n+1) -> 0n ->inf和时alpha -> 1,会导致被零除,并且NaN在计算中会弹出值。

我想我终于破解了!

这是numpy_ewma功能的向量化版本,据称它可以从@RaduS's post-产生正确的结果

def numpy_ewma_vectorized(data, window):

    alpha = 2 /(window + 1.0)
    alpha_rev = 1-alpha

    scale = 1/alpha_rev
    n = data.shape[0]

    r = np.arange(n)
    scale_arr = scale**r
    offset = data[0]*alpha_rev**(r+1)
    pw0 = alpha*alpha_rev**(n-1)

    mult = data*pw0*scale_arr
    cumsums = mult.cumsum()
    out = offset + cumsums*scale_arr[::-1]
    return out

进一步提升

我们可以通过重复使用一些代码来进一步增强它,例如-

def numpy_ewma_vectorized_v2(data, window):

    alpha = 2 /(window + 1.0)
    alpha_rev = 1-alpha
    n = data.shape[0]

    pows = alpha_rev**(np.arange(n+1))

    scale_arr = 1/pows[:-1]
    offset = data[0]*pows[1:]
    pw0 = alpha*alpha_rev**(n-1)

    mult = data*pw0*scale_arr
    cumsums = mult.cumsum()
    out = offset + cumsums*scale_arr[::-1]
    return out

运行时测试

让我们将这两个时间与大型数据集的相同循环函数进行比较。

In [97]: data = np.random.randint(2,9,(5000))
    ...: window = 20
    ...:

In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))
Out[98]: True

In [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))
Out[99]: True

In [100]: %timeit numpy_ewma(data, window)
100 loops, best of 3: 6.03 ms per loop

In [101]: %timeit numpy_ewma_vectorized(data, window)
1000 loops, best of 3: 665 µs per loop

In [102]: %timeit numpy_ewma_vectorized_v2(data, window)
1000 loops, best of 3: 357 µs per loop

In [103]: 6030/357.0
Out[103]: 16.89075630252101

加速约17倍!

这是我最快的解决方案,几乎没有向量问题,几乎没有向量化。它有点复杂,但是性能却很好,尤其是对于非常庞大的输入。不使用就地计算(可以使用out参数来节省内存分配时间):在相当老的PC上,对于100M元素输入向量为3.62秒,对于100K元素输入向量为3.2ms,对于5000个元素输入向量为293µs
(结果会因alpha/row_size值而异)。

# tested with python3 & numpy 1.15.2
import numpy as np

def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):
    """
    Reshapes data before calculating EWMA, then iterates once over the rows
    to calculate the offset without precision issues
    :param data: Input data, will be flattened.
    :param alpha: scalar float in range (0,1)
        The alpha parameter for the moving average.
    :param row_size: int, optional
        The row size to use in the computation. High row sizes need higher precision,
        low values will impact performance. The optimal value depends on the
        platform and the alpha being used. Higher alpha values require lower
        row size. Default depends on dtype.
    :param dtype: optional
        Data type used for calculations. Defaults to float64 unless
        data.dtype is float32, then it will use float32.
    :param order: {'C', 'F', 'A'}, optional
        Order to use when flattening the data. Defaults to 'C'.
    :param out: ndarray, or None, optional
        A location into which the result is stored. If provided, it must have
        the same shape as the desired output. If not provided or `None`,
        a freshly-allocated array is returned.
    :return: The flattened result.
    """
    data = np.array(data, copy=False)

    if dtype is None:
        if data.dtype == np.float32:
            dtype = np.float32
        else:
            dtype = np.float
    else:
        dtype = np.dtype(dtype)

    row_size = int(row_size) if row_size is not None 
               else get_max_row_size(alpha, dtype)

    if data.size <= row_size:
        # The normal function can handle this input, use that
        return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)

    if data.ndim > 1:
        # flatten input
        data = np.reshape(data, -1, order=order)

    if out is None:
        out = np.empty_like(data, dtype=dtype)
    else:
        assert out.shape == data.shape
        assert out.dtype == dtype

    row_n = int(data.size // row_size)  # the number of rows to use
    trailing_n = int(data.size % row_size)  # the amount of data leftover
    first_offset = data[0]

    if trailing_n > 0:
        # set temporary results to slice view of out parameter
        out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
        data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
    else:
        out_main_view = out
        data_main_view = data

    # get all the scaled cumulative sums with 0 offset
    ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,
                       order='C', out=out_main_view)

    scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
    last_scaling_factor = scaling_factors[-1]

    # create offset array
    offsets = np.empty(out_main_view.shape[0], dtype=dtype)
    offsets[0] = first_offset
    # iteratively calculate offset for each row
    for i in range(1, out_main_view.shape[0]):
        offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]

    # add the offsets to the result
    out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]

    if trailing_n > 0:
        # process trailing data in the 2nd slice of the out parameter
        ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],
                        dtype=dtype, order='C', out=out[-trailing_n:])
    return out

def get_max_row_size(alpha, dtype=float):
    assert 0. <= alpha < 1.
    # This will return the maximum row size possible on 
    # your platform for the given dtype. I can find no impact on accuracy
    # at this value on my machine.
    # Might not be the optimal value for speed, which is hard to predict
    # due to numpy's optimizations
    # Use np.finfo(dtype).eps if you  are worried about accuracy
    # and want to be extra safe.
    epsilon = np.finfo(dtype).tiny
    # If this produces an OverflowError, make epsilon larger
    return int(np.log(epsilon)/np.log(1-alpha)) + 1

一维ewma函数:

def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):
    """
    Calculates the exponential moving average over a vector.
    Will fail for large inputs.
    :param data: Input data
    :param alpha: scalar float in range (0,1)
        The alpha parameter for the moving average.
    :param offset: optional
        The offset for the moving average, scalar. Defaults to data[0].
    :param dtype: optional
        Data type used for calculations. Defaults to float64 unless
        data.dtype is float32, then it will use float32.
    :param order: {'C', 'F', 'A'}, optional
        Order to use when flattening the data. Defaults to 'C'.
    :param out: ndarray, or None, optional
        A location into which the result is stored. If provided, it must have
        the same shape as the input. If not provided or `None`,
        a freshly-allocated array is returned.
    """
    data = np.array(data, copy=False)

    if dtype is None:
        if data.dtype == np.float32:
            dtype = np.float32
        else:
            dtype = np.float64
    else:
        dtype = np.dtype(dtype)

    if data.ndim > 1:
        # flatten input
        data = data.reshape(-1, order)

    if out is None:
        out = np.empty_like(data, dtype=dtype)
    else:
        assert out.shape == data.shape
        assert out.dtype == dtype

    if data.size < 1:
        # empty input, return empty array
        return out

    if offset is None:
        offset = data[0]

    alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

    # scaling_factors -> 0 as len(data) gets large
    # this leads to divide-by-zeros below
    scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),
                               dtype=dtype)
    # create cumulative sum array
    np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],
                dtype=dtype, out=out)
    np.cumsum(out, dtype=dtype, out=out)

    # cumsums / scaling
    out /= scaling_factors[-2::-1]

    if offset != 0:
        offset = np.array(offset, copy=False).astype(dtype, copy=False)
        # add offsets
        out += offset * scaling_factors[1:]

    return out

2D ewma函数:

def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):
    """
    Calculates the exponential moving average over a given axis.
    :param data: Input data, must be 1D or 2D array.
    :param alpha: scalar float in range (0,1)
        The alpha parameter for the moving average.
    :param axis: The axis to apply the moving average on.
        If axis==None, the data is flattened.
    :param offset: optional
        The offset for the moving average. Must be scalar or a
        vector with one element for each row of data. If set to None,
        defaults to the first value of each row.
    :param dtype: optional
        Data type used for calculations. Defaults to float64 unless
        data.dtype is float32, then it will use float32.
    :param order: {'C', 'F', 'A'}, optional
        Order to use when flattening the data. Ignored if axis is not None.
    :param out: ndarray, or None, optional
        A location into which the result is stored. If provided, it must have
        the same shape as the desired output. If not provided or `None`,
        a freshly-allocated array is returned.
    """
    data = np.array(data, copy=False)

    assert data.ndim <= 2

    if dtype is None:
        if data.dtype == np.float32:
            dtype = np.float32
        else:
            dtype = np.float64
    else:
        dtype = np.dtype(dtype)

    if out is None:
        out = np.empty_like(data, dtype=dtype)
    else:
        assert out.shape == data.shape
        assert out.dtype == dtype

    if data.size < 1:
        # empty input, return empty array
        return out

    if axis is None or data.ndim < 2:
        # use 1D version
        if isinstance(offset, np.ndarray):
            offset = offset[0]
        return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,
                               out=out)

    assert -data.ndim <= axis < data.ndim

    # create reshaped data views
    out_view = out
    if axis < 0:
        axis = data.ndim - int(axis)

    if axis == 0:
        # transpose data views so columns are treated as rows
        data = data.T
        out_view = out_view.T

    if offset is None:
        # use the first element of each row as the offset
        offset = np.copy(data[:, 0])
    elif np.size(offset) == 1:
        offset = np.reshape(offset, (1,))

    alpha = np.array(alpha, copy=False).astype(dtype, copy=False)

    # calculate the moving average
    row_size = data.shape[1]
    row_n = data.shape[0]
    scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
                               dtype=dtype)
    # create a scaled cumulative sum array
    np.multiply(
        data,
        np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),
                    dtype=dtype)
        / scaling_factors[np.newaxis, :-1],
        dtype=dtype, out=out_view
    )
    np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
    out_view /= scaling_factors[np.newaxis, -2::-1]

    if not (np.size(offset) == 1 and offset == 0):
        offset = offset.astype(dtype, copy=False)
        # add the offsets to the scaled cumulative sums
        out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]

    return out

用法:

data_n = 100000000
data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100

span = 5000  # span >= 1
alpha = 2/(span+1)  # for pandas` span parameter

# com = 1000  # com >= 0
# alpha = 1/(1+com)  # for pandas` center-of-mass parameter

# halflife = 100  # halflife > 0
# alpha = 1 - np.exp(np.log(0.5)/halflife)  # for pandas` half-life parameter

result = ewma_vectorized_safe(data, alpha)

只是一个提示

根据给定alpha窗口中数据对平均值的贡献,很容易为给定的计算“窗口大小”(技术指数平均值具有无限的“窗口”)。例如,这对于选择由于边界效应而将结果的起始部分视为不可靠的情况很有用。

def window_size(alpha, sum_proportion):
    # Increases with increased sum_proportion and decreased alpha
    # solve (1-alpha)**window_size = (1-sum_proportion) for window_size        
    return int(np.log(1-sum_proportion) / np.log(1-alpha))

alpha = 0.02
sum_proportion = .99  # window covers 99% of contribution to the moving average
window = window_size(alpha, sum_proportion)  # = 227
sum_proportion = .75  # window covers 75% of contribution to the moving average
window = window_size(alpha, sum_proportion)  # = 68

alpha = 2 / (window_size + 1.0)此线程中使用的关系(pandas的’span’选项)与上述函数(带有sum_proportion~=0.87)的逆函数非常近似。alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size)更准确(pandas的“半衰期”选项等于sum_proportion=0.5)。

在下面的示例中,data代表一个连续的噪声信号。cutoff_idx是第一个位置,result其中至少99%的值取决于其中的单独值data(即小于1%的值取决于data
[0])。直到cutoff_idx最终的数据都被排除在数据之外,因为它太依赖于中的第一个值data,因此可能会扭曲平均值。

result = ewma_vectorized_safe(data, alpha, chunk_size)
sum_proportion = .99
cutoff_idx = window_size(alpha, sum_proportion)
result = result[cutoff_idx:]

为了说明上面解决的问题,您可以运行几次,请注意,红线经常出现的错误的开始,在以下位置被跳过cutoff_idx

data_n = 100000
data = np.random.rand(data_n) * 100
window = 1000
sum_proportion = .99
alpha = 1 - np.exp(np.log(1-sum_proportion)/window)

result = ewma_vectorized_safe(data, alpha)

cutoff_idx = window_size(alpha, sum_proportion)
x = np.arange(start=0, stop=result.size)

import matplotlib.pyplot as plt
plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',
         x[cutoff_idx:], result[cutoff_idx:], '-b')
plt.show()

请注意,cutoff_idx==window因为alpha是用window_size()函数的反函数设置的,所以具有sum_proportion。这类似于大熊猫的用法ewm(span=window,min_periods=window)



 类似资料:
  • 问题内容: 我正在为Pyspark中的时间序列编写异常检测算法。我想计算(-3,3)或(-4,4)窗口的加权移动平均值。现在,我正在使用滞后和超前窗口功能,并将它们乘以一组权重。我的窗口当前是(-2,2)。 我想知道是否有另一种方法可以计算Pyspark中的加权移动平均值。 我正在使用的当前代码是: 问题答案: 您可以概括当前的代码: 它可以用作: 注意事项 : 您可能会考虑将滞后缺失的帧的结果标

  • 问题内容: 我有一个二维的numpy数组。我想对每个条目取n个最近条目的平均值,就像对一维数组取滑动平均值一样。什么是最干净的方法? 问题答案: 这与将 滤镜 应用于 图像的 概念类似。 幸运的是,有很多功能可以做到这一点。您所追求的是。 可以这样使用: 如果您需要5x5滤镜,请使用。该选项控制如何处理边缘。您没有指定要如何处理边缘。在此示例中,“常量”模式表示将数组边界之外的每个项目都视为常量值

  • 我也看过Pyspark中的加权移动平均线,但我需要一个Spark/Scala的方法,以及10天或30天的均线。 有什么想法吗?

  • 公式链接:https://sciencing.com/calculate-exponential-moving-averages-8221813.html

  • 问题内容: 我正在写一个使用numpy中的卷积函数的移动平均函数,它应该等效于(加权移动平均)。当我的权重全部相等时(如简单的算术平均值),它可以正常工作: 给 但是,当我尝试使用加权平均值时 而不是(对于相同的数据)3.667,4.667,5.667,6.667,…我希望,我得到 如果删除“有效”标志,则什至看不到正确的值。我真的很想对WMA和MA使用convolve,因为它可以使代码更整洁(相

  • 我正在写一个移动平均函数,它使用numpy中的卷积函数,它应该相当于一个(加权移动平均)。当我的权重都相等时(就像在一个简单的算术平均值中一样),它工作得很好: 给予 对这种行为有什么看法吗?