当前位置: 首页 > 知识库问答 >
问题:

数据帧Spark的优化查询

彭骏
2023-03-14

我尝试从配置单元表创建数据帧。但我在Spark API方面做得很差。

我需要帮助来优化方法getLastSession中的查询,为spark将两个任务合并为一个任务:

val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path      = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
val df        = spark.read.parquet(path)


def getLastSession: Dataset[Row] = {
  val lastTime        = df.select(max(col("time_write"))).collect()(0)(0).toString
  val lastSession     = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
  val dfByLastSession = df.filter(col("id_session") === lastSession)

  dfByLastSession.show()
  /*
  +----------+----------------+------------------+-------+
  |id_session|      time_write|               key|  value|
  +----------+----------------+------------------+-------+
  |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|

  */
  dfByLastSession
}

PS.我的源表(例如):

共有1个答案

司寇阳曦
2023-03-14

您可以使用row_number与Windows类似:

import org.apache.spark.sql.expressions.Window

val dfByLastSession = df.withColumn(
  "rn", 
  row_number().over(Window.orderBy(desc("time_write")))
).filter("rn=1").drop("rn")
    
dfByLastSession.show()

然而,由于不按任何字段进行分区,可能会降低性能。

您可以在代码中更改的另一件事是使用结构排序来获取与最近的time_write关联的id_session,只需一个查询:

val lastSession = df.select(max(struct(col("time_write"), col("id_session")))("id_session")).first.getString(0)

val dfByLastSession = df.filter(col("id_session") === lastSession)
 类似资料:
  • 如何规范化主要由嵌套数组组成的 spark 数据帧? 我想要类似emple的东西,它将保留(id,foo,bar,baz)的模式,但为数组的每个值返回一个单独的记录。最终结果不应再包含数组。 Foo和baz是相关的。它们的顺序不得扭曲。它们总是具有相同的长度,foo的第一个值与baz的第一个数值相关,以此类推。也许我应该先将它们组合成一个列/结构? 最终结果应如下所示: 部分相关问题-爆炸(转置?

  • 如何查询具有复杂类型(如映射/数组)的RDD?例如,当我编写此测试代码时: 我认为语法应该是这样的: 或 但我明白了 无法访问类型MapType(StringType,StringType,true)中的嵌套字段 和 组织。阿帕奇。火花sql。催化剂错误。包$TreeNodeException:未解析的属性 分别地

  • 我正在使用数据帧读取。拼花地板文件,但不是将它们转换为rdd来进行我的正常处理,我想对它们进行处理。 所以我有我的文件: 即使从数据帧转换为RDD,我也会收到以下错误: :26:错误:值zipWithIndex不是组织的成员。阿帕奇。火花sql。一行 任何人都知道如何做我正在尝试做的事情,本质上是尝试获取值和列索引。 我在想这样的事情: 但最后一部分被卡住了,因为不知道如何做zipWithInde

  • 有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。

  • 我想了解如何在新的< code>DataFrame api中使用< code>CaseWhen表达式。 我在文档中看不到任何对它的引用,我唯一看到它的地方是在代码中:https://github . com/Apache/spark/blob/v 1 . 4 . 0/SQL/catalyst/src/main/Scala/org/Apache/spark/SQL/catalyst/expressi

  • 我试图用复杂类型查询spark sql数据帧,其中函数本身应该能够创建和表达式来为嵌套的复杂数据类型生成列数据帧。说 引用自查询SparkSQL具有复杂类型的DataFrame 用于提取地图类型查询可以是 现在如果我有 代替Map[String,String],如何创建一个udf,在数组的情况下接受名称或索引,并为复杂数据类型中的嵌套元素生成结果。假设现在我想查询a_map_new中包含的< co