当前位置: 首页 > 工具软件 > Apache Hudi > 使用案例 >

Apache Hudi 入门学习总结

史昀
2023-12-01

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

学习和使用Hudi近一年了,由于之前忙于工作和学习,没时间总结,现在从头开始总结一下,先从入门开始

Hudi 概念

Apache Hudi 是一个支持插入、更新、删除的增量数据湖处理框架,有两种表类型:COW和MOR,可以自动合并小文件,Hudi自己管理元数据,元数据目录为.hoodie,
具体的概念可以查看官网https://hudi.apache.org/cn/docs/0.9.0/overview

Hudi 学习

Hudi 安装

只需要将Hudi的jar包放到Spark和Hive对应的路径下,再修改几个配置

Spark

Hudi支持Spark程序读写Hudi表,同时也支持Spark SQL insert/update/delete/merge等

包名:hudi-spark-bundle_2.11-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark-bundle_2.11/0.9.0/hudi-spark-bundle_2.11-0.9.0.jar
包名:hudi-utilities-bundle_2.11-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.9.0/hudi-utilities-bundle_2.11-0.9.0.jar

将hudi-spark-bundle_2.11-0.9.0.jar 和 hudi-utilities-bundle_2.11-0.9.0.jar拷贝到$SPARK_HOME/jars,当前版本目录为/usr/hdp/3.1.0.0-78/spark2/jars/
版本说明:0.9.0为hudi发行版本,2.11为HDP中Spark对应的scala版本
这里提供的是Maven的下载地址,对于其他版本,Maven上可以下载到,当然也可以自己打包

Hive

Hudi可以将元数据同步到Hive表中,Hive只能用来查询,不能insert/update/delete

包名:hudi-hadoop-mr-bundle-0.9.0.jar
下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.9.0/hudi-hadoop-mr-bundle-0.9.0.jar
1、将hudi-hadoop-mr-bundle-0.9.0.jar 拷贝至$HIVE_HOME/lib,当前版本目录为:/usr/hdp/3.1.0.0-78/hive/lib/
2、修改hive配置(在hive-site.xml) hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat
hive.resultset.use.unique.column.names=false (修改这里的配置是因为如果我们用hudi-utilities-bundle中的工具类HoodieDeltaStreamer,其中的JdbcbasedSchemaProvider解析Hive表Schema时需要设置这个属性,否则解析异常,关于HoodieDeltaStreamer的使用我会单独在另一篇文章中总结)
3、重启hive

Tez

1、将上述hudi-hadoop-mr-bundle-0.9.0.jar 打到/hdp/apps/${hdp.version}/tez/tez2.tar.gz中
注意:这里的路径是指HDFS路径
2、修改hive配置(在hive-site.xml) hive.tez.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat
3、重启Tez、Hive

关于第一个打包到tez2.tar.gz,我自己写了一个脚本,如下:

jar=$1

