当前位置: 首页 > 知识库问答 >
问题:

PySpark:在窗口上计数不同

张姚石
2023-03-14

下面是一些示例代码:

from pyspark.sql.window import Window    
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
                    
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()

这是我希望看到的输出:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+

共有1个答案

岳奇逸
2023-03-14

编辑:

正如noleto在下面的回答中提到的,自从PySpark2.1以来,现在有了一个在窗口上工作的approx_count_distinct函数。

原始答案

from pyspark.sql.window import Window
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

#create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])

#convert string timestamp to timestamp type             
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

#use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))

df.show()
+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
 类似资料:
  • 我有一个包含以下数据的pyspark dataframe:

  • 问题内容: 样本数据可能会有助于解释我想做的事情,而不是解释它,因此,我将从此开始。 这是我目前正在使用的数据: 我正在尝试在15分钟的时间内滚动显示此数据中的出现次数。该数据的预期结果如下: 样本数据: 我可以通过以下方式 使它 起作用: 但是,我想避免使用子查询,而建议使用(或其他任何可能的解决方案)解决方案。 这可能吗?还是子查询是正确的解决方案? 问题答案: 一种方法-如果表很大,可能比嵌

  • 我有一个每周客户和商店信息数据集, 问题-我必须计算的特征,如总独特的客户在最近1,2,3,4,5,6周..等,截至当前的一周。 我在使用count distinct of customers column over window函数时出错- 我尝试了concat函数创建数组,也没有工作-谢谢帮助! 错误-SQL编译错误:distinct不能与窗口框架或订单一起使用。 如何修复此错误? 输入 输出

  • 问题内容: 我的项目包括Selenium Webdriver,JAVA,Maven,TestNG,Jenkins,Allure(报告)。我有几套包含100多个测试用例的测试,并通过3种不同的浏览器进行了迭代(测试使用TestNG并行运行)。 除非我实际上正在观察窗口并查看测试运行,否则无法通过一项测试。 我将解释: 我要测试什么? 我们的JS开发人员创建了一项功能,仅当用户将焦点放在窗口上时,图像

  • 问题内容: 我正在尝试将某些Windows函数(和)用于数据框,但我不知道如何使用它们。 有人可以帮我吗?在Python API文档 中,没有关于它的示例。 具体来说,我正在尝试获取数据框中数字字段的分位数。 我正在使用Spark 1.4.0。 问题答案: 要使用窗口功能,您必须先创建一个窗口。定义与普通SQL几乎相同,这意味着您可以定义顺序,分区或同时定义两者。首先让我们创建一些虚拟数据: 确保

  • 问题内容: 我正在遵循上的示例,但无法在出现的窗口中成功显示字符串。尺寸和标题设置正确,并显示窗口。如果我在控制台上使用paint()方法输出一个字符串,我会看到它实际上被调用了几次,但是该字符串没有出现在我的应用程序窗口中。我看不到与示例有何不同;我实际上少了一些代码(他们添加了鼠标侦听器和按键侦听器):\ 问题答案: 您遇到的问题是您直接在框架顶部绘画。框架还包括框架边框,因此位置0、0(或者