当前位置: 首页 > 知识库问答 >
问题:

在Scala Spark中连接不同数据帧时动态选择多列

钱劲
2023-03-14

我有两个火花数据帧DF1DF2。在连接这两个数据流的同时,是否有一种方法可以动态地选择输出列?在内部连接的情况下,下面的定义输出来自df1和df2的所有列。

def joinDF (df1: DataFrame,  df2: DataFrame , joinExprs: Column, joinType: String): DataFrame = {   
  val dfJoinResult = df1.join(df2, joinExprs, joinType)
  dfJoinResult
  //.select()
}
val df1 = List(("1","new","current"), ("2","closed","saving"), ("3","blocked","credit")).toDF("id","type","account")
val df2 = List(("1","7"), ("2","5"), ("5","8")).toDF("id","value")
val dfJoinResult = df1
  .join(df2, df1("id") === df2("id"), "inner")
  .select(df1("type"), df1("account"), df2("value")) 

DfJoinResult.Schema():

StructType(StructField(type,StringType,true), 
StructField(account,StringType,true), 
StructField(value,StringType,true))

我查看了df.select(cols.head,cols.tail:_*)等选项,但它不允许从两个DF中选择列。有没有一种方法可以动态地传递selectexpr列以及我们想要在我的def中从中选择的数据帧详细信息?我使用的是Spark2.2.0。

共有1个答案

蒋星雨
2023-03-14

可以将select表达式作为seq[Column]传递给方法:

def joinDF(df1: DataFrame,  df2: DataFrame , joinExpr: Column, joinType: String, selectExpr: Seq[Column]): DataFrame = {   
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr:_*)
}

若要调用该方法,请使用:

val joinExpr = df1.col("id") === df2.col("id")
val selectExpr = Seq(df1.col("type"), df1.col("account"), df2.col("value"))

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

这将产生预期的结果:

+------+-------+-----+
|  type|account|value|
+------+-------+-----+
|   new|current|    7|
|closed| saving|    5|
+------+-------+-----+
    null
def joinDF(df1: DataFrame,  df2: DataFrame , joinExpr: Seq[String], joinType: String, selectExpr: Seq[String]): DataFrame = {   
  val dfJoinResult = df1.join(df2, joinExpr, joinType)
  dfJoinResult.select(selectExpr.head, selectExpr.tail:_*)
}

调用该方法现在看起来更清晰:

val joinExpr = Seq("id")
val selectExpr = Seq("type", "account", "value")

val testDf = joinDF(df1, df2, joinExpr, "inner", selectExpr)

注意:当使用seq[String]执行join时,与使用表达式相比,结果数据帧的列名将有所不同。当存在具有相同名称的列时,将无法在之后单独选择这些列。

 类似资料:
  • 我正在构建一个需要连接2个数据库的应用程序。第一个是静态的,另一个是动态的。config/database.php类似于 模型代码就像 如果我提供静态连接详细信息,我可以连接多个数据库,但如果我提供动态连接详细信息,如 或 动态连接多个数据库而不影响应用程序性能的最佳方法是什么?

  • 有很多关于这个问题的帖子,但没有一个回答我的问题。 在尝试将许多不同的数据帧连接在一起时,我在PySpark中遇到了<code>OutOfMemoryError 我的本地机器有16GB内存,我已将Spark配置设置为: 关于Spark中OOM错误的SO帖子显然很多很多,但基本上大多数都是说增加你的内存属性。 我实际上是对50-60个较小的数据帧执行连接,这些数据帧有两列< code>uid和< c

  • 问题内容: 是否可以对位于同一服务器上的不同数据库进行选择(或插入)语句?如果是,怎么办? 问题答案: 您将使用以下语法指定数据库 例:

  • 我在mysql数据库中有名为的表,其中存在类、学生姓名等。 我想在jsp中使用select选项,这样当且仅当首先选择class时,在选择class之后,该特定类的所有学生姓名都应该通过从数据库中检索记录自动(动态)显示在另一个select下拉列表中。 在这里我想使用servlet进行数据库连接,并通过通过jsp访问所有数据库记录

  • 我想做的是: 使用两个数据帧 和 各自的 列和连接它们。我想从中选择所有列,从中选择两个特定列 我尝试了类似于我在下面用不同引号写的东西,但仍然不起作用。我觉得在pyspark中,应该有一种简单的方法来做到这一点。 我知道你可以写作 这样做,但我想更像上面的伪代码。