当前位置: 首页 > 知识库问答 >
问题:

在Spark Java API中连接行数据集

卫昊东
2023-03-14

我想要两个连接两个数据集DS1和DS2以获得DS3

+---------+--------------------+-----------+------------+
|Compte   |         Lib        |ReportDebit|ReportCredit|
+---------+--------------------+-----------+------------+
|   447105|Autres impôts, ta...|    77171.0|         0.0|
|   753000|Jetons de présenc...|     6839.0|         0.0|
|   511107|Valeurs à l’encai...|        0.0|     77171.0|
+---------+--------------------+-----------+------------+
+---------+------------+
|Compte   |SoldeBalance|
+---------+------------+
| 447105  |      992.13|
| 111111  |     35065.0|
+---------+--------------------+-----------+------------+------------+
|Compte   |           CompteLib|ReportDebit|ReportCredit|SoldeBalance|
+---------+--------------------+-----------+------------+------------+
|   447105|Autres impôts, ta...|    77171.0|         0.0|      992.13|
|   753000|Jetons de présenc...|    6839.0 |         0.0|         0.0|
|   511107|Valeurs à l’encai...|        0.0|     77171.0|         0.0|
    111111|                    |        0.0|         0.0|     35065.0|
+---------+--------------------+-----------+------------+------------+

共有1个答案

吴胜
2023-03-14

您可以通过应用完整的外部联接,然后用所需的值替换空值来实现这一点。

import static org.apache.spark.sql.functions.*;

...

ds1.join(ds2, ds1.col("Compte").equalTo(ds2.col("Compte")), "full_outer")
                .select(ds1.col("Compte").alias("Compte1"),
                        ds2.col("Compte").alias("Compte2"),
                        ds1.col("Lib"),
                        ds1.col("ReportDebit"),
                        ds1.col("ReportCredit"),
                        ds2.col("SoldeBalance"))
                .withColumn("Compte", when(col("Compte1").isNull(), col("Compte2")).otherwise(col("Compte1")))
                .drop("Compte1", "Compte2")
                .na().fill(0.0, new String[] { "ReportDebit", "ReportCredit", "SoldeBalance" })
                .na().fill("", new String[] { "Lib" })
                .show();

输出:

+--------------------+-----------+------------+------------+------+
|                 Lib|ReportDebit|ReportCredit|SoldeBalance|Compte|
+--------------------+-----------+------------+------------+------+
|Valeurs à l’encai...|        0.0|     77171.0|         0.0|511107|
|Autres impôts, ta...|    77171.0|         0.0|      992.13|447105|
|                    |        0.0|         0.0|     35065.0|111111|
|Jetons de présenc...|     6839.0|         0.0|         0.0|753000|
+--------------------+-----------+------------+------------+------+
 类似资料:
  • 我需要将两个数据帧和一个接一个地连接起来,它们具有相同的行数(),而不考虑任何键。此函数类似于

  • 我在计算引擎中创建了一个实例。 我安装了postgresql 9.6,现在我正在尝试使用pgadmin客户端连接外部ip,但它给了我一个错误 连接到35.224.170.161:5432被拒绝。检查主机名和端口是否正确,邮政局长是否接受TCP/IP连接。连接到35.224.170.161:5432被拒绝。检查主机名和端口是否正确,邮政局长是否接受TCP/IP连接。连接被拒绝:连接被拒绝:连接 我只

  • 问题:当我的spring应用程序运行时,同时数据库服务器停止/重新启动,然后db连接丢失并且从未恢复。 com.mysql.jdbc.exceptions.jdbc4.mysqlnontransientConnectionException:连接关闭后不允许任何操作。 服务mysql启动 问题:如何告诉spring在连接丢失后自动重新连接? 这是我的配置:

  • 数据连接是任何数据分析的常见要求。可能需要在单个源中连接来自不同表的数据,或者从多个源连接数据。 Tableau提供了使用“Data”菜单中提供的数据窗格来连接表的功能。 连接意味着组合关系数据库中一个或多个表的列。它还创建一个可以保存为表的集合,或者可以按原样使用它。 连接指定有五种类型: 交叉连接。 内部连接。 自然连接。 外连接。 左外连接。 右外连接。 完全外连接。 自连接。 1. 连接类

  • 连接数据库 在能够对MongDB进行操作之前,需要使用BuguFramework创建一个数据库连接,代码如下: BuguConnection conn = BuguFramework.getInstance().createConnection(); conn.connect("192.168.0.100", 27017, "mydb", "username", "password"); 也可以

  • 一、全局配置定义 return array( 'DB_TYPE' => 'mysql', 'DB_HOST' => '127.0.0.1', 'DB_NAME' => 'thinkcmf', 'DB_USER' => 'root', 'DB_PWD' => 'root', 'DB_PORT' => '3306', 'DB_PREFIX' =>