当前位置: 首页 > 工具软件 > Delta Lake > 使用案例 >

Delta Lake

轩辕乐邦
2023-12-01

Delta Lake 简介

Delta Lake是一个可靠的开源存储层,它提供ACID事务,可伸缩的元数据处理,并支持流/批统一。Delta Lake可以运行在现有数据湖之上,并完全和Apache Spark APIs兼容

Delta Lake 具体提供如下特性:

  • Spark上的ACID事务:可序列化的隔离级别确保Reader永远看不到不一致的数据;

  • 可扩展的元数据处理:利用Spark的分布式处理能力,可以轻松处理数十亿个文件的PB级表的所有元数据;

  • 流/批统一:Delta Lake中的表既是批处理表,又是流的源或接收器。流数据提取,批处理历史回填,交互式查询都可以直接使用;

  • 强模式性:自动处理模式变化,防止在摄取过程中插入不良记录;

  • 时间旅行:数据版本控制支持回滚,完整的历史审核跟踪以及可重复的机器学习实验;

  • Upserts和Deletes:支持合并,更新和删除操作,以启用复杂的用例,例如更改数据捕获,缓慢变化尺寸(SCD)操作,流化Upserts等。

Delta Lake 快速开始

本指南可帮助您快速探索三角洲湖的主要特征。它提供了代码片段,显示了如何从交互式,批处理和流查询中读取和写入Delta表。

设置Apache Spark支持Delta Lake

Delta Lake要求Apache Spark在2.4.2以上。按照以下说明设置Spark使用Delta Lake。你可以通过如下两种方式在本地计算机上运行本文档中的步骤。

  1. 交互式运行:启动带Delta Lake的Spark Shell(Scala或者Python),在Shell中交互式运行代码片段;

  2. 以项目运行:创建一个支持Delta Lake 的 Maven或者SBT(Scala或者Java)项目,拷贝代码片段到源文件中,并运行项目。

启动交互式Shell

要在Spark Shell中交互使用Delta Lake,需要在本地安装Apache Spark,根据是使用Python还是Scala可以分别启动PySpark或SparkShell。

PySpark

如果你需要安装或者更新PySpark,运行如下:

pip install --upgrade pyspark

运行带DeltaLake的PySpark

pyspark --packages io.delta:delta-core_2.11:0.5.0
Spark Scala Shell

通过下载下载最新版本的Apache Spark(2.4.2以上版本),使用pip或者下载并解压归档文件,然后解压目录中运行spark-shell。

运行带Delta Lake包的spark-shell

$SPARK_HOME/bin/spark-shell --packages io.delta:delta-core_2.11:0.5.0

注:如果看到以下错误,请确保Apache Spark和delta-core是为相同的Scala版本(2.11或2.12)构建的。下载页面中的Apache Spark-2.4.3的预构建发行版是使用Scala-2.11构建的:

java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated

更多信息查看issue

项目启动

如果要使用Maven中央存储库中的Delta Lake二进制文件构建项目,则可以使用以下Maven坐标。

Maven

通过将其作为依赖项添加到POM文件中,将Delta Lake包含在Maven项目中。Delta Lake与Scala 2.11和2.12版本交叉编译;选择与您的项目匹配的版本。如果您正在编写Java项目,则可以使用任何一个版本。

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.11</artifactId>
  <version>0.5.0</version>
</dependency>
SBT

通过将以下行添加到build.sbt文件中,将Delta Lake包括在SBT项目中:

libraryDependencies += "io.delta" %% "delta-core" % "0.5.0"

创建表

将DataFrame按delta格式写出到磁盘上来创建一个Delta表,你可以使用已有的SparkSQL代码,将原来的parquet、csv、json等格式更换成delta即可。

  • Python
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
  • scala
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
  • java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

SparkSession spark = ...   // create SparkSession

Dataset<Row> data = data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");

