Apache Iceberg的Schema Evolution详解

邵麒
2023-12-01

导语

作为构建新一代数据湖的三个中间件Apache Iceberg, Apache Hudi, Delta Lake都支持Schema Evolution,但是三者的支持能力不尽相同,其中Iceberg宣称支持 Full Schema Evolution。本文将详细分析Iceberg 的Full Schema Evolution, 同时捎带对比下和Delta Lake以及Hudi的Schema Evolution的不同。

为什么需要Schema Evolution

用户的数据随着时间和业务量的增长会需要有一些格式上的变化,例如添加新的纬度,更细的分区粒度等。传统的Hive表如果想要处理这些变化可能需要创建一个新的表,将旧的数据读出来再写到新的表里。如果表的分区粒度也需要发生变化,例如,分区从天变成小时,那么还需要上层更改相关的查询语句,甚至还有引起正确性的问题。所以表结构更新Schema Evolution是新一代数据湖的一个重要特性, 使用新新一代数据湖的Schema Evolution特性很容易的对表的结构进行一些微调,例如添加某些列,从而满足用户数据变化的需求。

作为构建新一代数据湖的主流中间件,Apache Iceberg支持Full Schema Evolution的功能,包括添加列,删除列,更新列,更新分区列等操作。用户可以任意的对表的结构进行in-place的更新,包括对普通列以及嵌套类型的列进行结构更新,甚至当用户的存储更换时还支持对分区的列进行更新,后面的内容将详细介绍这个功能的重要作用。值得一提的是,Iceberg的表结构更新是内在的元信息更新,不需要花费数据迁移或者数据重写的代价。

Iceberg支持的表结构更新操作如下:

  • 添加列:添加一个列到表,支持添加到嵌套类型列

  • 删除列:删除表的某个列,支持删除嵌套类型

  • 重命名列:重命名一个现有的列,支持嵌套类型

  • 更新列:支持将列改的更宽,例如,Integer 更新成Long,支持更新struct, map, list复杂类型

  • 重排序列:支持更新列的顺序,包括嵌套类型里面列的顺序

同时由于Iceberg的Schema逻辑是独立与上层引擎和底层文件格式,所以Iceberg 的schema evolution可以保证:

  • 添加的列不会去读现有列上的数据

  • 删除一个列不会影响其他列

  • 更新一个列不会影响其他列的值

  • 更新列顺序不会影响对应列的值

如何使用Iceberg Schema Evolution

首先,这里定义一个Iceberg的表结构如下:

final Schema SCHEMA = new Schema(  required(1, "id", Types.IntegerType.get()),  optional(2, "data", Types.StringType.get()),  optional(3, "preferences", Types.StructType.of(    required(8, "feature1", Types.BooleanType.get()),    optional(9, "feature2", Types.BooleanType.get())  ), "struct of named boolean options"),  required(4, "locations", Types.MapType.ofRequired(10, 11,    Types.StructType.of(      required(, "address", Types.StringType.get()),      required(, "city", Types.StringType.get()),      required(, "state", Types.StringType.get()),      required(, "zip", Types.IntegerType.get())    ),    Types.StructType.of(      required(, "lat", Types.FloatType.get()),      required(17, "long", Types.FloatType.get())    )), "map of address to coordinate"),  optional(5, "points", Types.ListType.ofOptional(,    Types.StructType.of(      required(19, "x", Types.LongType.get()),      required(, "y", Types.LongType.get())    )), "2-D cartesian points"),  required(6, "doubles", Types.ListType.ofRequired(17,    Types.DoubleType.get()  )),  optional(7, "properties", Types.MapType.ofOptional(, ,    Types.StringType.get(),    Types.StringType.get()  ), "string map of properties"));

Column Evolution

添加列​​​​​​​

