当前位置: 首页 > 面试题库 >

如何在PySpark 2.1.0中的事件时间窗口上定义UDAF

郗河
2023-03-14
问题内容

我正在编写一个Python应用程序,该程序可在带有时间戳的值序列上滑动窗口。我想对滑动窗口中的值应用一个函数,以便根据N个最新值计算分数,如图所示。我们已经使用Python库实现了该功能,以利用GPU。

我发现Apache Spark
2.0附带了结构化流,并且它支持事件时间的窗口操作。如果您想从.csv文件中读取有限的记录序列,并希望在这样的滑动窗口中对记录进行计数,则可以在PySpark中使用以下代码:

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double') \

sqlContext = SQLContext(spark)
lines = sqlContext \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///'+getcwd()+'/csv')

windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'})

query = windowedCount \
   .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

但是,我想在滑动窗口上应用除预定义聚合功能以外的UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的汇总函数仅为avg,max,min,求和和计数。

还不支持吗?如果是这样,何时在PySpark中支持它?

https://stackoverflow.com/a/32750733/1564381显示,可以在Java或Scala中定义UserDefinedAggregateFunction,然后在PySpark中调用它。看起来很有趣,但是我想将自己的Python函数应用于滑动窗口中的值。我想要一种纯粹的Python方式。

ps让我知道了除PySpark以外的Python中可以解决此类问题的任何框架(将UDAF应用于在流上滑动的窗口上)。


问题答案:

在Spark <2.3中,您不能执行此操作。

对于Spark> = 2.3,这对于分组数据是可行的,但对于使用“带有Pys的PySpark UDAF的Windows”而言,尚不可行。

当前,PySpark无法在Windows上运行UserDefined函数。

这是一个对此有一个很好描述的SO问题:在PySpark中的GroupedData上应用UDF(带有可运行的python示例)

这是添加了此功能的JIRA票证-https:
//issues.apache.org/jira/browse/SPARK-10915



 类似资料:
  • 为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?

  • 从flink办公室引入会话窗口 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-窗口。。。会话窗口操作符为每个到达的记录创建一个新窗口,如果窗口之间的距离比定义的间隙更近,则将窗口合并在一起。为了可合并,会话窗口操作符需要合并触发器和合并窗口函数,。。。

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?

  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我有一个微服务,它会收到一条消息,大概是这样的: 是否有任何开箱即用的解决方案允许方法每天在给定时间运行?使用 Spring 中的 cron 不涉及在运行时更改时间。我需要使用更灵活的选项(更改运行时,多次启动) 谢谢!

  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