这些选项根据DataFrame推断出schema来创建一个新的Delta表。有关创建新Delta表时可用的全部选项的信息,请参见创建表写数据到表

注:此快速入门将本地路径用于Delta表位置。有关为增量表配置HDFS或云存储的信息,请参阅增量存储。

读取数据

您可以通过指定文件的路径来读取Delta表中的数据,例如"/tmp/delta-table":

  • Python
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
  • scala
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
  • java
Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");
df.show();

更新表数据

Delta Lake支持几种使用标准DataFrame API修改表的操作。本示例运行批处理以覆盖表中的数据:

覆盖写

  • Python
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
  • scala
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()
  • java
Dataset<Row> data = data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");

如果重新读取表数据,因为数据已经本覆盖,所以只能读取到5-9。

有条件的更新数据

Delta Lake提供了编程API,可以有条件地将数据更新,删除和合并(向上插入)到表中。这里有一些例子。

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/tmp/delta-table")

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))

// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

// Upsert (merge) new data
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched
  .update(Map("id" -> col("newData.id")))
  .whenNotMatched
  .insert(Map("id" -> col("newData.id")))
  .execute()

deltaTable.toDF.show()

Java

import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");

// Update every even value by adding 100 to it
deltaTable.update(
  functions.expr("id % 2 == 0"),
  new HashMap<String, Column>() {{
    put("id", functions.expr("id + 100"));
  }}
);

// Delete every even value
deltaTable.delete(condition = functions.expr("id % 2 == 0"));

// Upsert (merge) new data
Dataset<Row> newData = spark.range(0, 20).toDF();

