当前位置: 首页 > 知识库问答 >
问题:

Spark:数据帧聚合(Scala)

澹台衡
2023-03-14
val data1 = Seq(
    ("1","111",200,"221",100,"331",1000),
    ("2","112",400,"222",500,"332",1000),
    ("3","113",600,"223",1000,"333",1000)
).toDF("id1","t1","val1","t2","val2","t3","val3")

data1.show()

+---+---+----+---+----+---+----+
|id1| t1|val1| t2|val2| t3|val3|
+---+---+----+---+----+---+----+
|  1|111| 200|221| 100|331|1000|
|  2|112| 400|222| 500|332|1000|
|  3|113| 600|223|1000|333|1000|
+---+---+----+---+----+---+----+    
val data2 = Seq(("1","111",200),("1","221",100),("1","331",1000),
  ("2","112",400),("2","222",500),("2","332",1000),
  ("3","113",600),("3","223",1000), ("3","333",1000)
).toDF("id*","t*","val*")

data2.show()    

+---+---+----+
|id*| t*|val*|
+---+---+----+
|  1|111| 200|
|  1|221| 100|
|  1|331|1000|
|  2|112| 400|
|  2|222| 500|
|  2|332|1000|
|  3|113| 600|
|  3|223|1000|
|  3|333|1000|
+---+---+----+      
My output should look like below:
+---+---+--------+---+---------+
|id1| t |sum(val)| t*|sum(val*)|
+---+---+--------+---+---------+
|  1|111|     200|111|      200|
|  1|221|     100|221|      100|
|  1|331|    1000|331|     1000|
|  2|112|     400|112|      400|
|  2|222|     500|222|      500|
|  2|332|    1000|332|     1000|
|  3|113|     600|113|      600|
|  3|223|    1000|223|     1000|
|  3|333|    1000|333|     1000|
+---+---+--------+---+---------+

我正在考虑将dataset1分解为每个“T”类型的多个记录,然后与DataSet2连接。但是你能给我一个更好的方法,如果数据集变大了,它不会影响性能吗?

共有1个答案

孟跃
2023-03-14

最简单的解决方案是进行子选择,然后联合数据集:

val ts = Seq(1, 2, 3)
val dfs = ts.map (t => data1.select("t" + t as "t", "v" + t as "v"))
val unioned = dfs.drop(1).foldLeft(dfs(0))((l, r) => l.union(r))

val ds = unioned.join(df2, 't === col("t*")
here aggregation

你也可以用爆炸来尝试数组:

val df1 = data1.withColumn("colList", array('t1, 't2, 't3))
               .withColumn("t", explode(colList))
               .select('t, 'id1 as "id")

val ds = df2.withColumn("val", 
          when('t === 't1, 'val1)
          .when('t === 't2, 'val2)
          .when('t === 't3, 'val3)
          .otherwise(0))

最后一步是将此Dataset与Data2连接起来:

ds.join(data2, 't === col("t*"))
  .groupBy("t", "t*")
  .agg(first("id1") as "id1", sum(val), sum("val*"))
 类似资料:
  • 我正在使用数据帧读取。拼花地板文件,但不是将它们转换为rdd来进行我的正常处理,我想对它们进行处理。 所以我有我的文件: 即使从数据帧转换为RDD,我也会收到以下错误: :26:错误:值zipWithIndex不是组织的成员。阿帕奇。火花sql。一行 任何人都知道如何做我正在尝试做的事情,本质上是尝试获取值和列索引。 我在想这样的事情: 但最后一部分被卡住了,因为不知道如何做zipWithInde

  • 我正在尝试通过ID和日期聚合数据帧。假设我有一个DataFrame: 我想通过ID和日期(频率=1W)聚合该值,并得到一个dataframe如下所示: 我理解它可以通过迭代ID并使用grouper聚合价格来实现。有没有更有效的方法不迭代IDS?多谢。

  • 我正在用PySpark DataFrames分析一些数据。假设我有一个正在聚合的数据帧< code>df: 这将给我: 聚合工作得很好,但我不喜欢新的列名。有没有办法将此列重命名为人类可以从方法中读取的内容?也许更类似于中的操作:

  • 我想了解如何在新的< code>DataFrame api中使用< code>CaseWhen表达式。 我在文档中看不到任何对它的引用,我唯一看到它的地方是在代码中:https://github . com/Apache/spark/blob/v 1 . 4 . 0/SQL/catalyst/src/main/Scala/org/Apache/spark/SQL/catalyst/expressi

  • null null 为什么要使用UDF/UADF而不是map(假设map保留在数据集表示中)?

  • 我需要聚合一个基于1分钟时间间隔的数据集。当我尝试此操作时,它会抛出错误: 我的数据集如下所示 org.apache.spark.sql.AnalysisException:无法解析(datetime,value)中的列名“60秒”;在org.apache.spark.sql.dataset$$anonfun$resolve$1.apply(dataset.scala:216)在org.apach