//添加一个顶级列    table.UpdateSchema()  .addColumn("toplevel",     Types.DecimalType.of(9, 2))  .commit()
//添加一个列到嵌套类型里面table.UpdateSchema()  .addColumn("points", "z",    Types.LongType.get(), "z axis")  .commit()

更新列​​​​​​​

// 更新列类型table.UpdateSchema()  .updateColumn("id",    Types.LongType.get())  .commit()
// 更新嵌套类里面子列类型table.UpdateSchema()  .updateColumn("locations.lat",    Types.DoubleType.get())  .commit()table.UpdateSchema()  .updateColumn("locations.long",    Types.DoubleType.get())  .commit()

重命名列​​​​​​​

// 重命名顶级列table.UpdateSchema()  .renameColumn("data", "json")  .commit()
// 重命名嵌套类型子列table.UpdateSchema()  .renameColumn("preferences.feature2", "newfeature")  .commit() table.UpdateSchema()  .renameColumn("locations.lat", "latitude")  .commit()table.UpdateSchema()  .renameColumn("points.x", "X")  .commit()

删除列​​​​​​​

table.UpdateSchema()  .deleteColumn("points.z").commit();

Partition Evolution

和Spark与Hive不同,Iceberg采用的分区方式是隐式分区,这种隐式分区的方式使得分区更加灵活,可以通过以某些列作为输入,然后指定一个变换函数结合起来作为一个分区格式。例如,假设表的设计里面有一列是event_time,那么可能的分区方法有:

  • date(event_time):根据日期分区

  • mouth(event_time):根据月分区

  • year(event_time):根据年分区

  • day(event_time):根据天分区

  • bucket(event_time, 10):分成10个桶

  • identity(event_time):根据event_time分区

  • truncate(event_time, 5):根据event_time前5位的宽度分区

当然如果你觉得这些都不够用, 那么你还可以自己写一个Transform 接口的实现来指定分区策略。

在隐式分区技术的基础上,Iceberg实现了Partition Evolution,这个功能可以让上层的查询语句不需要做任何的更新,仍然可以无缝的使用分区过滤功能。这非常关键,例如原先你的数据是按照日期进行分区,随着数据不断增长原先分区里的数据越来越多,分区过滤后数据还是很多,现在想换下分区策略,改成按小时分区。原先的一个查询SQL:​​​​​​​

SELECT level, count(1) as count FROM logs WHERE event_time  BETWEEN '2018-12-01 10:00:00' AND '2018-12-01 12:00:00' AND event_date = '2018-12-01'

如果是不使用Iceberg, 那么正常的变更分区方式可能需要更新表,重新添加一个虚拟列hour作为分区列。同时需要添加一个条件语句是 and hour = '10', 如果上层查询语句没有更新,那么分区过滤也无效。

如果使用Iceberg,可以用下面代码更新下分区策略:​​​​​​​

//可以通过 table = (BaseTable)origTable的方式获取内部table实例
TableMetadata current = table.operations().current();
PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())  .hour("event_time")  .withSpecId(1)  .build();table.ops().commit(current, current.updatePartitionSpec(newSpec));

更新好策略之后,不需要做任何的数据迁移,上层查询语句不需要做任何变化即可直接用上分区过滤功能。是不是节省了很多事

Schema Evolution内核剖析

与Delta Lake和Hudi不同,Iceberg有自己独立定义的Schema,它定义了field id, field name到NestedField的映射,同时还定义了一系列的visitor用于访问和更新Schema。通过这套独立的Schema逻辑以及一系列visitor,Iceberg Schema可以不用和Spark 的Schema以及底层的文件格式Schema耦合,从而实现 full schema evolution。

丰富的Schema Visitor

Iceberg为方便对Schema操作以及与引擎和底层文件格式之间的转换,定义了一系列的Visitor。

与Spark Type转换的Visitor

