当前位置: 首页 > 知识库问答 >
问题:

apache spark中的聚合函数

郎成弘
2023-03-14

我需要聚合一个基于1分钟时间间隔的数据集。当我尝试此操作时,它会抛出错误:

我的数据集如下所示

scala> newVX.show
+--------------------+-----+
|            datetime|value|
+--------------------+-----+
|2017-07-31 10:53:...| 0.26|
|2017-07-31 10:53:...| 0.81|
|2017-07-31 09:45:...| 0.42|
|2017-07-31 09:44:...|0.008|
|2017-07-31 09:37:...| 0.14|
|2017-07-31 09:35:...|0.365|
|2017-07-31 09:34:...|0.485|
|2017-07-31 09:33:...| 0.49|
|2017-07-31 09:28:...| 1.15|
|2017-07-31 09:27:...|0.325|
|2017-07-31 09:24:...|0.845|
|2017-07-31 09:24:...|0.045|
|2017-07-31 09:23:...|0.015|
|2017-07-31 09:20:...| 0.45|
|2017-07-31 09:20:...| 0.05|
|2017-07-31 09:19:...| 0.14|
|2017-07-31 09:18:...| 0.24|
|2017-07-31 09:12:...|0.125|
|2017-07-31 09:11:...|  0.3|
|2017-07-31 09:11:...| 0.13|
+--------------------+-----+


scala> newVX.groupBy("datetime","60 seconds").agg(avg("value")).show

org.apache.spark.sql.AnalysisException:无法解析(datetime,value)中的列名“60秒”;在org.apache.spark.sql.dataset$$anonfun$resolve$1.apply(dataset.scala:216)在org.apache.spark.sql.dataset$$anonfun$resolve$1.apply(dataset.scala:216)在scala.option.getorelse(option.scala:121)在org.apache.spark.sql.dataset$$anonfun$groupby$2.apply(dataset.scala:216)在

我也尝试了另一种解决方法。但它为每60行提供一个值,而不是聚合。

