简介: 本文介绍Delta的元数据管理相关内容,包括文件夹结构,元数据类型以及元数据产生流程等
作者:宋军,花名嵩林,阿里云EMR技术专家。从事Spark内核优化,对SparkCore/SprakSQL有深入了解,Spark Contributor
Delta元数据解析
元数据初识
Delta有自己的元数据管理,主要有6种类型的元数据Action:
SetTransaction
AddFile
RemoveFile
Metadata
Protocol
CommitInfo
Delta的元数据统一存放在Delta的logpath下面的_delta_log文件夹中
_delta_log文件夹位置
不管DeltaTable是分区表还是非分区表,_delta_log文件夹只有一个,都位于Delta的logpath下面
delta1.png
_delta_log文件夹内容
_delta_log文件夹下存储了所有Delta的相关元数据,如下所示
Delta每次事务commit都会产生一个json的元数据文件,文件内容包括本次commit做的所有action,比如AddFile/RemoveFile等等;
每产生一个新的json文件就会产生一个新的Delta的snapshot,snapshot的版本即该json文件中的数字,该数字必须是连续自增(不能缺失),Delta的某个版本的snapshot是通过顺序回放所有小于等于该snapshot版本号的所有json文件得到;
每个json文件会有一个对应的crc校验文件(源码中有相关代码,但是并没有实际去写该crc)
对元数据做checkpoint时会产生新的checkpoint文件(parquet)
如下FileNames类用来管理_delta_log文件夹下相关文件的文件名:屏幕快照 2019-11-29 下午2.45.02.png
如下_delta_log文件示例:
屏幕快照 2019-11-29 上午11.46.07.png
_delog_log文件内容
json文件
屏幕快照 2019-11-29 下午2.50.12.png
checkpoint parquet文件:
parquet文件内容
屏幕快照 2019-11-29 下午2.58.31.png
元数据解析
Actions
屏幕快照 2019-11-29 下午5.00.02.png
CommitInfo
Holds provenance information about changes to the table. This [[Action]] is not stored in the checkpoint and has reduced compatibility guarantees. Information stored in it is best effort (i.e. can be falsified by the writer).
如:
{“commitInfo”:{“timestamp”:1574836330000,“operation”:“STREAMING UPDATE”,“operationParameters”:{“outputMode”:“Append”,“queryId”:“f5ef8a90-069a-4311-bd0f-4f0c93d89cfe”,“epochId”:“0”},“isBlindAppend”:true}}
{“commitInfo”:{“timestamp”:1574824794574,“operation”:“WRITE”,“operationParameters”:{“mode”:“Overwrite”,“partitionBy”:"[“date”,“city”]"},“isBlindAppend”:false}}
每次commit一个json文件都会有一个CommitInfo,记录当前commit对Delta表的修改行为,比如示例中的operation类型有STREAMING UPDATE / WRITE等等
Protocol
Used to block older clients from reading or writing the log when backwards incompatible changes are made to the protocol. Readers and writers are responsible for checking that they meet the minimum versions before performing any other operations.Since this action allows us to explicitly block older clients in the case of a breaking change to the protocol, clients should be tolerant of messages and fields that they do not understand.
如:
{“protocol”:{“minReaderVersion”:1,“minWriterVersion”:2}}
用来做Delta本身版本兼容性检查,第一个json文件00000000000000000000.json里面会有该信息,除非调用updateProtocol接口会产生一个新的Protocol
Metadata
Updates the metadata of the table. Only the last update to the [[Metadata]] of a table is kept. It is the responsibility of the writer to ensure that any data already present in the table is still valid after any change.
如:
{“metaData”:{“id”:“c233ce0c-dd80-44f0-a1d2-a9404adef07e”,“format”:{“provider”:“parquet”,“options”:{}},“schemaString”:"{“type”:“struct”,“fields”:[{“name”:“id”,“type”:“long”,“nullable”:true,“metadata”:{}},{“name”:“date”,“type”:“date”,“nullable”:true,“metadata”:{}},{“name”:“name”,“type”:“string”,“nullable”:true,“metadata”:{}},{“name”:“sales”,“type”:“string”,“nullable”:true,“metadata”:{}}]}",“partitionColumns”:[],“configuration”:{},“createdTime”:1574836328664}}
Delta表的schema相关信息,Delta支持schema的演化,所以如果对schema进行修改会产生新的Metadata,当生成某个版本的snapshot进行多个json文件顺序回放时,最终snapshot只会保留最新的Metadata,即以最新的Metadata中的schema为准。
Schema演化规则详见文档(https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html)
AddFile
Adds a new file to the table. When multiple [[AddFile]] file actions are seen with the same path only the metadata from the last one is kept.
如:
{“add”:{“path”:“part-00000-c8719ced-3879-4037-b0af-d62c52224af0-c000.snappy.parquet”,“partitionValues”:{},“size”:5906,“modificationTime”:1574836329955,“dataChange”:true}}
分区文件
{“add”:{“path”:“date=20190710/city=bj/part-00000-ef2cca38-7d20-4eaf-a291-81f71fc9d0b5.c000.snappy.parquet”,“partitionValues”:{“date”:“20190710”,“city”:“bj”},“size”:583,“modificationTime”:1574954016825,“dataChange”:true}}
记录在Delta中新增的文件
RemoveFile
Logical removal of a given file from the reservoir. Acts as a tombstone before a file is deleted permanently.
记录删除掉的文件,一般在Merge/Update等操作会有RemoveFile操作。删除的文件并不会被物理删除,只是在元数据中标记该文件删除了,可以通过vacuum命令来实际物理删除超过墓碑时间的文件(默认7天)
SetTransaction
Sets the committed version for a given application. Used to make operations like streaming append idempotent.
如:
{“txn”:{“appId”:“f5ef8a90-069a-4311-bd0f-4f0c93d89cfe”,“version”:370,“lastUpdated”:1574837260639}}
SparkStreaming sink到Delta时,记录相关信息来实现ExactlyOnce特性
元数据产生
屏幕快照 2019-11-29 下午5.24.49.png
如上图所示,_delta_log文件夹下文件的一个产生演化流程,每次对Delta表进行相关修改操作(如Delte/Update等)都会生成一个json文件,记录当前修改的所有actions。
snapshot
从上面流程可以看出,Delta支持snapshot功能,即可以查看历史某个时间点状态下的Delta表数据,这个也是Delta的TimeTravel功能的基础实现,详见文档(https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html)
每个json文件对应一个snapshot版本,Delta在生成这个snapshot的时候,会将小于等于这个版本号的所有json文件按照时间顺序进行回放合并,snapshot 版本为3,那么它是有00000000000000000000.json/00000000000000000001.json/00000000000000000002.json三个按照顺序合并过来,同一个path的action会进行合并,比如0.json中有AddFile(path1), 1.json中有RemoveFile(path1),那么snapshot版本3状态下的Delta表是不包含path1作为实际数据参与计算的。
checkpoint