Delta Lake是一个可靠的开源存储层,它提供ACID事务,可伸缩的元数据处理,并支持流/批统一。Delta Lake可以运行在现有数据湖之上,并完全和Apache Spark APIs兼容
Delta Lake 具体提供如下特性:
Spark上的ACID事务:可序列化的隔离级别确保Reader永远看不到不一致的数据;
可扩展的元数据处理:利用Spark的分布式处理能力,可以轻松处理数十亿个文件的PB级表的所有元数据;
流/批统一:Delta Lake中的表既是批处理表,又是流的源或接收器。流数据提取,批处理历史回填,交互式查询都可以直接使用;
强模式性:自动处理模式变化,防止在摄取过程中插入不良记录;
时间旅行:数据版本控制支持回滚,完整的历史审核跟踪以及可重复的机器学习实验;
Upserts和Deletes:支持合并,更新和删除操作,以启用复杂的用例,例如更改数据捕获,缓慢变化尺寸(SCD)操作,流化Upserts等。
本指南可帮助您快速探索三角洲湖的主要特征。它提供了代码片段,显示了如何从交互式,批处理和流查询中读取和写入Delta表。
Delta Lake要求Apache Spark在2.4.2以上。按照以下说明设置Spark使用Delta Lake。你可以通过如下两种方式在本地计算机上运行本文档中的步骤。
交互式运行:启动带Delta Lake的Spark Shell(Scala或者Python),在Shell中交互式运行代码片段;
以项目运行:创建一个支持Delta Lake 的 Maven或者SBT(Scala或者Java)项目,拷贝代码片段到源文件中,并运行项目。
要在Spark Shell中交互使用Delta Lake,需要在本地安装Apache Spark,根据是使用Python还是Scala可以分别启动PySpark或SparkShell。
如果你需要安装或者更新PySpark,运行如下:
pip install --upgrade pyspark
运行带DeltaLake的PySpark
pyspark --packages io.delta:delta-core_2.11:0.5.0
通过下载下载最新版本的Apache Spark(2.4.2以上版本),使用pip或者下载并解压归档文件,然后解压目录中运行spark-shell。
运行带Delta Lake包的spark-shell
$SPARK_HOME/bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0
注:如果看到以下错误,请确保Apache Spark和delta-core是为相同的Scala版本(2.11或2.12)构建的。下载页面中的Apache Spark-2.4.3的预构建发行版是使用Scala-2.11构建的:
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated
更多信息查看issue
如果要使用Maven中央存储库中的Delta Lake二进制文件构建项目,则可以使用以下Maven坐标。
通过将其作为依赖项添加到POM文件中,将Delta Lake包含在Maven项目中。Delta Lake与Scala 2.11和2.12版本交叉编译;选择与您的项目匹配的版本。如果您正在编写Java项目,则可以使用任何一个版本。
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.5.0</version>
</dependency>
通过将以下行添加到build.sbt文件中,将Delta Lake包括在SBT项目中:
libraryDependencies += "io.delta" %% "delta-core" % "0.5.0"
将DataFrame按delta格式写出到磁盘上来创建一个Delta表,你可以使用已有的SparkSQL代码,将原来的parquet、csv、json等格式更换成delta即可。
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
SparkSession spark = ... // create SparkSession
Dataset<Row> data = data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");
这些选项根据DataFrame推断出schema来创建一个新的Delta表。有关创建新Delta表时可用的全部选项的信息,请参见创建表和写数据到表。
注:此快速入门将本地路径用于Delta表位置。有关为增量表配置HDFS或云存储的信息,请参阅增量存储。
您可以通过指定文件的路径来读取Delta表中的数据,例如"/tmp/delta-table":
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");
df.show();
Delta Lake支持几种使用标准DataFrame API修改表的操作。本示例运行批处理以覆盖表中的数据:
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()
Dataset<Row> data = data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");
如果重新读取表数据,因为数据已经本覆盖,所以只能读取到5-9。
Delta Lake提供了编程API,可以有条件地将数据更新,删除和合并(向上插入)到表中。这里有一些例子。
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new data
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
// Upsert (merge) new data
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData")
.merge(
newData.as("newData"),
"oldData.id = newData.id")
.whenMatched
.update(Map("id" -> col("newData.id")))
.whenNotMatched
.insert(Map("id" -> col("newData.id")))
.execute()
deltaTable.toDF.show()
Java
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");
// Update every even value by adding 100 to it
deltaTable.update(
functions.expr("id % 2 == 0"),
new HashMap<String, Column>() {{
put("id", functions.expr("id + 100"));
}}
);
// Delete every even value
deltaTable.delete(condition = functions.expr("id % 2 == 0"));
// Upsert (merge) new data
Dataset<Row> newData = spark.range(0, 20).toDF();
deltaTable.as("oldData")
.merge(
newData.as("newData"),
"oldData.id = newData.id")
.whenMatched()
.update(
new HashMap<String, Column>() {{
put("id", functions.col("newData.id"));
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, Column>() {{
put("id", functions.col("newData.id"));
}})
.execute();
deltaTable.toDF().show();
您应该看到一些现有行已更新,并且已插入新行。
有关这些操作的更多信息,请参见表删除,更新和合并。
您可以使用称为时间旅行的功能查询Delta表的先前快照。如果要访问覆盖的数据,则可以使用versionAsOf选项查询表的快照,然后覆盖第一组数据。
Python
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
Scala
val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
Java
Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");
df.show();
你可以查看覆盖之前的数据。时间旅行是一项非常强大的功能,它利用Delta Lake事务日志的功能来访问表中不再存在的数据。删除版本0选项(或指定版本1)将使您再次看到较新的数据。有关更多信息,请参阅查询表的旧快照(时间旅行)。
您也可以使用Structured Streaming写入Delta表。即使有其他流或批查询同时针对表运行,Delta Lake事务日志也可以保证一次处理。默认情况下,流以追加模式运行,这会将新记录添加到表中:
Python
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
Scala
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
Java
import org.apache.spark.sql.streaming.StreamingQuery;
Dataset<Row> streamingDf = spark.readStream().format("rate").load();
StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");
流运行时,您可以使用较早的命令读取表。
注:如果您在shell中运行此命令,则可能会看到流任务的进度,这使得在该shell中键入命令变得困难。在新终端中启动另一个Shell以查询表可能会很有用。
您可以通过stream.stop()在启动流的同一终端上运行来停止流。
有关Delta Lake与结构化流集成的更多信息,请参阅表流读取和写入。
在将流写入Delta表时,您还可以从该表中读取流作为源。例如,您可以启动另一个流查询,打印出对Delta表所做的所有更改。
Python
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
Scala
val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
Java
StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();
Delta Lake支持Apache Spark DataFrame读写API提供的大多数选项,用于在表上执行批量读写。
使用DataFrameWriter(Scala、Java或Python)作为原子操作将数据写入Delta Lake。至少必须指定格式delta:
df.write.format("delta").save("/delta/events")
您可以对数据进行分区以加快查询或具有涉及分区列的谓词的DML。要在创建增量表时对数据进行分区,请按列指定分区。常见的模式是按日期分区,例如:
scala
df.write.format("delta").partitionBy("date").save("/delta/events")
您可以通过指定路径将Delta表作为DataFrame加载:
Scala
spark.read.format("delta").load("/delta/events")
Delta Lake时间旅行允许您查询Delta表的旧快照。时间旅行有许多用例,包括:
重新创建分析,报告或输出(例如,机器学习模型的输出)。这对于调试或审核尤其有用,特别是在受管制的行业中。
编写复杂的时间查询。
修正数据中的错误。
为快速更改表的一组查询提供快照隔离。
本节介绍了查询表的较旧版本时所支持的方法,数据保留问题并提供了示例。
有几种查询旧版Delta表的方法。
DataFrameReader选项允许您从固定到表的特定版本的Delta表创建DataFrame。
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")
对于timestamp_string,仅接受日期或时间戳记字符串。例如"2019-01-01"和"2019-01-01’T’00:00:00.000Z"。
一种常见的模式是在执行Databricks作业期间使用Delta表的最新状态来更新下游应用程序。
使用append模式,您可以将新数据原子添加到现有的Delta表中:
df.write.format("delta").mode("append").save("/delta/events")
要自动替换表中的所有数据,可以使用overwrite模式:
df.write.format("delta").mode("overwrite").save("/delta/events")
您可以有选择地仅覆盖分区列上与谓词匹配的数据。以下命令用中的数据原子替换一月df:
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
.save("/delta/events")
此示例代码在中写入数据df,验证所有数据均位于指定分区内,并执行原子替换。
注:与Apache Spark中的文件API不同,Delta Lake会记住并强制执行表的Schema。这意味着默认情况下,覆盖不会替换现有表的Schema。
有关Delta Lake支持更新表的信息,请参阅更新表。
Delta Lake自动验证正在写入的DataFrame的Schema与表的Schema兼容。Delta Lake使用以下规则来确定从DataFrame到表的写入是否兼容:
所有DataFrame列都必须存在于目标表中。如果表中不存在DataFrame中的列,则会引发异常。表中存在但DataFrame中不存在的列设置为null。
DataFrame列数据类型必须与目标表中的列数据类型匹配。如果它们不匹配,则会引发异常。
DataFrame列名称只能大小写不同。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然可以在区分大小写或不区分大小写(默认)模式下使用Spark,但在存储和返回列信息时,Parquet区分大小写。Delta Lake保留大小写,但在存储Schema时不敏感,并且具有此限制以避免潜在的错误,数据损坏或丢失问题。
如果您指定其他选项(例如partitionBy与附加模式结合使用),则Delta Lake会验证它们是否匹配,并为任何不匹配项引发错误。如果partitionBy不存在,则追加将自动跟随现有数据的分区。
Delta Lake可以作为DML事务的一部分(附加或覆盖)自动更新表的Schema,并使该schema与正在写入的数据兼容。
在以下情况下,DataFrame中存在但表中缺失的列将作为写事务的一部分自动添加:
添加的列将追加到它们所在的结构的末尾。追加新列时将保留大小写。
由于Parquet不支持NullType,NullType因此在写入Delta表时会将列从DataFrame中删除,但仍存储在Schema中。当为该列接收到不同的数据类型时,Delta Lake会将Schema合并到新的数据类型。如果Delta Lake收到NullType现有列的,则在写入过程中将保留旧模式,并删除新列。
NullType不支持流式传输。由于必须在使用流式传输时设置模式,因此这种情况很少见。NullType也不适用于诸如ArrayType和的复杂类型MapType。
默认情况下,覆盖表中的数据不会覆盖schema。当覆盖使用表mode(“overwrite”)没有replaceWhere,你可能仍然要覆盖写入的数据的schema。通过将overwriteSchema选项设置为true,可以替换表的schema和分区:
df.write.option("overwriteSchema", "true")
Delta Lake支持在Delta表之上创建视图,就像使用数据源表一样。
使用视图进行操作时的核心挑战是解决模式。如果更改Delta表Schema,则必须重新创建派生视图以说明对该Schema的任何添加。例如,如果将新列添加到Delta表中,则必须确保该列在该基表顶部构建的适当视图中可用。
Delta Lake与Spark Structured Streaming 通过readStream和writeStream深度集成。Delta Lake克服了常见的与流系统和文件相关的许多限制,包括:
保持多个流(或并发批处理作业)的“仅一次”处理
使用文件作为流的源时有效地发现哪些文件是新文件
当您将Delta表加载为流源并在流查询中使用它时,该查询将处理表中存在的所有数据以及流启动后到达的所有新数据。
spark.readStream.format("delta").load("/delta/events")
您还可以通过设置maxFilesPerTrigger选项来控制Delta Lake提供给流的任何微批处理的最大大小。这指定了每个触发器中关联的新文件最大数量。默认值为1000。
Structured Streaming 只处理追加的输入,并且如果对用作源的表进行了任何修改,则抛出异常。有两种主要的处理无法自动向下游传播的更改的策略:
由于默认情况下Delta表保留所有历史记录,因此在许多情况下,您可以删除输出和检查点并从头开始重新启动流。
您可以设置以下两个选项之一:
ignoreDeletes忽略在分区边界删除数据的事务。例如,如果您的源表按日期进行了分区,并且删除了30天之前的数据,则该删除将不会传播到下游,但是流可以继续运行。
ignoreChanges 如果文件在被诸如更新、合并写入、删除分区或者覆盖时会被重新写入到源文件,从而导致重新更新。不变的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。ignoreChanges包括ignoreDeletes,因此,如果您使用ignoreChanges,则流将不会被源表的删除或更新中断。
假如你有一个表user_events,字段包括date、user_email和action,按date进行了分区。您从user_events表中提取数据,但是由于GDPR的关系,您需要从中删除一些数据。
events.readStream
.format("delta")
.option("ignoreDeletes", "true")
.load("/delta/user_events")
然而,如果你必须基于user_email删除数据,那么你需要使用:
events.readStream
.format("delta")
.option("ignoreChanges", "true")
.load("/delta/user_events")
如果你使用UPDATE语句更新user_email,则包含user_email的文件都会被重写。如果使用了ignoreChanges,更新的记录和文件中未被修改的记录会被一并传播到下游。你的逻辑应该要能够处理这些传入重复的记录。
您也可以使用结构化流将数据写入Delta表。事务日志使Delta Lake能够保证一次处理,即使针对该表同时运行其他流或批查询。
默认情况下,流以追加模式运行,这会将新记录添加到表中。
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
.start("/delta/events") // as a path
您还可以使用Structured Streaming在每个批次替换整个表。示例使用聚合来计算摘要:
spark.readStream
.format("delta")
.load("/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
.start("/delta/eventsByCustomer")
前面的示例不断更新包含客户事件总数的表。
对于延迟要求更宽松的应用程序,您可以使用一次性触发器来节省计算资源。使用这些更新按给定的时间表更新汇总聚合表,仅处理自上次更新以来已到达的新数据。