deltaTable.as("oldData")
  .merge(
    newData.as("newData"),
    "oldData.id = newData.id")
  .whenMatched()
  .update(
    new HashMap<String, Column>() {{
      put("id", functions.col("newData.id"));
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, Column>() {{
      put("id", functions.col("newData.id"));
    }})
  .execute();

deltaTable.toDF().show();

您应该看到一些现有行已更新,并且已插入新行。

有关这些操作的更多信息,请参见表删除,更新和合并

时间旅行实现读取历史版本的数据

您可以使用称为时间旅行的功能查询Delta表的先前快照。如果要访问覆盖的数据,则可以使用versionAsOf选项查询表的快照,然后覆盖第一组数据。

Python

df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

Scala

val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

Java

Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");
df.show();

你可以查看覆盖之前的数据。时间旅行是一项非常强大的功能,它利用Delta Lake事务日志的功能来访问表中不再存在的数据。删除版本0选项(或指定版本1)将使您再次看到较新的数据。有关更多信息,请参阅查询表的旧快照(时间旅行)

将Stream写入表

您也可以使用Structured Streaming写入Delta表。即使有其他流或批查询同时针对表运行,Delta Lake事务日志也可以保证一次处理。默认情况下,流以追加模式运行,这会将新记录添加到表中:

Python

streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

Scala

val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

Java

import org.apache.spark.sql.streaming.StreamingQuery;

Dataset<Row> streamingDf = spark.readStream().format("rate").load();
StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");

流运行时,您可以使用较早的命令读取表。

注:如果您在shell中运行此命令,则可能会看到流任务的进度,这使得在该shell中键入命令变得困难。在新终端中启动另一个Shell以查询表可能会很有用。

您可以通过stream.stop()在启动流的同一终端上运行来停止流。

有关Delta Lake与结构化流集成的更多信息,请参阅表流读取和写入

从表中读取更改的Stream

在将流写入Delta表时,您还可以从该表中读取流作为源。例如,您可以启动另一个流查询,打印出对Delta表所做的所有更改。

Python

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

Scala

val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

Java

StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();

批程序读取和写入表

Delta Lake支持Apache Spark DataFrame读写API提供的大多数选项,用于在表上执行批量读写。

创建表

使用DataFrameWriter(Scala、Java或Python)作为原子操作将数据写入Delta Lake。至少必须指定格式delta:

df.write.format("delta").save("/delta/events")

分区数据

您可以对数据进行分区以加快查询或具有涉及分区列的谓词的DML。要在创建增量表时对数据进行分区,请按列指定分区。常见的模式是按日期分区,例如:

scala

df.write.format("delta").partitionBy("date").save("/delta/events")

从表中读取数据

您可以通过指定路径将Delta表作为DataFrame加载:

Scala

spark.read.format("delta").load("/delta/events")

查询表的旧快照(时间旅行)

Delta Lake时间旅行允许您查询Delta表的旧快照。时间旅行有许多用例,包括:

  • 重新创建分析,报告或输出(例如,机器学习模型的输出)。这对于调试或审核尤其有用,特别是在受管制的行业中。

  • 编写复杂的时间查询。

  • 修正数据中的错误。

  • 为快速更改表的一组查询提供快照隔离。

本节介绍了查询表的较旧版本时所支持的方法,数据保留问题并提供了示例。

语法

有几种查询旧版Delta表的方法。

  • DataFrameReader选项

DataFrameReader选项允许您从固定到表的特定版本的Delta表创建DataFrame。

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

对于timestamp_string,仅接受日期或时间戳记字符串。例如"2019-01-01"和"2019-01-01’T’00:00:00.000Z"。

一种常见的模式是在执行Databricks作业期间使用Delta表的最新状态来更新下游应用程序。

写入表

使用数据框追加

使用append模式,您可以将新数据原子添加到现有的Delta表中:

df.write.format("delta").mode("append").save("/delta/events")

使用DataFrames覆盖

要自动替换表中的所有数据,可以使用overwrite模式:

df.write.format("delta").mode("overwrite").save("/delta/events")

您可以有选择地仅覆盖分区列上与谓词匹配的数据。以下命令用中的数据原子替换一月df:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")

此示例代码在中写入数据df,验证所有数据均位于指定分区内,并执行原子替换。

注:与Apache Spark中的文件API不同,Delta Lake会记住并强制执行表的Schema。这意味着默认情况下,覆盖不会替换现有表的Schema。

有关Delta Lake支持更新表的信息,请参阅更新表

Schema 验证

Delta Lake自动验证正在写入的DataFrame的Schema与表的Schema兼容。Delta Lake使用以下规则来确定从DataFrame到表的写入是否兼容:

  • 所有DataFrame列都必须存在于目标表中。如果表中不存在DataFrame中的列,则会引发异常。表中存在但DataFrame中不存在的列设置为null。

  • DataFrame列数据类型必须与目标表中的列数据类型匹配。如果它们不匹配,则会引发异常。

  • DataFrame列名称只能大小写不同。这意味着您不能在同一表中定义诸如“ Foo”和“ foo”之类的列。虽然可以在区分大小写或不区分大小写(默认)模式下使用Spark,但在存储和返回列信息时,Parquet区分大小写。Delta Lake保留大小写,但在存储Schema时不敏感,并且具有此限制以避免潜在的错误,数据损坏或丢失问题。

如果您指定其他选项(例如partitionBy与附加模式结合使用),则Delta Lake会验证它们是否匹配,并为任何不匹配项引发错误。如果partitionBy不存在,则追加将自动跟随现有数据的分区。

自动更新 schema

Delta Lake可以作为DML事务的一部分(附加或覆盖)自动更新表的Schema,并使该schema与正在写入的数据兼容。

添加列

在以下情况下,DataFrame中存在但表中缺失的列将作为写事务的一部分自动添加:

  • write或writeStream有.option(“mergeSchema”, “true”)

添加的列将追加到它们所在的结构的末尾。追加新列时将保留大小写。

NullType列

由于Parquet不支持NullType,NullType因此在写入Delta表时会将列从DataFrame中删除,但仍存储在Schema中。当为该列接收到不同的数据类型时,Delta Lake会将Schema合并到新的数据类型。如果Delta Lake收到NullType现有列的,则在写入过程中将保留旧模式,并删除新列。

NullType不支持流式传输。由于必须在使用流式传输时设置模式,因此这种情况很少见。NullType也不适用于诸如ArrayType和的复杂类型MapType。

替代表 Schema

默认情况下,覆盖表中的数据不会覆盖schema。当覆盖使用表mode(“overwrite”)没有replaceWhere,你可能仍然要覆盖写入的数据的schema。通过将overwriteSchema选项设置为true,可以替换表的schema和分区:

df.write.option("overwriteSchema", "true")

表的视图

Delta Lake支持在Delta表之上创建视图,就像使用数据源表一样。

使用视图进行操作时的核心挑战是解决模式。如果更改Delta表Schema,则必须重新创建派生视图以说明对该Schema的任何添加。例如,如果将新列添加到Delta表中,则必须确保该列在该基表顶部构建的适当视图中可用。

流程序读取和写入表

Delta Lake与Spark Structured Streaming 通过readStream和writeStream深度集成。Delta Lake克服了常见的与流系统和文件相关的许多限制,包括:

  • 保持多个流(或并发批处理作业)的“仅一次”处理

  • 使用文件作为流的源时有效地发现哪些文件是新文件

Delta表作为Stream源

当您将Delta表加载为流源并在流查询中使用它时,该查询将处理表中存在的所有数据以及流启动后到达的所有新数据。

spark.readStream.format("delta").load("/delta/events")

您还可以通过设置maxFilesPerTrigger选项来控制Delta Lake提供给流的任何微批处理的最大大小。这指定了每个触发器中关联的新文件最大数量。默认值为1000。

忽略更新和错误

Structured Streaming 只处理追加的输入,并且如果对用作源的表进行了任何修改,则抛出异常。有两种主要的处理无法自动向下游传播的更改的策略:

  • 由于默认情况下Delta表保留所有历史记录,因此在许多情况下,您可以删除输出和检查点并从头开始重新启动流。

  • 您可以设置以下两个选项之一:

    • ignoreDeletes忽略在分区边界删除数据的事务。例如,如果您的源表按日期进行了分区,并且删除了30天之前的数据,则该删除将不会传播到下游,但是流可以继续运行。

    • ignoreChanges 如果文件在被诸如更新、合并写入、删除分区或者覆盖时会被重新写入到源文件,从而导致重新更新。不变的行可能仍会发出,因此您的下游使用者应该能够处理重复项。删除不会传播到下游。ignoreChanges包括ignoreDeletes,因此,如果您使用ignoreChanges,则流将不会被源表的删除或更新中断。

例子

假如你有一个表user_events,字段包括date、user_email和action,按date进行了分区。您从user_events表中提取数据,但是由于GDPR的关系,您需要从中删除一些数据。

events.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/delta/user_events")

然而,如果你必须基于user_email删除数据,那么你需要使用:

events.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/delta/user_events")

如果你使用UPDATE语句更新user_email,则包含user_email的文件都会被重写。如果使用了ignoreChanges,更新的记录和文件中未被修改的记录会被一并传播到下游。你的逻辑应该要能够处理这些传入重复的记录。

将Delta表作为一个Sink源

您也可以使用结构化流将数据写入Delta表。事务日志使Delta Lake能够保证一次处理,即使针对该表同时运行其他流或批查询。

追加模式

默认情况下,流以追加模式运行,这会将新记录添加到表中。

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events") // as a path

完全模式

您还可以使用Structured Streaming在每个批次替换整个表。示例使用聚合来计算摘要:

spark.readStream
  .format("delta")
  .load("/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/delta/eventsByCustomer")

前面的示例不断更新包含客户事件总数的表。

对于延迟要求更宽松的应用程序,您可以使用一次性触发器来节省计算资源。使用这些更新按给定的时间表更新汇总聚合表,仅处理自上次更新以来已到达的新数据。

 类似资料:

相关阅读

相关文章

相关问答