当前位置: 首页 > 面试题库 >

SparkSQL:使用两列的条件总和

宋嘉禧
2023-03-14
问题内容

希望您能帮到我。我有一个DF,如下所示:

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-01", "2015-01-01", 100), 
  (2, "a", "2014-12-01", "2015-01-02", 150),
  (3, "a", "2014-12-01", "2015-01-03", 120), 
  (4, "b", "2015-12-15", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns")
.withColumn("dateTrans", to_date($"dateTrans"))

我很乐意做一个groupBy prodId并汇总“值”,以将其汇总为由“ dateIns”和“
dateTrans”列之间的差异所定义的日期范围。特别是,我希望有一种方法来定义一个条件总和,该总和将上述各列之间的预定义最大差之内的所有值相加。即从dateIns开始的10、20、30天之间发生的所有值(’dateTrans’-‘dateIns’<=
10、20、30)。

spark中是否有任何预定义的聚合函数可以进行条件求和?您是否建议开发aggr。UDF(如果有的话,有什么建议)?我正在使用pySpqrk,但也非常高兴获得Scala解决方案。非常感谢!


问题答案:

让您更有趣一点,以便窗口中有一些事件:

val df = sc.parallelize(Seq(
  (1, "a", "2014-12-30", "2015-01-01", 100), 
  (2, "a", "2014-12-21", "2015-01-02", 150),
  (3, "a", "2014-12-10", "2015-01-03", 120), 
  (4, "b", "2014-12-05", "2015-01-01", 100)
)).toDF("id", "prodId", "dateIns", "dateTrans", "value")
.withColumn("dateIns", to_date($"dateIns"))
.withColumn("dateTrans", to_date($"dateTrans"))

您所需要的或多或少是这样的:

import org.apache.spark.sql.functions.{col, datediff, lit, sum}

// Find difference in tens of days 
val diff = (datediff(col("dateTrans"), col("dateIns")) / 10)
  .cast("integer") * 10

val dfWithDiff = df.withColumn("diff", diff)

val aggregated = dfWithDiff 
  .where((col("diff") < 30) && (col("diff") >= 0))
  .groupBy(col("prodId"), col("diff"))
  .agg(sum(col("value")))

结果

aggregated.show
// +------+----+----------+
// |prodId|diff|sum(value)|
// +------+----+----------+
// |     a|  20|       120|
// |     b|  20|       100|
// |     a|   0|       100|
// |     a|  10|       150|
// +------+----+----------+

其中diff是范围(0-> [0,10),10-> [10,20),…)的下限。如果您删除val并调整了导入,这也将在PySpark中起作用。

编辑 (每列汇总):

val exprs = Seq(0, 10,  20).map(x => sum(
  when(col("diff") === lit(x), col("value"))
    .otherwise(lit(0)))
    .alias(x.toString))

dfWithDiff.groupBy(col("prodId")).agg(exprs.head, exprs.tail: _*).show

// +------+---+---+---+
// |prodId|  0| 10| 20|
// +------+---+---+---+
// |     a|100|150|120|
// |     b|  0|  0|100|
// +------+---+---+---+

与Python等效:

from pyspark.sql.functions import *

def make_col(x):
   cnd = when(col("diff") == lit(x), col("value")).otherwise(lit(0))
   return sum(cnd).alias(str(x))

exprs = [make_col(x) for x in range(0, 30, 10)]
dfWithDiff.groupBy(col("prodId")).agg(*exprs).show()

## +------+---+---+---+
## |prodId|  0| 10| 20|
## +------+---+---+---+
## |     a|100|150|120|
## |     b|  0|  0|100|
## +------+---+---+---+


 类似资料:
  • 问题内容: 我有两个这样的表,两个都是单独的表 另一个表包含以下结构 我需要从表II中选择AccountNo或TempAccountNo,Mycolumn,条件是 我需要选择 我需要选择 我该如何实现。 问题答案:

  • 我有两个mysql表: 我想从表:loan中获取“loan\u amount”的和,从表:advance中获取“advance\u amount”的和,在一个由内部联接连接的mysql查询中。 如何从两个表的列中求和?

  • 如果相同,下面是一个代码,用于添加金额,并根据创建一个新的

  • 我正在使用谷歌表单的过滤功能,但无法按我想要的方式使用,已经3天了。。。 基本上,我有第1页,有一列“电子邮件”和一列“潜在客户ID”。表2具有相同的“潜在客户ID”,但已过滤。含义,第1页,其“顺序为1,2,3,4,5…”。。。第二张不是,像是2,4,5,23,41。。。我想在表1中找到正确的电子邮件地址,该地址在两个表中具有相同的Lead ID。我使用了Filter函数,它工作得非常好,因为它

  • 我目前在R中有一个数据框,看起来像这样 我用group_by将数据按动物分组。我想创建一个新的列V6,它采用列V4,将较低的值除以较高的值,如果该值小于0.5,则V6= A,ifelse则V6 = b..有没有办法在R中使用带条件语句的mutate函数?实际的数据框要大得多,所以我宁愿不用手动操作。这是我希望最终数据框的样子 这就是我开始做的 但我知道这是不对的。非常感谢。

  • 问题内容: 我正在从AWK编程语言学习awk,但其中一个示例存在问题。 如果我想在$ 2等于一个值的情况下打印$ 3(例如),则我使用的是此命令,它可以正常工作: 但是,当我用另一个搜索条件(例如)代替1时,该命令不起作用: 它不返回任何输出,并且我确定输入文件中存在“ findtext”。 我也试过了,但是不起作用: 这是我的测试文件“ test”,它有9行和8个字段,以空格分隔: 这是我的工作