scala> newVX.groupBy(window($"datetime","1 minute")).agg(avg("value") as "avg-va
lue").show()


17/07/31 12:41:02 WARN Executor: Managed memory leak detected; size = 4456448 by
tes, TID = 5
+--------------------+-------------------+
|              window|          avg-value|
+--------------------+-------------------+
|[2017-07-31 07:49...| 0.7699999809265137|
|[2017-07-31 05:34...|0.33500000834465027|
|[2017-07-31 04:26...|0.23999999463558197|
|[2017-07-30 20:04...| 0.9399999976158142|
|[2017-07-29 08:33...|0.20250000059604645|
|[2017-07-28 09:30...| 0.3400000035762787|
|[2017-07-27 16:36...| 1.2799999713897705|
|[2017-07-27 08:16...| 0.3400000035762787|
|[2017-07-27 08:11...| 0.3400000035762787|
|[2017-07-27 01:06...| 0.4650000035762787|
|[2017-07-26 23:53...|0.23999999463558197|
|[2017-07-26 19:49...| 0.3199999928474426|
|[2017-07-25 14:39...| 0.3400000035762787|
|[2017-07-25 07:54...| 0.7099999785423279|
|[2017-07-25 06:21...|0.29499998688697815|
|[2017-07-25 03:57...| 0.1899999976158142|
|[2017-07-24 20:31...| 1.2799999713897705|
|[2017-07-24 19:50...| 1.2799999713897705|
|[2017-07-24 16:26...|0.03999999910593033|
|[2017-07-24 16:10...|              0.125|
+--------------------+-------------------+
only showing top 20 rows

编辑:我在这里做了一些修正,但它仍然显示错误的结果。我保留了日期到分钟的值

val VX = newvx.withColumn("datetime", ((unix_timestamp($"datetime") / 60)
.cast("long") * 60).cast("timestamp"))

在我聚合之后,它仍然显示错误的值。

scala> VX.groupBy("datetime").agg(Map("value" -> "mean")).show
17/07/31 15:58:15 WARN Executor: Managed memory leak detected; size = 4456448 by
tes, TID = 21
+-------------------+-------------------+
|           datetime|         avg(value)|
+-------------------+-------------------+
|2017-07-31 06:38:00| 0.6100000143051147|
|2017-07-30 19:46:00| 0.3400000035762787|
|2017-07-30 09:24:00|0.42500001192092896|
|2017-07-29 08:53:00| 0.8899999856948853|
|2017-07-29 15:07:00| 0.3400000035762787|
|2017-07-29 05:26:00| 0.3100000023841858|
|2017-07-28 23:29:00|0.27250000834465027|
|2017-07-28 22:07:00| 0.3199999928474426|
|2017-07-28 20:48:00| 0.2849999964237213|
|2017-07-28 20:13:00|0.44999998807907104|
|2017-07-28 18:07:00|0.20999999344348907|
|2017-07-28 06:38:00|0.08500000089406967|
|2017-07-27 11:27:00|0.26499998569488525|
|2017-07-27 02:37:00| 1.0549999475479126|
|2017-07-27 02:12:00| 0.3449999988079071|
|2017-07-26 22:22:00| 0.4699999988079071|
|2017-07-25 15:22:00| 0.8199999928474426|
|2017-07-25 07:08:00| 0.2800000011920929|
|2017-07-25 06:42:00|0.32499998807907104|
|2017-07-25 04:42:00|0.30000001192092896|
+-------------------+-------------------+
only showing top 20 rows

知道为什么吗?我该怎么纠正呢?谢谢.

共有1个答案

洪雅健
2023-03-14

您可以使用以下方法:

在spark中创建一个用户定义函数,它将只保留日期到分钟颗粒级别。例如2017-07-31 10:53

def atMinute = udf((dateTime: String) => // implement here retain date till minute)

使用自定义项转换初始数据帧

val df_at_minute = df.withColumn("datetime_at_min", atMinute("datetime"))

将聚合函数应用于新数据帧

 df_at_minute.groupBy("datetime_at_min").agg(avg("value"))
 类似资料:
  • 在介绍 Django 中 ORM 模型的聚合函数之前,我们先要了解下 MySQL 中常用的聚合函数。首先同样是准备数据,使用我i们之前在第 18 小节中完成的插入 100 条数据的代码,重新执行一次: (django-manual) [root@server test]# python insert_records.py 批量插入完成 此时,连同上次操作剩余的两条会员记录,数据库中总共有 10

  • 主要内容:1.COUNT函数,2. SUM函数,3. AVG函数,4. MAX函数,5. MIN函数SQL聚合函数用于对表的单个列的多行执行计算,它只返回一个值。它还用于汇总数据。 SQL聚合函数的类型,如下图所示 - 接下来,我们一个个地讲解。 1.COUNT函数 函数用于计算数据库表中的行数,它可以在数字和非数字数据类型上工作。 函数使用返回指定表中所有行的计数。 包函重复值和值。 语法 假设有一个表,它的结构和数据如下所示 - PRODUCT COMPANY QTY RATE COST I

  • 问题内容: 在解释CTE的一些概念时,有人问了一个可爱的问题..我们可以找到行的乘法吗,而我们总是从新手开始集中精力。那给了我一个想法!仅使用SQL是否有可能。我还考虑了我们甚至可以支持的最大精度,因为该产品可能非常庞大。 话虽如此,我们不能编写自己的聚合函数。(可以吗?)我在想仅使用SQL就有可能。 我想到的就像是自己添加2,3次。.但是当集合很大时..由于繁琐,我无法实现。 另一个可能是和,为

  • 查看SQL的ANSI聚合函数,我找不到字符串的任何东西。但是,每个数据库似乎都有自己的数据库,例如MySQL和Oracle的GROUP_CONCAT和LISTAGG,这使得可移植性有点困难。我是不是缺了什么?这是有原因的吗?

  • 问题内容: 我正在尝试在Oracle中编写一个自定义聚合函数,并将该函数与其他一些函数一起分组在一个包中。作为一个示例(为了模拟我遇到的问题),假设我的自定义聚合对数字进行求和看起来像: 如果我编写以下函数定义: 和相应的类型声明进行测试: 这个说法: 给出正确的结果70。但是,使用函数定义创建一个包: 并通过以下方式调用: 与爆炸 是否可以在包声明中嵌套自定义聚合函数? 问题答案: Oracle

  • 我对GraphQL的分析解决方案非常感兴趣(想想一个显示图形的webapp)。但我找不到任何使用聚合函数的GraphQL示例。这是我的前端完成的大多数查询的一个主要方面。 对于我的解决方案,我们有3个典型的后端调用。 搜索 假设我们在GraphQL中指定了这种类型 搜寻 GraphQL似乎很好地处理了这一点。毫无疑问。 ex.搜索Bob的年龄{Person(name:"Bob"){age}} 这是