当前位置: 首页 > 知识库问答 >
问题:

如何对数组列的元素进行切片和求和?

芮朗
2023-03-14

我想使用SparkSQL在数组列上sum(或执行其他聚合函数)。

我有一张桌子

+-------+-------+---------------------------------+
|dept_id|dept_nm|                      emp_details|
+-------+-------+---------------------------------+
|     10|Finance|        [100, 200, 300, 400, 500]|
|     20|     IT|                [10, 20, 50, 100]|
+-------+-------+---------------------------------+

我想对emp\u details列的值求和。

预期查询:

sqlContext.sql("select sum(emp_details) from mytable").show

预期结果

1500
180

此外,我还应该能够对范围元素求和,例如:

sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show

后果

600
80

当按预期对数组类型执行sum时,它显示sum预期参数为数值类型而不是数组类型。

我认为我们需要为此创建UDF。但是怎么做呢?

我将面临UDF的任何性能冲击吗?除了UDF之外,还有其他解决方案吗?

共有3个答案

郗浩
2023-03-14

一种可能的方法是在您的Array列上使用爆炸(),从而通过唯一键聚合输出。例如:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

(mytable
  .withColumn("emp_sum",
    explode($"emp_details"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|        1500|
|     IT|         180|
+-------+------------+

要仅选择数组中的特定值,我们可以使用链接问题的答案,并稍加修改即可应用它:

val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))

(mytable
  .withColumn("slice", 
    slice($"emp_details", 
      lit(0), 
      lit(3)))
  .withColumn("emp_sum",
    explode($"slice"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|         600|
|     IT|          80|
+-------+------------+

数据:

val data = Seq((10, "Finance", Array(100,200,300,400,500)),
               (20, "IT", Array(10,20,50,100)))
val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")
董宜然
2023-03-14

从Spark 2.4开始,您可以使用切片函数进行切片:

import org.apache.spark.sql.functions.slice

val df = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

val dfSliced = df.withColumn(
   "emp_details_sliced",
   slice($"emp_details", 1, 3)
)

dfSliced.show(false)
+-------+-------+-------------------------+------------------+
|dept_id|dept_nm|emp_details              |emp_details_sliced|
+-------+-------+-------------------------+------------------+
|10     |Finance|[100, 200, 300, 400, 500]|[100, 200, 300]   |
|20     |IT     |[10, 20, 50, 100]        |[10, 20, 50]      |
+-------+-------+-------------------------+------------------+

和带有聚合的和数组

dfSliced.selectExpr(
  "*", 
  "aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",  
  "aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
).show
+-------+-------+--------------------+------------------+-----------+------------------+
|dept_id|dept_nm|         emp_details|emp_details_sliced|details_sum|details_sliced_sum|
+-------+-------+--------------------+------------------+-----------+------------------+
|     10|Finance|[100, 200, 300, 4...|   [100, 200, 300]|       1500|               600|
|     20|     IT|   [10, 20, 50, 100]|      [10, 20, 50]|        180|                80|
+-------+-------+--------------------+------------------+-----------+------------------+
汪凌
2023-03-14

从Spark 2.4开始,Spark SQL支持操作复杂数据结构(包括数组)的高阶函数。

“现代”解决方案如下:

scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details              |
+-------+-------+-------------------------+
|10     |Finance|[100, 200, 300, 400, 500]|
|20     |IT     |[10, 20, 50, 100]        |
+-------+-------+-------------------------+

input.createOrReplaceTempView("mytable")

val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

您可以在以下文章和视频中找到有关高阶函数的详细信息:

  1. 在Apache Spark 2.4中为复杂数据类型引入新的内置和高阶函数
  2. 数据库SQL中使用高阶函数处理嵌套数据
  3. 用Herman van Hovell(数据库)介绍SparkSQL中的高阶函数

免责声明我不推荐这种方法(即使它获得了最多的支持),因为SparkSQL执行Dataset.map时进行了反序列化。查询强制Spark反序列化数据并将其加载到JVM上(从JVM之外由Spark管理的内存区域)。这将不可避免地导致更频繁的GC,从而使性能更差。

一种解决方案是使用数据集解决方案,其中Spark SQL和Scala的组合可以显示其强大功能。

scala> val inventory = Seq(
     |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
     |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]

// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
  map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
  toDF("dept_id", "dept_nm", "sum").
  show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

我把切片部分作为一个练习,因为它同样简单。

 类似资料:
  • 我有一个名为Employees的数组列表,它存储Employees对象。每个员工都有姓、名、小时、工资、毛额、税和净额。我试图计算每个员工的工时、工资、毛工资、税金和净工资的总和。有没有办法从数组列表中提取对象的特定元素? 我试图通过用户的输入来计算总和,但是,它似乎不起作用。

  • 给定任何熊猫数据帧。我想选择列A, B和F: Z 我已经尝试过了但是没有成功。请告诉我怎么做。

  • 主要内容:算法总结及实现,优化算法在实际开发中,有很多场景需要我们将数组元素按照从大到小(或者从小到大)的顺序排列,这样在查阅数据时会更加直观,例如: 一个保存了班级学号的数组,排序后更容易分区好学生和坏学生; 一个保存了商品单价的数组,排序后更容易看出它们的性价比。 对数组元素进行排序的方法有很多种,比如冒泡排序、归并排序、选择排序、插入排序、快速排序等,其中最经典最需要掌握的是「冒泡排序」。 以从小到大排序为例,冒泡排序的整体

  • 问题内容: 在Python中,我有字典列表: 我想要一个最终的字典,其中将包含所有字典的总和。即结果将是: 注意:列表中的每个字典将包含相同数量的键,值对。 问题答案: 有点丑陋,但单线:

  • 问题内容: 在我的程序中,创建了一个固定长度[7]个对象的数组,每个对象都是一个包含3 ,an 和an的类。这些值是从.txt文件中读取的,并基于的值添加到数组的特定索引中。.txt文件中的条目较少,然后数组中存在索引,因此该数组最终看起来像这样: 后来在节目中,我需要的基础上平均的排序的数组中。我有一个工作方法返回这个,但是当我尝试使用数组进行排序和我开始得到这些错误的一个长长的清单: 我的方法

  • 创建一个数组切片,从 arr 数组的最后一个元素开始向前提取n个元素。 使用 Array.slice() 来创建一个从第 n 个元素开始从末尾的数组。 const takeRight = (arr, n = 1) => arr.slice(arr.length - n, arr.length); takeRight([1, 2, 3], 2); // [ 2, 3 ] takeRight([1,