sudo rm -r tez_temp
mkdir tez_temp
cd tez_temp
hadoop fs -get /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
mkdir tez
tar -zxvf tez.tar.gz -C tez
mkdir gz
sudo rm -r tez/lib/hudi-hadoop-mr*
cp $jar tez/lib/
cd tez
tar -zcvf  ../gz/tez.tar.gz ./*
hadoop fs -rm -r  /hdp/apps/3.1.0.0-78/tez/tez.tar.gz.back
hadoop fs -mv  /hdp/apps/3.1.0.0-78/tez/tez.tar.gz /hdp/apps/3.1.0.0-78/tez/tez.tar.gz.back
cd ../gz/
hadoop fs -put tez.tar.gz /hdp/apps/3.1.0.0-78/tez/
su - hdfs <<EOF
kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs-cluster1@INDATA.COM
hadoop fs -chown hdfs:haoop /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
hadoop fs -chmod 444  /hdp/apps/3.1.0.0-78/tez/tez.tar.gz
hadoop fs -ls /hdp/apps/3.1.0.0-78/tez/
exit
EOF

这个脚本在我自己的环境上是可以正常运行使用的,当然可能因本人水平有限,写的还不够好,不能适用所有环境,可以自行修改,仅做参考

Flink

Hudi也支持Flink,本人目前还不会Flink~,可以参考官网https://hudi.apache.org/cn/docs/0.9.0/flink-quick-start-guide

Hudi 写入

Hudi支持Spark、Flink、Java等多种客户端,本人常用Spark、Java客户端,这俩相比较而言,大家用Spark较多,这里就以Spark代码进行简单的示例总结

Spark 配置参数

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.hudi.command.UuidKeyGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}


 val spark = SparkSession.builder().
      master("local[*]").
      appName("SparkHudiDemo").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      // 扩展Spark SQL,使Spark SQL支持Hudi
      config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
      // 支持Hive,本地测试时,注释掉
      //      enableHiveSupport().
      getOrCreate()

写Hudi并同步到Hive表

代码示例:

    val spark = SparkSession.builder().
      master("local[*]").
      appName("SparkHudiDemo").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      // 扩展Spark SQL,使Spark SQL支持Hudi
      config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
      // 支持Hive,本地测试时,注释掉
      //      enableHiveSupport().
      getOrCreate()

    import spark.implicits._
    val df = Seq((1, "a1", 10, 1000, "2022-05-12")).toDF("id", "name", "value", "ts", "dt")

    val databaseName = "default"
    val tableName1 = "test_hudi_table_1"
    val primaryKey = "id"
    val preCombineField = "ts"
    val partitionField = "dt"
    val tablePath1 = "/tmp/test_hudi_table_1"
    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
      UPSERT_OPERATION_OPT_VAL, tablePath1, Overwrite)
    spark.read.format("hudi").load(tablePath1).show(false)
    // 删除表
    save2HudiSyncHiveWithPrimaryKey(df, databaseName, tableName1, primaryKey, preCombineField, partitionField,
      DELETE_OPERATION_OPT_VAL, tablePath1, Append)
    spark.read.format("hudi").load(tablePath1).show(false)


  /**
   * 写hudi并同步到hive,有主键,分区字段dt
   *
   */
  def save2HudiSyncHiveWithPrimaryKey(df: DataFrame, databaseName: String, tableName: String, primaryKey: String, preCombineField: String,
                                      partitionField: String, operation: String, tablePath: String, mode: SaveMode): Unit = {
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key, primaryKey). // 主键字段
      option(PRECOMBINE_FIELD.key, preCombineField). // 预合并字段
      option(PARTITIONPATH_FIELD.key, partitionField).
      option(TBL_NAME.key, tableName).
      option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getName).
      option(OPERATION.key(), operation).
      // 下面的参数和同步hive元数据,查询hive有关
      option(META_SYNC_ENABLED.key, true).
      option(HIVE_USE_JDBC.key, false).
      option(HIVE_DATABASE.key, databaseName).
      option(HIVE_AUTO_CREATE_DATABASE.key, true).
      // 内部表,这里非必须,但是在用saveAsTable时则必须,因为0.9.0有bug,默认外部表
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      option(HIVE_TABLE.key, tableName).
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      option(HIVE_STYLE_PARTITIONING.key, true).
      option(HIVE_PARTITION_FIELDS.key, partitionField).
      option(HIVE_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName).
      // 为了SparkSQL更新用,0.9.0版本有bug,需要设置这个参数,最新版本已经修复,可以不设置这个参数
      // 详情查看PR:https://github.com/apache/hudi/pull/3745
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      mode(mode)
      .save(tablePath)
  }

代码说明:本地测试需要把同步Hive的代码部分注释掉,因为同步Hive需要连接Hive metaStore
服务器spark-shell里可以跑完整的代码,可以成功同步Hive,0.9.0版本同步Hive时会抛出一个关闭Hive的异常,这个可以忽略,这是该版本的一个bug,虽然有异常但是已同步成功,最新版本已经修复该bug,具体可以查看PR:https://github.com/apache/hudi/pull/3364

我已经将该PR合到0.9.0版本,如果想使用的话,可以查看:https://gitee.com/dongkelun/hudi/commits/0.9.0,该分支也包含其他基于0.9.0版本的bug修复和特性支持。

读Hudi

Spark 读取如上述代码示例:

spark.read.format("hudi").load(tablePath1).show(false)

结果:

+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                       |id |name|value|ts  |dt        |
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|20220512101542     |20220512101542_0_1  |id:1              |2022-05-12            |38c1ff87-8bc9-404c-8d2c-84f720e8133c-0_0-20-12004_20220512101542.parquet|1  |a1  |10   |1000|2022-05-12|
+-------------------+--------------------+------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+

