此案例适用于基于hive,HDFS等批数据作为数据源进行数据质量监控。
假设我们有一个数据集(demo_src),按小时划分,我们想知道每个小时的数据是什么样的。
为简单起见,假设两个数据集都具有与此相同的架构:
id bigint
age int
desc string
dt string
hour string
dt 和 hour 都是分区,
因为每天我们都有一个每日分区 dt(如 20180912),
每天我们有 24 小时的分区(例如 00、01、02、…、23)。
为 Apache Griffin 测量模块准备环境,包括以下组件:
有关以上组件的详细的配置过程,可以参考griffin/griffin-doc/deploy,本文假定以上环境均已配置完毕。
有关版本匹配的信息,可参考https://github.com/apache/griffin/blob/master/griffin-doc/deploy/measure-build-guide.md
1.在此处下载 Apache Griffin 源包。
2.解压源包。
unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release
3.构建 Apache Griffin jar
mvn clean install
并将构建的 apache griffin jar包移动到项目路径中
mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar
为了快速开始,我们生成一个hive数据表demo_src.
--create hive tables here. hql script
--Note: replace hdfs location with your own path
CREATE EXTERNAL TABLE `demo_src`(
`id` bigint,
`age` int,
`desc` string)
PARTITIONED BY (
`dt` string,
`hour` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LOCATION
'hdfs:///griffin/data/batch/demo_src';
数据格式类似于这样:
1|18|student
2|23|engineer
3|42|cook
...
可以下载demo数据并执行 ./gen_demo_data.sh 以获取数据源文件。 然后我们以每小时将数据加载到 hive 表中。
Apache Griffin环境配置
环境配置文件:env.json
{
"spark": {
"log.level": "WARN"
},
"sinks": [
{
"type": "console"
},
{
"type": "hdfs",
"config": {
"path": "hdfs:///griffin/persist"
}
},
{
"type": "elasticsearch",
"config": {
"method": "post",
"api": "http://es:9200/griffin/accuracy"
}
}
]
}
定义griffin数据质量(DQ)
DQ配置文件:dq.json
{
"name": "batch_prof",
"process.type": "batch",
"data.sources": [
{
"name": "src",
"baseline": true,
"connectors": [
{
"type": "hive",
"version": "1.2",
"config": {
"database": "default",
"table.name": "demo_tgt"
}
}
]
}
],
"evaluate.rule": {
"rules": [
{
"dsl.type": "griffin-dsl",
"dq.type": "profiling",
"out.dataframe.name": "prof",
"rule": "src.id.count() AS id_count, src.age.max() AS age_max, src.desc.length().max() AS desc_length_max",
"out": [
{
"type": "metric",
"name": "prof"
}
]
}
]
},
"sinks": ["CONSOLE", "HDFS"]
}
将测量作业提交到 Spark,以配置文件路径作为参数。
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 2 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
在控制台中可以获取计算日志,作业完成后,可以打印结果指标。 指标也将保存在 hdfs 中:hdfs:///griffin/persist///_METRICS。
还可以根据结果,以及实际业务需要,进一步改进数据质量度量
有关度量指标的详细的各项配置参数的含义,可以参考griffin/griffin-doc/measure