当前位置: 首页 > 知识库问答 >
问题:

如何在spark中连接多列,同时从另一个表获取要连接的列名(每行不同)

昝唯
2023-03-14
table - **t**
+---+----+  
| id|name|
+---+----+  
|  1|   a|  
|  2|   b|
+---+----+
table - **r**
+---+-------+
| id|   att |
+---+-------+
|  1|id,name|
|  2|   id  |
+---+-------+

如果我将这两个表连接起来,并执行如下操作,我可以concat但不是基于表r(因为新列的第一行是1,但第二行应该只有2)

t.withColumn("new",concat_ws(",",t.select("att").first.mkString.split(",").map(c => col(c)): _*)).show
+---+----+-------+---+
| id|name|  att  |new|
+---+----+-------+---+
|  1|   a|id,name|1,a|
|  2|   b|  id   |2,b|
+---+----+-------+---+

我必须在上面的查询中的select之前应用filter,但我不确定如何在withColumn中为每一行执行此操作。

下面的东西,如果可能的话。

t.withColumn("new",concat_ws(",",t.**filter**("id="+this.id).select("att").first.mkString.split(",").map(c => col(c)): _*)).show
scala> t.filter("id=1").select("att").first.mkString.split(",").map(c => col(c))
res90: Array[org.apache.spark.sql.Column] = Array(id, name)

scala> t.filter("id=2").select("att").first.mkString.split(",").map(c => col(c))
res89: Array[org.apache.spark.sql.Column] = Array(id)
+---+----+-------+---+
| id|name|  att  |new|
+---+----+-------+---+
|  1|   a|id,name|1,a|
|  2|   b|  id   |2  |
+---+----+-------+---+

共有1个答案

汪信鸥
2023-03-14

我们可以使用UDF

此逻辑工作的要求。

表t的列名应该与表r的col att中的列名的顺序相同

scala> input_df_1.show
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

scala> input_df_2.show
+---+-------+
| id|    att|
+---+-------+
|  1|id,name|
|  2|     id|
+---+-------+

scala> val join_df = input_df_1.join(input_df_2,Seq("id"),"inner")
join_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> val req_cols = input_df_1.columns
req_cols: Array[String] = Array(id, name)

scala> def new_col_udf = udf((cols : Seq[String],row : String,attr : String) => {
     |     val row_values = row.split(",")
     |     val attrs = attr.split(",")
     |     val req_val = attrs.map{at =>
     |     val index = cols.indexOf(at)
     |     row_values(index)
     |     }
     |     req_val.mkString(",")
     |     })
new_col_udf: org.apache.spark.sql.expressions.UserDefinedFunction

scala>  val intermediate_df = join_df.withColumn("concat_column",concat_ws(",",'id,'name)).withColumn("new_col",new_col_udf(lit(req_cols),'concat_column,'att))
intermediate_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val result_df = intermediate_df.select('id,'name,'att,'new_col)
result_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> result_df.show
+---+----+-------+-------+
| id|name|    att|new_col|
+---+----+-------+-------+
|  1|   a|id,name|    1,a|
|  2|   b|     id|      2|
+---+----+-------+-------+
 类似资料:
  • 有没有办法通过两个列表< code>join两个< code > Spark data frame 具有不同的列名? 我知道如果他们在列表中有相同的名字,我可以做以下事情: 或者,如果我知道不同的列名,我可以这样做: 由于我的方法需要2个列表的输入,这些列表指定哪些列将用于每个DF的,因此我想知道Scala Spark是否有办法做到这一点? 页(page的缩写)我在寻找类似python的东西< c

  • 问题内容: 我的MySQL数据库中有这些表: 通用表: Facebook表: 首席表: 基本上,常规表包含一些( 显然 )常规数据。基于generalTable.scenario,您可以在其他两个表中查找更多详细信息,这些表在某些熟悉的列中(例如,expiresAt),而在其他一些列中则不然。 我的问题是,如何仅通过一个查询就可以获取generalTable和正确的明细表的联接数据。 所以,我想这

  • 我有三张桌子;1.学生-id,name 2.主题-sid,sname 3.结果-id,sid,marks(id和sid是上面两个表引用的外键) 现在,我执行 我得到了最高分的科目名称。现在,我还想要获得这些最高分的学生姓名。 所以我尝试添加列 r.id,不起作用。我尝试在此查询中添加表学生。我可能会在添加表格或其他东西后对分组进行恶作剧? 这是我干的 我得到了每个学生的ID,有重复的科目和分数。而

  • 我有这样一个简单的数据框架: 结果是这样的 当他只有一个人,而且分数是75分,而不是40分和35分时,我如何将名为“Adi”的行与“Adi”的行组合在同一列中

  • 首先,我有这个“用户”模型 而这种“用户详细信息”模型 我试图从user_details模型中只获取特定的列(user_details[外键=user_id]模型与用户[主键/引用键=user_id]模型有关系) 但不幸的是,不幸的是,没有工作,我得到了这个错误(参考下面) 查询onnection.php第624行中的异常: SQLSTATE[42S22]:找不到列: 1054字段列表中的未知列u

  • 我今天脑子很慢... 我有一张像这样的表格 WP_PostMeta 我想这样显示此列: 我已使用此查询进行了测试 但我错了; 任何曲目都很受欢迎。