上面[Iceberg Schema Evolution](如何使用Iceberg Schema Evolution)的使用中我们定义了一个典型的Iceberg Schema, 例子中我们可以看出Iceberg Schema的定义和Parquet表的定义很类似,支持嵌套类型array, list, map,它可以很容易的和Spark的StructType进行转换:​​​​​​​

//从Spark表获取Schema并转成Iceberg SchemaSchema schema = SparkSchemaUtil  .schemaForTable(sparkSession, tableName);
//从Spark表获取Partition信息,转成Iceberg PartitionSpecPartitionSpec spc = SparkSchemaUtil  .specForTable(spark, tableName);
// Spark StructType和Iceberg Schema互相Schema schema = SparkSchemaUtil.convert(structType);StructType struct = SparkSchemaUtil.convert(schema);

这里主要得益于两个Schema Visitor, TypeToSparkType 和SparkTypeToType, 这两个visitor可以对已有的表结构进行深度遍历,并在遍历同时生成另外一种格式的表结构。我们看下其中一个的具体visit方法:​​​​​​​

static <T> T visit(DataType type, SparkTypeVisitor<T> visitor) {  if (type instanceof StructType) {    StructField[] fields = ((StructType) type).fields();    List<T> fieldResults = Lists.newArrayListWithExpectedSize(fields.length);
    for (StructField field : fields) {      fieldResults.add(visitor.field(        field,        visit(field.dataType(), visitor)));    }
    return visitor.struct((StructType) type, fieldResults);
  } else if (type instanceof MapType) {    return visitor.map((MapType) type,      visit(((MapType) type).keyType(), visitor),      visit(((MapType) type).valueType(), visitor));
  } else if (type instanceof ArrayType) {    return visitor.array(      (ArrayType) type,      visit(((ArrayType) type).elementType(), visitor));
  } else if (type instanceof UserDefinedType) {    throw new UnsupportedOperationException(      "User-defined types are not supported");  } else {    return visitor.atomic(type);  }}

这是典型的visitor模式,通过深度遍历Schema 访问内部结构构建想要的结果。Schema相关的许多操作都是通过Visitor模式完成。例如分配ID, Schema更新。

与Avro和Parquet 类型转换的 Visitor

与Parquet MessageType互相转换的Visitor: MessageTypeToType和TypeToMessageType

与Avro Schema互相转换的Visitor : TypeToSchema和SchemaToType

分配Field ID的Visitor

分配由AssignFreshIds这个visitor来完成,AssignFreshIds负责对一个Type分配新的Id, 由于Type是一个嵌套类型,所以需要由visitor来遍历。值得注意的,AssignFreshId分配id的方式并不是先序深度遍历,它在访问节点时先分配了当前所有子节点id,然后将子节点的访问作为Future保存,这样就有点类似广度优先。这一点可以从我们上面的示例Schema中看出。

更新Schema的Visitor

Schema更新由ApplyChanges这个Schema Visitor来完成,它包含了新增列,删除列,更新列三个操作,这三个更新操作分别会生成新的列的集合如下:

  • 新增列:用一个MultiMap<Integer, NestedField> adds表示,key是parent field, value是新增的filed集合

  • 删除列:用一个Setdeletes表示

  • 更新列:用一个Map<Integer, NestedField> updates表示

当用户需要commit这些更新时,ApplyChanges的在遍历当前Schema的时候会分别查询下当前的field的id是否在adds, deletes, updates三个集合中,如果正好在的话就会进行相应的处理,最后ApplyChanges会生成一个新的Schema文件,值得注意的是,那些没有变化的field它们的field ID也没有变化。这就保证了一定的兼容性。

总结

Schema Evolution是新一代数据湖必备的技能,不管是Iceberg, Delta Lake, Hudi都宣称自己有Schema Evolution的功能,但只有Iceberg真正做到full schema evolution。Iceberg的亮点在于它把Schema的逻辑独立抽象出来,使得Schema和Partition与上层引擎和底层的文件格式解耦做到更强大的功能。

 类似资料: