当前位置: 首页 > 知识库问答 >
问题:

如何在Spark中对两个列数不同的数据流执行联合?

岳京
2023-03-14

我有2数据帧%s:

我需要这样的联合:

unionall函数不起作用,因为列的编号和名称不同。

我怎么能这么做?

共有1个答案

白嘉石
2023-03-14

在Scala中,您只需将所有缺少的列追加为nulls

scala prettyprint-override">import org.apache.spark.sql.functions._

// let df1 and df2 the Dataframes to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50|       2|     null|   null|
| 34|       4|     null|   null|
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

两个时态Dataframes将具有相同的列顺序,因为在这两种情况下,我们都是通过total进行映射。

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50|       2|     null|  null|
| 34|       4|     null|  null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+
 类似资料:
  • 我在数据帧df1中有一组列(col1,col2,col3)我在数据帧df2中有另一组列(col4,col5,col6)假设这两个数据帧具有相同的行数。 如何生成在df1和df2之间进行成对相关的相关表? 这张桌子看起来像 我使用,它似乎没有按要求生成表。 我已经看到了如何检查两个数据集的匹配列之间的相关性的答案?,但主要的区别在于col名称不匹配。

  • 我试图使用流重构一些不那么优雅的代码。我有一个包含字符串和MyObject的HashMap,目前正在使用for循环对其进行迭代,如下所示: 因为我只关心id,所以我首先使用map操作只获取id,然后使用filter操作来消除空的id。 下一部分是我遇到的问题。我尝试的第一件事是使用Collectors groupingBy操作,这样我可以根据id的第一个字符对项目进行分组,结果是: 此链接有助于使

  • 输入数据集2 预期输出为 `

  • 我有两个表:p. test和q。在两个不同的数据库上测试-p和q。 这两个表都有不同的MYSQL连接,并且位于两个不同的服务器上。 我需要做一个自然连接,如下所示:http://www.microshell.com/database/sql/comparing-data-from-2-database-tables/2/ 我怎么可能做到呢?我还使用java PreparedStatement来实际

  • 问题内容: 我有两个要连接的表。 TABLE_A: TABLE_B: RESULT_TABLE: 我试图使用这样的东西: 但是,结果仅包括该列存在于TABLE_A中的行。 有没有办法连接TABLE_A和TABLE_B以产生RESULT_TABLE中显示的结果? 问题答案: 如果要获得所有结果,则需要一个 外部 联接,而不是一个 内部 联接。(内部仅返回匹配的行;外部返回所有行,其中匹配的行“缝合在

  • 在scala spark中连接不同数据帧时动态选择多列 从上面的链接,我能够让连接表达式工作,但如果列名不同,我们不能使用Seq(columns)而需要动态地连接它。这里的left_ds和right_ds是我想加入的数据流。下面我想要连接列id=acc_id和“acc_no=number”