当前位置: 首页 > 知识库问答 >
问题:

Spark SQL:如何将新行追加到dataframe表(来自另一个表)

康文昌
2023-03-14

我在数据帧中使用SparkSQL。我有一个输入数据帧,我想将其行附加(或插入)到具有更多列的较大数据帧中。我该怎么做?

如果这是SQL,我会使用INSERT-INTO-OUTPUT-SELECT。。。从输入,但我不知道如何使用Spark SQL实现这一点。

对于具体性:

var input = sqlContext.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

var output = sqlContext.createDataFrame(Seq(
        (0L, "Jack Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")


scala> input.show()
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
| 10|    Joe Doe| 34|
| 11|   Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+

scala> input.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)


scala> output.show()
+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
+---+-----------+---+----+----------+

scala> output.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- init: string (nullable = true)
 |-- ts: long (nullable = false)

我想把输入的所有行附加到输出的末尾。同时,我想将initoutput列设置为空字符串'ts列设置为当前时间戳,例如1461883875L。

任何帮助都将不胜感激。

共有2个答案

丁高峯
2023-03-14

我有一个类似的问题匹配到你的SQL-问题:

我想将一个数据帧附加到一个现有的hive表中,它也更大(更多列)。为了保留您的示例:输出是我现有的表,输入可以是数据帧。我的解决方案只使用SQL为了完整起见,我想提供它:

import org.apache.spark.sql.SaveMode

var input = spark.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

//--> just for a running example: In my case the table already exists
var output = spark.createDataFrame(Seq(
        (0L, "Jack Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")

output.write.mode(SaveMode.Overwrite).saveAsTable("appendTest");
//<--

input.createOrReplaceTempView("inputTable");

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, null, null FROM inputTable");
val df = spark.sql("SELECT * FROM appendTest")
df.show()

哪些输出:

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|null|      null|
| 11|   Jane Doe| 31|null|      null|
| 10|    Joe Doe| 34|null|      null|
+---+-----------+---+----+----------+

如果您可能有问题,不知道缺少多少字段,可以使用difflike

val missingFields = output.schema.toSet.diff(input.schema.toSet)

然后(在错误的伪代码中)

val sqlQuery = "INSERT INTO TABLE appendTest SELECT " + commaSeparatedColumnNames + commaSeparatedNullsForEachMissingField + " FROM inputTable"

希望能帮助未来有类似问题的人!

注意:在您的特殊情况下(init的当前时间戳空字段),您甚至可以使用

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, '' as init, current_timestamp as ts FROM inputTable");

这导致了

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|    |1521128513|
| 11|   Jane Doe| 31|    |1521128513|
| 10|    Joe Doe| 34|    |1521128513|
+---+-----------+---+----+----------+
阎淮晨
2023-03-14

SparkDataFrames是不可变的,因此不可能追加/插入行。相反,您可以添加缺少的列并使用UNION ALL

output.unionAll(input.select($"*", lit(""), current_timestamp.cast("long")))
 类似资料:
  • 问题内容: 我有看起来像这样的数据: 流程将定期运行并为每个实体评分。该过程将生成数据并将其添加到得分表中,如下所示: 我希望能够为每个实体选择所有实体以及最新记录的分数,从而得到如下所示的一些数据: 我可以使用以下查询获取单个实体的数据: 但是我不知道如何为所有实体选择相同的内容。也许它正盯着我看? 非常感谢您抽出宝贵的时间。 感谢您的好评。我将花几天时间查看首选解决方案是否冒泡,然后选择答案。

  • 问题内容: 我有2张桌子: 表-列,, 表列, 我必须更新的列与表的的与顶部1,其中表匹配。该表具有不同资产的相同名称。 表格示例: 我怎样才能做到这一点?我尝试使用其他查询,但无法获取。 问题答案:

  • 我正在使用齐柏林飞艇0.6.2和火花2.0。 我尝试在循环中执行查询,但效果不是很好。 我需要循环一个数据帧的每一行,大约5000行,并执行一个查询,这将在另一个数据帧中增加一个值。 以下是我的尝试: 我试着从两个数据帧中提取一小部分,但仍然很慢。我觉得我做得不对。 知道如何快速更新数据帧吗?

  • 我有一个for循环,它遍历CSV中的每一行,我创建了一个包含列表的字典,但是列表被重写了,因为字典键被重复了几次。我如何总结或追加到列表中的第二(1)位置相同的键的下一个值下一个循环迭代? 因为使用append时,如果再次找到现有键,则会覆盖该值,因此该键的值会被一次又一次地覆盖。 CSV中的字段包括: 输出应该是这样的:

  • 我有一个带有数值的数据框。添加表示每列总和的行(具有给定索引值)的最简单方法是什么?

  • 问题内容: 我正在编写一个包含10个存储桶列表的简单哈希表。使用内置函数计算索引,然后对表大小取模。但是,当我尝试将对象附加到该索引的存储桶列表时,它会附加到每个存储桶列表。我尝试用不同的方式定义add_HT,但我一直得到相同的结果。我究竟做错了什么? 问题答案: 使指向 同一列表 的指针数量增加。这里不是问题。您需要定义为。