我在数据帧中使用SparkSQL。我有一个输入数据帧,我想将其行附加(或插入)到具有更多列的较大数据帧中。我该怎么做?
如果这是SQL,我会使用INSERT-INTO-OUTPUT-SELECT。。。从输入
,但我不知道如何使用Spark SQL实现这一点。
对于具体性:
var input = sqlContext.createDataFrame(Seq(
(10L, "Joe Doe", 34),
(11L, "Jane Doe", 31),
(12L, "Alice Jones", 25)
)).toDF("id", "name", "age")
var output = sqlContext.createDataFrame(Seq(
(0L, "Jack Smith", 41, "yes", 1459204800L),
(1L, "Jane Jones", 22, "no", 1459294200L),
(2L, "Alice Smith", 31, "", 1459595700L)
)).toDF("id", "name", "age", "init", "ts")
scala> input.show()
+---+-----------+---+
| id| name|age|
+---+-----------+---+
| 10| Joe Doe| 34|
| 11| Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+
scala> input.printSchema()
root
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
scala> output.show()
+---+-----------+---+----+----------+
| id| name|age|init| ts|
+---+-----------+---+----+----------+
| 0| Jack Smith| 41| yes|1459204800|
| 1| Jane Jones| 22| no|1459294200|
| 2|Alice Smith| 31| |1459595700|
+---+-----------+---+----+----------+
scala> output.printSchema()
root
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- init: string (nullable = true)
|-- ts: long (nullable = false)
我想把输入
的所有行附加到输出
的末尾。同时,我想将init
的output
列设置为空字符串'
,ts
列设置为当前时间戳,例如1461883875L。
任何帮助都将不胜感激。
我有一个类似的问题匹配到你的SQL-问题:
我想将一个数据帧附加到一个现有的hive表中,它也更大(更多列)。为了保留您的示例:输出是我现有的表,输入可以是数据帧。我的解决方案只使用SQL为了完整起见,我想提供它:
import org.apache.spark.sql.SaveMode
var input = spark.createDataFrame(Seq(
(10L, "Joe Doe", 34),
(11L, "Jane Doe", 31),
(12L, "Alice Jones", 25)
)).toDF("id", "name", "age")
//--> just for a running example: In my case the table already exists
var output = spark.createDataFrame(Seq(
(0L, "Jack Smith", 41, "yes", 1459204800L),
(1L, "Jane Jones", 22, "no", 1459294200L),
(2L, "Alice Smith", 31, "", 1459595700L)
)).toDF("id", "name", "age", "init", "ts")
output.write.mode(SaveMode.Overwrite).saveAsTable("appendTest");
//<--
input.createOrReplaceTempView("inputTable");
spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, null, null FROM inputTable");
val df = spark.sql("SELECT * FROM appendTest")
df.show()
哪些输出:
+---+-----------+---+----+----------+
| id| name|age|init| ts|
+---+-----------+---+----+----------+
| 0| Jack Smith| 41| yes|1459204800|
| 1| Jane Jones| 22| no|1459294200|
| 2|Alice Smith| 31| |1459595700|
| 12|Alice Jones| 25|null| null|
| 11| Jane Doe| 31|null| null|
| 10| Joe Doe| 34|null| null|
+---+-----------+---+----+----------+
如果您可能有问题,不知道缺少多少字段,可以使用diff
like
val missingFields = output.schema.toSet.diff(input.schema.toSet)
然后(在错误的伪代码中)
val sqlQuery = "INSERT INTO TABLE appendTest SELECT " + commaSeparatedColumnNames + commaSeparatedNullsForEachMissingField + " FROM inputTable"
希望能帮助未来有类似问题的人!
注意:在您的特殊情况下(init的当前时间戳空字段),您甚至可以使用
spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, '' as init, current_timestamp as ts FROM inputTable");
这导致了
+---+-----------+---+----+----------+
| id| name|age|init| ts|
+---+-----------+---+----+----------+
| 0| Jack Smith| 41| yes|1459204800|
| 1| Jane Jones| 22| no|1459294200|
| 2|Alice Smith| 31| |1459595700|
| 12|Alice Jones| 25| |1521128513|
| 11| Jane Doe| 31| |1521128513|
| 10| Joe Doe| 34| |1521128513|
+---+-----------+---+----+----------+
SparkDataFrames
是不可变的,因此不可能追加/插入行。相反,您可以添加缺少的列并使用UNION ALL
:
output.unionAll(input.select($"*", lit(""), current_timestamp.cast("long")))
问题内容: 我有看起来像这样的数据: 流程将定期运行并为每个实体评分。该过程将生成数据并将其添加到得分表中,如下所示: 我希望能够为每个实体选择所有实体以及最新记录的分数,从而得到如下所示的一些数据: 我可以使用以下查询获取单个实体的数据: 但是我不知道如何为所有实体选择相同的内容。也许它正盯着我看? 非常感谢您抽出宝贵的时间。 感谢您的好评。我将花几天时间查看首选解决方案是否冒泡,然后选择答案。
问题内容: 我有2张桌子: 表-列,, 表列, 我必须更新的列与表的的与顶部1,其中表匹配。该表具有不同资产的相同名称。 表格示例: 我怎样才能做到这一点?我尝试使用其他查询,但无法获取。 问题答案:
我正在使用齐柏林飞艇0.6.2和火花2.0。 我尝试在循环中执行查询,但效果不是很好。 我需要循环一个数据帧的每一行,大约5000行,并执行一个查询,这将在另一个数据帧中增加一个值。 以下是我的尝试: 我试着从两个数据帧中提取一小部分,但仍然很慢。我觉得我做得不对。 知道如何快速更新数据帧吗?
我有一个for循环,它遍历CSV中的每一行,我创建了一个包含列表的字典,但是列表被重写了,因为字典键被重复了几次。我如何总结或追加到列表中的第二(1)位置相同的键的下一个值下一个循环迭代? 因为使用append时,如果再次找到现有键,则会覆盖该值,因此该键的值会被一次又一次地覆盖。 CSV中的字段包括: 输出应该是这样的:
我有一个带有数值的数据框。添加表示每列总和的行(具有给定索引值)的最简单方法是什么?
问题内容: 我正在编写一个包含10个存储桶列表的简单哈希表。使用内置函数计算索引,然后对表大小取模。但是,当我尝试将对象附加到该索引的存储桶列表时,它会附加到每个存储桶列表。我尝试用不同的方式定义add_HT,但我一直得到相同的结果。我究竟做错了什么? 问题答案: 使指向 同一列表 的指针数量增加。这里不是问题。您需要定义为。