四.Apache Griffin基于Hive batch批数据的质量监控实战

郑卜鹰
2023-12-01

数据集

此案例适用于基于hive,HDFS等批数据作为数据源进行数据质量监控。

假设我们有一个数据集(demo_src),按小时划分,我们想知道每个小时的数据是什么样的。

为简单起见,假设两个数据集都具有与此相同的架构:

id                      bigint                                      
age                     int                                         
desc                    string                                      
dt                      string                                      
hour                    string 

dt 和 hour 都是分区,

因为每天我们都有一个每日分区 dt(如 20180912),

每天我们有 24 小时的分区(例如 00、01、02、…、23)。

环境准备

为 Apache Griffin 测量模块准备环境,包括以下组件:

  • JDK (1.8+)
  • Hadoop (2.6.0+)
  • Spark (2.2.1+)
  • Hive (2.2.0)

有关以上组件的详细的配置过程,可以参考griffin/griffin-doc/deploy,本文假定以上环境均已配置完毕。
有关版本匹配的信息,可参考https://github.com/apache/griffin/blob/master/griffin-doc/deploy/measure-build-guide.md

构建 Apache Griffin 测量模块

1.在此处下载 Apache Griffin 源包。
2.解压源包。

unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release

3.构建 Apache Griffin jar

mvn clean install

并将构建的 apache griffin jar包移动到项目路径中

mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar

数据准备

为了快速开始,我们生成一个hive数据表demo_src.

--create hive tables here. hql script
--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE `demo_src`(
  `id` bigint,
  `age` int,
  `desc` string) 
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
LOCATION
  'hdfs:///griffin/data/batch/demo_src';

数据格式类似于这样:

1|18|student
2|23|engineer
3|42|cook
...

可以下载demo数据并执行 ./gen_demo_data.sh 以获取数据源文件。 然后我们以每小时将数据加载到 hive 表中。

定义数据质量指标

Apache Griffin环境配置
环境配置文件:env.json

{
  "spark": {
    "log.level": "WARN"
  },
  "sinks": [
    {
      "type": "console"
    },
    {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/persist"
      }
    },
    {
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://es:9200/griffin/accuracy"
      }
    }
  ]
}

定义griffin数据质量(DQ)
DQ配置文件:dq.json

{
  "name": "batch_prof",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connectors": [
        {
          "type": "hive",
          "version": "1.2",
          "config": {
            "database": "default",
            "table.name": "demo_tgt"
          }
        }
      ]
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "profiling",
        "out.dataframe.name": "prof",
        "rule": "src.id.count() AS id_count, src.age.max() AS age_max, src.desc.length().max() AS desc_length_max",
        "out": [
          {
            "type": "metric",
            "name": "prof"
          }
        ]
      }
    ]
  },
  "sinks": ["CONSOLE", "HDFS"]
}

测量数据质量

将测量作业提交到 Spark,以配置文件路径作为参数。

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 2 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json

报告数据质量指标

在控制台中可以获取计算日志,作业完成后,可以打印结果指标。 指标也将保存在 hdfs 中:hdfs:///griffin/persist///_METRICS。

优化数据质量报告

还可以根据结果,以及实际业务需要,进一步改进数据质量度量

有关度量指标的详细的各项配置参数的含义,可以参考griffin/griffin-doc/measure

 类似资料: