val data1 = Seq(
("1","111",200,"221",100,"331",1000),
("2","112",400,"222",500,"332",1000),
("3","113",600,"223",1000,"333",1000)
).toDF("id1","t1","val1","t2","val2","t3","val3")
data1.show()
+---+---+----+---+----+---+----+
|id1| t1|val1| t2|val2| t3|val3|
+---+---+----+---+----+---+----+
| 1|111| 200|221| 100|331|1000|
| 2|112| 400|222| 500|332|1000|
| 3|113| 600|223|1000|333|1000|
+---+---+----+---+----+---+----+
val data2 = Seq(("1","111",200),("1","221",100),("1","331",1000),
("2","112",400),("2","222",500),("2","332",1000),
("3","113",600),("3","223",1000), ("3","333",1000)
).toDF("id*","t*","val*")
data2.show()
+---+---+----+
|id*| t*|val*|
+---+---+----+
| 1|111| 200|
| 1|221| 100|
| 1|331|1000|
| 2|112| 400|
| 2|222| 500|
| 2|332|1000|
| 3|113| 600|
| 3|223|1000|
| 3|333|1000|
+---+---+----+
My output should look like below:
+---+---+--------+---+---------+
|id1| t |sum(val)| t*|sum(val*)|
+---+---+--------+---+---------+
| 1|111| 200|111| 200|
| 1|221| 100|221| 100|
| 1|331| 1000|331| 1000|
| 2|112| 400|112| 400|
| 2|222| 500|222| 500|
| 2|332| 1000|332| 1000|
| 3|113| 600|113| 600|
| 3|223| 1000|223| 1000|
| 3|333| 1000|333| 1000|
+---+---+--------+---+---------+
我正在考虑将dataset1分解为每个“T”类型的多个记录,然后与DataSet2连接。但是你能给我一个更好的方法,如果数据集变大了,它不会影响性能吗?
最简单的解决方案是进行子选择,然后联合数据集:
val ts = Seq(1, 2, 3)
val dfs = ts.map (t => data1.select("t" + t as "t", "v" + t as "v"))
val unioned = dfs.drop(1).foldLeft(dfs(0))((l, r) => l.union(r))
val ds = unioned.join(df2, 't === col("t*")
here aggregation
你也可以用爆炸来尝试数组:
val df1 = data1.withColumn("colList", array('t1, 't2, 't3))
.withColumn("t", explode(colList))
.select('t, 'id1 as "id")
val ds = df2.withColumn("val",
when('t === 't1, 'val1)
.when('t === 't2, 'val2)
.when('t === 't3, 'val3)
.otherwise(0))
最后一步是将此Dataset与Data2连接起来:
ds.join(data2, 't === col("t*"))
.groupBy("t", "t*")
.agg(first("id1") as "id1", sum(val), sum("val*"))
我正在使用数据帧读取。拼花地板文件,但不是将它们转换为rdd来进行我的正常处理,我想对它们进行处理。 所以我有我的文件: 即使从数据帧转换为RDD,我也会收到以下错误: :26:错误:值zipWithIndex不是组织的成员。阿帕奇。火花sql。一行 任何人都知道如何做我正在尝试做的事情,本质上是尝试获取值和列索引。 我在想这样的事情: 但最后一部分被卡住了,因为不知道如何做zipWithInde
我正在尝试通过ID和日期聚合数据帧。假设我有一个DataFrame: 我想通过ID和日期(频率=1W)聚合该值,并得到一个dataframe如下所示: 我理解它可以通过迭代ID并使用grouper聚合价格来实现。有没有更有效的方法不迭代IDS?多谢。
我正在用PySpark DataFrames分析一些数据。假设我有一个正在聚合的数据帧< code>df: 这将给我: 聚合工作得很好,但我不喜欢新的列名。有没有办法将此列重命名为人类可以从方法中读取的内容?也许更类似于中的操作:
我想了解如何在新的< code>DataFrame api中使用< code>CaseWhen表达式。 我在文档中看不到任何对它的引用,我唯一看到它的地方是在代码中:https://github . com/Apache/spark/blob/v 1 . 4 . 0/SQL/catalyst/src/main/Scala/org/Apache/spark/SQL/catalyst/expressi
我需要聚合一个基于1分钟时间间隔的数据集。当我尝试此操作时,它会抛出错误: 我的数据集如下所示 org.apache.spark.sql.AnalysisException:无法解析(datetime,value)中的列名“60秒”;在org.apache.spark.sql.dataset$$anonfun$resolve$1.apply(dataset.scala:216)在org.apach
一、简单聚合 1.1 数据准备 // 需要导入 spark sql 内置的函数包 import org.apache.spark.sql.functions._ val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() val empDF = spark.read.jso