当前位置: 首页 > 知识库问答 >
问题:

通过提供表名而不是表路径,将spark数据帧写入现有增量表

蒙麒
2023-03-14

我正在尝试将火花数据帧写入现有的增量表中。

我有多个场景可以将数据保存到不同的表中,如下所示。

场景-01:

我有一个现有的delta表,我必须使用选项<code>mergeSchema</code>将数据帧写入该表,因为模式可能会随每次加载而改变。

我通过提供delta表路径对下面的命令执行相同的操作

finalDF01.write.format("delta").option("mergeSchema", "true").mode("append") \
  .partitionBy("part01","part02").save(finalDF01DestFolderPath)

我只想知道是否可以通过提供现有的增量表名而不是增量路径来实现这一点。

这已通过更新数据写入命令得到解决,如下所示。

finalDF01.write.format("delta").option("mergeSchema", "true").mode("append") \
  .partitionBy("part01","part02").saveAsTable(finalDF01DestTableName)
  1. 这是正确的路吗?

场景 02:

如果记录已经存在,我必须更新现有的表,如果不存在,则插入新记录。为此,我目前正在做如下所示。

spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") 
DeltaTable.forPath(DestFolderPath)
   .as("t")
   .merge(
      finalDataFrame.as("s"),
         "t.id = s.id AND t.name= s.name")
       .whenMatched().updateAll()
       .whenNotMatched().insertAll()
   .execute()

我尝试使用以下脚本

destMasterTable.as("t")
      .merge(
         vehMasterDf.as("s"),
          "t.id = s.id")
       .whenNotMatched().insertAll()
       .execute()

但得到以下错误(即使使用别名而不是 as)。

error: value as is not a member of String
    destMasterTable.as("t")

最好提供表名而不是表路径,以防我们以后更改表路径不会影响代码。我在databricks文档中没有看到任何地方提供表名以及< code>mergeSchema和< code>autoMerge。这样做可能吗?

共有1个答案

罗华翰
2023-03-14

要将现有数据用作表而不是路径,您需要从一开始就使用<code>saveAsTable</code>,或者使用SQL命令CREATE table using将现有数据注册到Hive元存储中,如下所示(语法可能略有不同,具体取决于您是在Databricks还是OSS Spark上运行,取决于Spark的版本):

CREATE TABLE IF NOT EXISTS my_table
USING delta
LOCATION 'path_to_existing_data'

之后,您可以使用saveAsTable

对于第二个问题 - 看起来德斯特主表只是一个字符串。要引用现有表,您需要使用来自增量表对象 (doc) 的函数 forName

DeltaTable.forName(destMasterTable)
  .as("t")
  ...
 类似资料:
  • 我需要检查工作表是否存在。如果存在,您必须键入下一个现有行,并且不要创建新的工作表。 您当前正在删除当前的电子表格,而我在电子表格中只写了一行。 我该如何解决这个问题?

  • 问题内容: 我正在尝试更改没有主键或auto_increment列的表。我知道如何添加主键列,但我想知道是否有可能自动将数据插入主键列(我已经在数据库中有500行,并希望为其提供ID,但我不想手动执行) 。有什么想法吗?非常感谢。 问题答案: 在我的测试中,添加列的语句可以正常工作: 在为测试目的而创建的临时表上,以上语句创建了该 列,并为该表中的每个现有行插入了自动递增值,从1开始。

  • 我正在运行一个由Kafka、Spark和Cassandra组成的1节点集群。全部本地在同一台机器上。 从一个简单的Python脚本中,我每5秒将一些虚拟数据流到一个Kafka主题中。然后使用Spark结构化流,我将这个数据流(一次一行)读入PySpark DataFrame中,并使用=。最后,我尝试将此行追加到一个已经存在的Cassandra表中。 我一直在关注(如何向Cassandra编写流数据

  • null 非常感谢任何指向文档或非常基本的示例的指针。

  • 我正在使用Spark 2.3,我需要将Spark数据帧保存到csv文件中,我正在寻找更好的方法。。查看相关/类似的问题,我发现了这个问题,但我需要一个更具体的: 如果DataFrame太大,如何避免使用Pandas?因为我使用了函数(下面的代码),它产生了: 内存不足错误(无法分配内存)。 使用文件I/O直接写入csv是更好的方法吗?它可以保留分隔符吗? 使用df。聚结(1)。写选项(“标题”、“

  • 我正在探索DataBricks Delta表及其时间旅行/时间特性。我有一些过去发生的事件数据。我正在尝试将它们插入delta表,并能够使用数据中的时间戳而不是实际的插入时间进行时间旅行。 我的事件中有一个日期/时间列。我将其重命名为“时间戳”,但它仍然不起作用。 我的 csv 数据如下所示:(数据显示 id=1000 的单个案例发生了 5 次更新) 我使用这些命令来创建增量表: 我有两个问题: