当前位置: 首页 > 知识库问答 >
问题:

如何使用Spark截断数据并删除配置单元表中的所有分区

西门飞星
2023-03-14
truncate table my_table; // Deletes all data, but keeps partitions in metastore

alter table my_table drop partition(p_col > 0) // does not work from spark

共有1个答案

鲁浩渺
2023-03-14

让我们使用Spark 2.4.3来设置这个问题:

// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")

// Enable dynamic partitioning 
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")

// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))

// Verify inserts
spark.table("potato").count // 9 records

我们使用外部目录的ListPartitionsDropPartitions函数。

// Get External Catalog
val catalog = spark.sharedState.externalCatalog

// Get the spec from the list of all partitions 
val partitions = catalog.listPartitions("default", "potato").map(_.spec)

// We pass them to the Catalog's dropPartitions function.
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", partitions,
                   ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions

这对于托管的表非常有效,但是对于外部的表呢?

// We repeat the setup above but after creating an EXTERNAL table
// After dropping we see that the partitions appear to be gone (or are they?).
catalog.listPartitions("default", "potato").length // 0 partitions

// BUT repairing the table simply adds them again, the partitions/data 
// were NOT deleted from the underlying filesystem. This is not what we wanted!
spark.sql("MSCK REPAIR TABLE potato")
catalog.listPartitions("default", "potato").length // 9 partitions again!   
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// Identify the table in question
val identifier = TableIdentifier("potato", Some("default"))

// Get its current metadata
val tableMetadata = catalog.getTableMetadata(identifier)

// Clone the metadata while changing the tableType to MANAGED
val alteredMetadata = tableMetadata.copy(tableType = CatalogTableType.MANAGED)

// Alter the table using the new metadata
catalog.alterTable(alteredMetadata)

// Now drop!
catalog.dropPartitions("default", "potato", partitions,
                   ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
spark.sql("MSCK REPAIR TABLE potato") // Won't add anything
catalog.listPartitions("default", "potato").length // Still 0 partitions!
 类似资料:
  • 如前所述,我正在MySQL中进行并练习一个分区数据。由于我想清除所有的数据,数据仍然在那里,不能删除一次。 这是我创建的分区表。 我很好奇我的MySQL软件有问题吗?还是分区表导致了这种情况?因为我有一个没有分区的备份表,所以可以很好地使用Truncate,Delete和Select脚本。

  • 插入覆盖表myTable分区(字段)从myTable中选择*,其中机器='xxxxx' 但是SELECT中的数据不会替换MyTable中的数据。

  • 我正在使用Spark2.0,我想知道,是否可以列出特定配置单元表的所有文件?如果是这样,我可以直接使用spark增量地更新这些文件。如何向配置单元表添加新分区?有没有关于蜂巢转移瘤的api我可以从Spark使用? 有什么方法可以获得映射dataframe的内部配置单元函数吗 我的主要理由是对表进行增量更新。现在,我知道的唯一方法是SQL+,这不是很有效,因为他将覆盖所有表,而我主要感兴趣的是对某些

  • 问题内容: 重建之前,我通常需要从PostgreSQL数据库中删除所有数据。我将如何直接在SQL中执行此操作? 目前,我设法提出了一条SQL语句,该语句返回我需要执行的所有命令: 但是,一旦有了它们,我就看不到以编程方式执行它们的方法。 问题答案: FrustratedWithFormsDesigner是正确的,PL / pgSQL可以做到这一点。这是脚本: 这将创建一个存储的函数(您只需要执行一

  • 我正在使用Spark SQL读取一个配置单元表,并将其分配给一个scala val 有什么方法可以绕过这个错误吗?我需要将记录插入到同一个表中。 嗨,我试着按建议做,但仍然得到同样的错误。

  • 我有一个配置单元表,其中一个date列上存在分区,但date列以YYYYMMDD格式存储为INT。该表还可以包含未来日期分区的数据。 现在,作为过程的一部分,我希望删除那些在处理日(位于处理日)上运行的分区。 当我编写drop分区时,比如,那么它工作正常。 考虑到我的输入将是唯一的日期格式YYYY-MM-DD和我已经删除所有分区已给出输入日期-1;如何使上述陈述奏效?