可以看到多了几个Hudi元数据字段其中_hoodie_record_key为Hudi主键,如果设置了RECORDKEY_FIELD,比如这里的ID,那么_hoodie_record_key是根据我们设置字段生成的,默认不是复合主键,这里代码示例改为了复合主键,具体配置为

option(KEYGENERATOR_CLASS_NAME.key(), classOf[ComplexKeyGenerator].getCanonicalName).

这里主要为了和SparkSQL保持一致,因为SparkSQL默认的为复合主键,如果不一致,那么upsert/delete时会有问题

默认情况RECORDKEY_FIELD是必须设置的,RECORDKEY_FIELD的默认值为uuid,如果不设置,则会去找uuid,因为schema里没有uuid,那么会报错

Hive

在服务器上运行示例代码是可以成功同步到Hive表的,我们看一下Hive表情况:

show create table test_hudi_table_1;

下面是Hive Hudi表的建表语句,和普通的Hive表的建表语句的区别可以自己比较,其中SERDEPROPERTIES里的内容是为了SparkSQL用的,可以看到这里包含了’primaryKey’=‘id’,在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的’primaryKey’获取的,如果没有这个属性,那么
Spark SQL认为该表不是主键表,则不能进行update等操作

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_table_1`(                  |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `value` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/test_hudi_table_1',                  |
|   'primaryKey'='id')                               |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/test_hudi_table_1'           |
| TBLPROPERTIES (                                    |
|   'last_commit_time_sync'='20220512101500',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"value","type":"integer","nullable":false,"metadata":{}},{"name":"ts","type":"integer","nullable":false,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1652320902')            |
+----------------------------------------------------+

Hive查询Hudi表:

select * from test_hudi_table_1;
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+
| _hoodie_commit_time  | _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                             _hoodie_file_name                             | id  | name  | value  |  ts   |     dt      |
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+
| 20220513150854       | 20220513150854_0_1    | id:1                | dt=2022-05-12           | dd4ef080-97b6-4046-a337-abb01e26943e-0_0-21-12005_20220513150854.parquet  | 1   | a1    | 10     | 1000  | 2022-05-12  |
+----------------------+-----------------------+---------------------+-------------------------+---------------------------------------------------------------------------+-----+-------+--------+-------+-------------+--+


Hive是可以查询Hudi表的,但是不能update/delete,要想使用update/delete等语句,只能使用Spark SQL,另外Hive可以增量查询。关于如何使用Hudi Spark SQL和Hive的增量查询,这里不展开描述,以后会单独写

配置项说明

这里只说明几个比较重要的配置,其他相关的配置可以看官网和源码

  • RECORDKEY_FIELD:默认情况RECORDKEY_FIELD是必须设置的,RECORDKEY_FIELD的默认值为uuid,如果不设置,则会去找uuid,因为schema里没有uuid,那么会报错。另外Hudi0.9.0支持非主键Hudi表,只需要配置
    option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).即可,但是在后面的版本已经不支持了
  • KEYGENERATOR_CLASS_NAME:默认值为SimpleKeyGenerator,默认不支持复合主键,默认情况下上述_hoodie_record_key的内容为1,而不是id:1,而SparkSQL的默认值为SqlKeyGenerator,该类是ComplexKeyGenerator的子类:
class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)

也就是本示例所使用的的复合主键类,当使用SimpleKeyGeneratorComplexKeyGenerator同时upsert一个表时,那么会生成两条记录,因为_hoodie_record_key的内容不一样,所以一张表的 KEYGENERATOR_CLASS_NAME必须保证一致(父类和子类也是一致的)

  • PRECOMBINE_FIELD: 预合并字段,默认值:ts,想详细了解预合并可以参考我的另外两篇博客https://dongkelun.com/2021/07/10/hudiPreCombinedField/https://dongkelun.com/2021/11/30/hudiPreCombineField2/ upsert时,预合并是必须的,如果我们的表里没有预合并字段,或者不想使用预合并,不设置的话是会抛异常的,因为默认去找ts字段,找不到则跑异常,那么我们可以将预合并字段设置为主键字段
  • PARTITIONPATH_FIELD: Hudi的分区字段,默认值partitionpath,对于没有分区的表,我们需要将该字段设置为空字符串option(PARTITIONPATH_FIELD.key, ""),否则可能会因找不到默认值partitionpath而抛异常。最新版本已经去掉分区字段默认值,详情可见:https://github.com/apache/hudi/pull/4195
  • OPERATION: Hudi的写操作类型,默认值为UPSERT_OPERATION_OPT_VAL即upsert,Hudi支持多种操作类型 如:upsert、insert、bulk_insert、delete等,具体每个版本支持哪些操作类型,可以查看官网或源码,可以根据自己的需求选择选择操作类型。本代码展示了upsert成功后,又删除成功。

下面的参数和同步hive元数据,查询hive有关

  • META_SYNC_ENABLED: 默认为false,不同步Hive,要想同步Hive可以将该值设为true,另外也可以设置HIVE_SYNC_ENABLED进行同步Hive,作用差不多,至于区别,这里不详细解说
  • HIVE_USE_JDBC: 是否使用jdbc同步hive,默认为true,如果使用jdbc,那么需要设置HIVE_URLHIVE_USERHIVE_PASS等配置,因为url和ip有关,每个环境不一样,用起来比较麻烦,所以这里不采用,另外因为实际使用是和Hive绑定的,可以直接使用HMS进行同步,使用起来比较方便,改为false后默认使用HMS同步Hive,具体逻辑可以看Hudi Hive 同步模块的源码,这里不展开
  • HIVE_STYLE_PARTITIONING: 是否使用Hive格式的分区路径,默认为false,如果设置为true,那么分区路径格式为<partition_column_name>=<partition_value>,在这里为dt=2022-05-12,默认情况下只有<partition_value>即2022-05-12,因为我们常用Hive表查询Hudi所以,这里设置为true
  • HIVE_CREATE_MANAGED_TABLE: 同步Hive建表时是否为内部表,默认为false,使用saveAsTable(实际调用的Hudi Spark SQL CTAS)建表时0.9.0版本有,本应该为内部表,但还是为外部表,可以通过设置这个参数修正,最新版本已修复,详情可见PR:https://github.com/apache/hudi/pull/3146
  • HIVE_TABLE_SERDE_PROPERTIES: 同步到Hive表SERDEPROPERTIES,为了Hudi Spark SQL 使用,在0.9.0版本,Spark SQL获取Hudi的主键字段是根据Hive表里这里的’primaryKey’获取的,如果没有这个属性,那么Spark SQL认为该表不是主键表,则不能进行update等操作,而默认情况同步Hive时没有将主键字段同步过去,最新版本已经不需要设置该属性了。相关PR:https://github.com/apache/hudi/pull/3745 这个PR添加了支持HIVE_CREATE_MANAGED_TABLE配置,但是CTAS依旧有bug,代码里的虽然判断表类型是否为内部表,并添加到options中,但是最后并没有将options用到最终写Hudi的参数中。另一个PR:https://github.com/apache/hudi/pull/3998 该PR的主要目的不是为了解决这个bug,但是附带解决了这个问题,因为options最终被正确传到写Hudi的参数中了

其他Hive相关的配置参数不一一解释,可自行查看官网

hoodie.properties

.hoodie目录下有表属性文件.hoodie.properties,内容为:

hoodie.table.precombine.field=ts
hoodie.table.partition.fields=dt
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.timeline.layout.version=1
hoodie.table.version=2
hoodie.table.recordkey.fields=id
hoodie.table.base.file.format=PARQUET
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.table.name=test_hudi_table_1

新版本在该属性文件里增加了很多属性,如HIVE_STYLE_PARTITIONINGhoodie.datasource.write.hive_style_partitioning,增加属性便于使表的属性前后保持统一

非主键表

如上面配置项说明所示Hudi0.9.0版本支持非主键表,对于纯insert的表有用,这里进行简单的代码示例

    val tableName2 = "test_hudi_table_2"
    val tablePath2 = "/tmp/test_hudi_table_2"
    save2HudiWithNoPrimaryKey(df, tableName2, tablePath2)
    spark.read.format("hudi").load(tablePath2).show(false)

  /**
   * 非主键表,非分区表
   */
  def save2HudiWithNoPrimaryKey(df: DataFrame, tableName: String, tablePath: String): Unit = {
    df.
      write.format("hudi").
      option(KEYGENERATOR_CLASS_NAME.key, classOf[UuidKeyGenerator].getName).
      option(RECORDKEY_FIELD.key, "").
      option(PARTITIONPATH_FIELD.key, "").
      option(TBL_NAME.key, tableName).
      option(OPERATION.key(), INSERT_OPERATION_OPT_VAL).
      mode(Overwrite).
      save(tablePath)
  }

结果:

+-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key                  |_hoodie_partition_path|_hoodie_file_name                                                       |id |name|value|ts  |dt        |
+-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+
|20220512145525     |20220512145525_0_1  |7263eac1-51f6-42eb-834d-bb5dfe13708e|                      |4fe619f1-58b1-4f58-94e6-002f9f5f5155-0_0-20-12004_20220512145525.parquet|1  |a1  |10   |1000|2022-05-12|
+-------------------+--------------------+------------------------------------+----------------------+------------------------------------------------------------------------+---+----+-----+----+----------+

可以看到Hudi的主键为uuid,_hoodie_partition_path为空,即非主键非分区表

备注:insert默认是会随机更新的(如果是主键表,大家可以将程序改为主键表,自行测试),随机指某些情况下,这和Hudi合并小文件有关,原理这里不详细解释,可以自行查看源码(以后可能会单独总结一篇相关的文章,和Hudi写文件、合并文件有关)。
要想是insert操作不更新,可以使用以下配置:

hoodie.merge.allow.duplicate.on.inserts = true

相关PR:https://github.com/apache/hudi/pull/3644,这个PR是在Java客户端支持这个参数的,Spark客户端本身(在这之前)就支持这个参数

saveAsTable

利用saveAsTable写Hudi并同步Hive,实际最终调用的是Spark SQL CTAS(CreateHoodieTableAsSelectCommand)
CTAS 先用的insert into(InsertIntoHoodieTableCommand),再建表,默认insert,这里展示怎么配置参数使用bulk_insert,并且不使用预合并,这对于转化没有重复数据的历史表时很有用。
insert into SQL 默认是insert,配置一些参数就可以使用upsert/bulk_insert,具体可以看InsertIntoHoodieTableCommand源码

    val tableName3 = "test_hudi_table_3"
    save2HudiWithSaveAsTable(df, databaseName, tableName3, primaryKey)
    spark.table(tableName3).show()

  def save2HudiWithSaveAsTable(df: DataFrame, databaseName: String, tableName: String, primaryKey: String): Unit = {
    df.
      write.format("hudi").
      option(RECORDKEY_FIELD.key(), primaryKey).
      // 不需要预合并,所以设置为primaryKey
      // 当insert/bulk_insert等操作,并且关闭了相关参数,则不需要设置
      // SparkSQL中如果没有显示配置预合并字段,则默认将预合并字段设置为schema的最后一个字段
      // 如果为默认值的话,则可能会报null异常,所以设置为主键
      // `PRECOMBINE_FIELD.key -> tableSchema.fields.last.name`
      // 相关issue:https://github.com/apache/hudi/issues/4131
      option(PRECOMBINE_FIELD.key(), primaryKey).
      option(DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key, s"primaryKey=$primaryKey").
      option(TBL_NAME.key(), tableName).
      option(HIVE_CREATE_MANAGED_TABLE.key, true).
      // 关闭预合并,虽然默认值为false,但是0.9.0版本SparkSQL,当有主键时,设置为了true
      option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key, false).
      // 使用bulk_insert
      option(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, true).
      // 这里虽然为Overwrite,但是Hudi CTAS要求目录必须为空,否则会报验证错误
      mode(Overwrite).
      saveAsTable(s"$databaseName.$tableName")
  }

这段代码本地是可以直接跑通的,结果为:

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|value|  ts|        dt|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+
|     20220512154039|  20220512154039_0_1|              id:1|                      |de3c99a2-3858-462...|  1|  a1|   10|1000|2022-05-12|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+-----+----+----------+

本地测试并没有同步到Hive表,因为并没有开启enableHiveSupport()(本地验证时,注释掉这个配置),当在服务器上运行时,则可以成功同步到Hive表,可以自己试试,用saveAsTable的好处是,很多配置比如同步Hive都在Hudi Spark SQL的源码里配置了,所以配置较少。CTAS也有一些限制,比如不能覆盖写,不如save(path)灵活

代码

完整代码地址:https://gitee.com/dongkelun/spark-hudi/blob/master/src/main/scala/com/dkl/blog/hudi/SparkHudiDemo.scala
备注:以后可能因重构地址有所变动

总结

本文对Hudi安装、读写进行了简单的总结,因为精力原因写的可能没有很全面,希望对刚入门Hudi的同学有所帮助,后面会继续总结Hudi Spark SQL 等其他方面的知识。

 类似资料: