当前位置: 首页 > 知识库问答 >
问题:

Spark Scala:使用分析函数获取累积和(运行总数)

刁丰羽
2023-03-14

我正在使用窗口函数实现火花中的累积总和。但是在应用窗口分区函数时记录输入的顺序没有保持

输入数据:

val base = List(List("10", "MILLER", "1300", "2017-11-03"), List("10", "Clark", "2450", "2017-12-9"), List("10", "King", "5000", "2018-01-28"),
  List("30", "James", "950", "2017-10-18"), List("30", "Martin", "1250", "2017-11-21"), List("30", "Ward", "1250", "2018-02-05"))
  .map(row => (row(0), row(1), row(2), row(3)))

val DS1 = base.toDF("dept_no", "emp_name", "sal", "date")
DS1.show()
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
+-------+--------+----+----------+

预期产出:

+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
+-------+--------+----+----------+-----------+

我试过下面的逻辑

val baseDepCumSal = DS1.withColumn("Dept_CumSal", sum("sal").over(Window.partitionBy("dept_no").
  orderBy(col("sal"), col("emp_name"), col("date").asc).
  rowsBetween(Long.MinValue, 0)
))

baseDepCumSal.orderBy("dept_no", "date").show
+-------+--------+----+----------+-----------+
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
|     30|   James| 950|2017-10-18|     3450.0|
|     30|  Martin|1250|2017-11-21|     1250.0|
|     30|    Ward|1250|2018-02-05|     2500.0|
+-------+--------+----+----------+-----------+

对于dept_no=10,记录是按预期顺序计算的,而对于dept_no=30,记录不是按输入顺序计算的。

共有3个答案

谷梁驰
2023-03-14

可能在orderBy基本文档中缺少emp_name。订购人(“部门号”、“日期”)。show()

施超
2023-03-14

请注意,在您的窗口中的顺序(coll("sall"),coll("emp_name"),coll("date"). asc),与显示"dept_no","date"的顺序不同橱窗里的emp_name”?为什么不按日期订购?

岳安福
2023-03-14

这是因为类型不正确。因为薪水是一个字符串

DS1.printSchema
root
 |-- dept_no: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- date: string (nullable = true)

它按字典顺序排列:

DS1.orderBy("sal").show
+-------+--------+----+----------+
|dept_no|emp_name| sal|      date|
+-------+--------+----+----------+
|     30|  Martin|1250|2017-11-21|
|     30|    Ward|1250|2018-02-05|
|     10|  MILLER|1300|2017-11-03|
|     10|   Clark|2450| 2017-12-9|
|     10|    King|5000|2018-01-28|
|     30|   James| 950|2017-10-18|
+-------+--------+----+----------+ 

为了得到想要的结果,你必须投(没有必要的框架定义):

DS1.withColumn("Dept_CumSal", sum("sal").over(
  Window
     .partitionBy("dept_no")
    .orderBy(col("sal").cast("integer"), col("emp_name"), col("date").asc))).show
+-------+--------+----+----------+-----------+                                  
|dept_no|emp_name| sal|      date|Dept_CumSal|
+-------+--------+----+----------+-----------+
|     30|   James| 950|2017-10-18|      950.0|
|     30|  Martin|1250|2017-11-21|     2200.0|
|     30|    Ward|1250|2018-02-05|     3450.0|
|     10|  MILLER|1300|2017-11-03|     1300.0|
|     10|   Clark|2450| 2017-12-9|     3750.0|
|     10|    King|5000|2018-01-28|     8750.0|
+-------+--------+----+----------+-----------+
 类似资料:
  • 问题内容: 我想用NumPy创建CDF,下面是我的代码: 我正在阵列旁走,但是需要很长时间执行程序。这个功能有一个内置的功能,不是吗? 问题答案: 我不太确定您的代码在做什么,但是如果您有和返回的数组,则可以用来生成直方图内容的累积和。

  • 我有两个不同长度的向量,每个向量包含0到50之间的数字。有些数字在向量中不包含,其他数字可能出现多次。 我想画一条线,显示每个数字在每个向量中包含的频率,即数字的频率。 如果我将中断设置为每个可能的数字之间,我可以绘制显示频率的直方图: 我知道有一个经验累积分布函数(),它会形成一个S形;但我想要的是一个非累积的经验分布函数,它将导致类似阶梯形钟形曲线的结果,类似于直方图的轮廓。 我能得到的最接近

  • 问题内容: 我正在尝试为dellstore2数据库累计计算用户数。在这里和其他论坛上寻找答案时,我使用了这个 这返回 每个月是 看一下前几项,似乎总的来说还不错。但是当我跑步时 对于整个事情,我明白了 这与第一个输出11,681中的最后一项不一致。我猜上面的计算无法确定整个月的唯一性。什么是最快的计算方式(最好不使用自联接)? 问题答案: 除了直接从订单中选择之外,还可以使用如下子查询: 我认为这

  • 问题内容: 我有一个看起来像这样的表: 我想添加一个新的列,称为cumulative_sum,因此表如下所示: 是否有可以轻松完成此操作的MySQL更新语句?做到这一点的最佳方法是什么? 问题答案: 如果性能是一个问题,则可以使用MySQL变量: 或者,您可以删除该列并在每个查询中对其进行计算: 这以运行方式计算运行总和:)

  • 问题内容: 在numpy或scipy(或其他库)中是否有一个函数将cumsum和cumprod的概念推广为任意函数。例如,考虑(理论上的)函数 func是一个接受两个浮点数并返回一个浮点数的函数。特殊情况 和 分别是cumsum和cumprod。例如,如果 我将其应用于: 我想要 问题答案: NumPy的ufunc有: 不幸的是,呼吁在“编Python函数失败,一个奇怪的错误: 这是将NumPy

  • 我尝试过用这个方法来计算累积值,但是如果日期字段与累积字段中的值相同,那么有人能提出类似于这个问题的解决方案吗