我正在尝试根据时间序列数据上的滑动窗口提取特征。在Scala中,似乎有一个sliding
基于此帖子和文档的功能
import org.apache.spark.mllib.rdd.RDDFunctions._
sc.parallelize(1 to 100, 10)
.sliding(3)
.map(curSlice => (curSlice.sum / curSlice.size))
.collect()
我的问题是PySpark是否有类似的功能?或者,如果还没有这样的功能,我们如何实现类似的滑动窗口转换?
据我所知,sliding
函数在Python中不可用,SlidingRDD
是私有类,不能在外部访问MLlib
。
如果要sliding
在现有的RDD上使用,则可以这样创建可怜人sliding
:
def sliding(rdd, n):
assert n > 0
def gen_window(xi, n):
x, i = xi
return [(i - offset, (i, x)) for offset in xrange(n)]
return (
rdd.
zipWithIndex(). # Add index
flatMap(lambda xi: gen_window(xi, n)). # Generate pairs with offset
groupByKey(). # Group to create windows
# Sort values to ensure order inside window and drop indices
mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
sortByKey(). # Sort to makes sure we keep original order
values(). # Get values
filter(lambda x: len(x) == n)) # Drop beginning and end
或者,您可以尝试这样的操作(在的帮助下toolz
)
from toolz.itertoolz import sliding_window, concat
def sliding2(rdd, n):
assert n > 1
def get_last_el(i, iter):
"""Return last n - 1 elements from the partition"""
return [(i, [x for x in iter][(-n + 1):])]
def slide(i, iter):
"""Prepend previous items and return sliding window"""
return sliding_window(n, concat([last_items.value[i - 1], iter]))
def clean_last_items(last_items):
"""Adjust for empty or to small partitions"""
clean = {-1: [None] * (n - 1)}
for i in range(rdd.getNumPartitions()):
clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
return {k: tuple(v) for k, v in clean.items()}
last_items = sc.broadcast(clean_last_items(
rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))
return rdd.mapPartitionsWithIndex(slide)
假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我
目前准备用redis实现分布式限流策略, 抛开那些redis插件,我觉得zset实现理论上可行, 具体逻辑可以是这样: 抽象限流逻辑: 针对某个action,需要在一段时间(即为窗口),只能容许N个操作 redis伪代码 问题: 假设某个行为有大量的不同样本或者不同实例在调用, 也就是key不一样, 假设有百万以上,但是每个key的行为发生次数有限,例如就一次, 这个时候就会出现很大冷数据 处理该
下面是一些示例代码: 这是我希望看到的输出:
在第一个窗口row_number1到4中,新的秩(新列)将是 在第一个窗口row_number5到8中,新的秩(新列)将是 在第一个窗口中,Row_Number9要Rest,新的秩(新列)将是 但这给了我: 此外,尝试了。rowsbetween(-3,0)但这也给我带来了错误:
问题内容: 我正在尝试将某些Windows函数(和)用于数据框,但我不知道如何使用它们。 有人可以帮我吗?在Python API文档 中,没有关于它的示例。 具体来说,我正在尝试获取数据框中数字字段的分位数。 我正在使用Spark 1.4.0。 问题答案: 要使用窗口功能,您必须先创建一个窗口。定义与普通SQL几乎相同,这意味着您可以定义顺序,分区或同时定义两者。首先让我们创建一些虚拟数据: 确保