In the measure module's source code, when BatchDQApp executes its run method (taking batch processing of a Hive source as the example), building the data-quality check job actually means translating the user-configured DSL rules into the corresponding SQL statements to be executed:
// build the data-quality comparison job info
// build job
val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
Stepping into DQJobBuilder's buildDQJob method:
def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
  // build steps by data sources
  val dsSteps = context.dataSources.flatMap { dataSource =>
    DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
  }
  // build steps by rules
  /**
   * For the accuracy rule in this example, ruleSteps expands to:
   *
   * SeqDQStep(List(
   *   SparkSqlTransformStep(__missRecords,
   *     SELECT `source`.* FROM `source` LEFT JOIN `target`
   *     ON coalesce(`source`.`user_id`, '') = coalesce(`target`.`user_id`, '')
   *     AND upper(`source`.`first_name`) = upper(`target`.`first_name`)
   *     AND coalesce(`source`.`last_name`, '') = coalesce(`target`.`last_name`, '')
   *     AND coalesce(`source`.`address`, '') = coalesce(`target`.`address`, '')
   *     AND coalesce(`source`.`email`, '') = coalesce(`target`.`email`, '')
   *     AND coalesce(`source`.`phone`, '') = coalesce(`target`.`phone`, '')
   *     AND coalesce(`source`.`post_code`, '') = coalesce(`target`.`post_code`, '')
   *     WHERE (NOT (`source`.`user_id` IS NULL AND `source`.`first_name` IS NULL
   *       AND `source`.`last_name` IS NULL AND `source`.`address` IS NULL
   *       AND `source`.`email` IS NULL AND `source`.`phone` IS NULL
   *       AND `source`.`post_code` IS NULL))
   *     AND (`target`.`user_id` IS NULL AND `target`.`first_name` IS NULL
   *       AND `target`.`last_name` IS NULL AND `target`.`address` IS NULL
   *       AND `target`.`email` IS NULL AND `target`.`phone` IS NULL
   *       AND `target`.`post_code` IS NULL),
   *     Map(), true),
   *   SparkSqlTransformStep(__missCount,
   *     SELECT COUNT(*) AS `miss` FROM `__missRecords`, Map(), false),
   *   SparkSqlTransformStep(__totalCount,
   *     SELECT COUNT(*) AS `total` FROM `source`, Map(), false),
   *   SparkSqlTransformStep(accu,
   *     SELECT A.total AS `total`,
   *            A.miss AS `miss`,
   *            (A.total - A.miss) AS `matched`,
   *            coalesce( (A.total - A.miss) / A.total, 1.0) AS `matchedFraction`
   *     FROM (
   *       SELECT `__totalCount`.`total` AS total,
   *              coalesce(`__missCount`.`miss`, 0) AS miss
   *       FROM `__totalCount` LEFT JOIN `__missCount`
   *     ) AS A,
   *     Map(), false),
   *   MetricWriteStep(accu, accu, DefaultFlattenType, None),
   *   RecordWriteStep(__missRecords, __missRecords, None, None)
   * ))
   */
  val ruleSteps = ruleParams.flatMap { ruleParam =>
    DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
  }
  // metric flush step
  val metricFlushStep = MetricFlushStep()
  /**
   * ++ concatenates two collections
   * :+ appends a single element at the tail of a collection
   * +: prepends a single element at the head of a collection
   */
  DQJob(dsSteps ++ ruleSteps :+ metricFlushStep)
}
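As a side note on that last line, here is a minimal plain-Scala sketch of the three collection operators (nothing Griffin-specific):

val a = Seq(1, 2)
val b = Seq(3, 4)
a ++ b   // Seq(1, 2, 3, 4) -- concatenates two collections
a :+ 9   // Seq(1, 2, 9)    -- appends one element at the tail
9 +: a   // Seq(9, 1, 2)    -- prepends one element at the head

So dsSteps ++ ruleSteps :+ metricFlushStep yields all data-source steps, then all rule steps, with the metric flush step appended last.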
Executing
DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
eventually reaches GriffinDslDQStepBuilder's buildSteps method:
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
  val name = getStepName(ruleParam.getOutDfName())
  val rule = ruleParam.getRule
  val dqType = ruleParam.getDqType
  try {
    val result = parser.parseRule(rule, dqType)
    if (result.successful) {
      val expr = result.get
      val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceOutDfName(name))
      expr2DQSteps.getDQSteps()
    } else {
      warn(s"parse rule [ ${rule} ] fails: \n${result}")
      Nil
    }
  } catch {
    case e: Throwable =>
      error(s"generate rule plan ${name} fails: ${e.getMessage}", e)
      Nil
  }
}
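parser.parseRule returns a ParseResult from Scala's parser-combinator library, which is what the result.successful / result.get pattern above is handling. A minimal self-contained sketch with a toy parser (ToyParser and its clause grammar are invented for illustration, not Griffin's actual grammar):

import scala.util.parsing.combinator.RegexParsers

object ToyParser extends RegexParsers {
  // parses a single accuracy-style clause such as "source.id = target.id"
  def field: Parser[String] = """[A-Za-z_][\w.]*""".r
  def clause: Parser[(String, String)] = field ~ "=" ~ field ^^ { case l ~ _ ~ r => (l, r) }

  def main(args: Array[String]): Unit = {
    val result = parseAll(clause, "source.id = target.id")
    if (result.successful) {
      println(s"parsed: ${result.get}")   // parsed: (source.id,target.id)
    } else {
      println(s"parse fails:\n$result")   // the branch corresponding to warn(...) above
    }
  }
}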
Official documentation: Apache Griffin DSL Guide
Griffin DSL is designed for DQ measurement, as a SQL-like language that describes problems in the DQ domain.
In fact, Griffin takes the Griffin DSL rules and translates them into Spark SQL rules, so they can be computed on the Spark SQL engine.
The DQ domain has multiple dimensions, and each one has to be translated differently.
For accuracy, we need the count of items in the source that match the target; the rule describes the mapping between the data sources, and Griffin has to translate one DSL rule into several SQL statements.
For example, the DSL rule "source.id = target.id and source.name = target.name" expresses the match condition for accuracy. After translation, the SQL rules are as follows:
Get the missing items from source: SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') AND coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL), save as table miss_items.
Get the miss count: SELECT COUNT(*) AS miss FROM miss_items, save as table miss_count.
Get the total count of source: SELECT COUNT(*) AS total FROM source, save as table total_count.
Get the accuracy metric: SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count, save as table accuracy.
After translation, the metric is saved in table accuracy.
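To see the four steps end to end, here is a small runnable Spark sketch on invented data (the table names follow the guide; the id/name columns, values, and local session setup are my assumptions):

import org.apache.spark.sql.SparkSession

object AccuracySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("accuracy-sketch").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name").createOrReplaceTempView("source")
    Seq((1, "a"), (2, "x")).toDF("id", "name").createOrReplaceTempView("target")

    // step 1: rows of source with no exact match in target
    spark.sql(
      """SELECT source.* FROM source LEFT JOIN target
        |ON coalesce(source.id, '') = coalesce(target.id, '')
        |AND coalesce(source.name, '') = coalesce(target.name, '')
        |WHERE (NOT (source.id IS NULL AND source.name IS NULL))
        |AND (target.id IS NULL AND target.name IS NULL)""".stripMargin)
      .createOrReplaceTempView("miss_items")

    // steps 2 and 3: miss count and total count
    spark.sql("SELECT COUNT(*) AS miss FROM miss_items").createOrReplaceTempView("miss_count")
    spark.sql("SELECT COUNT(*) AS total FROM source").createOrReplaceTempView("total_count")

    // step 4: the accuracy metric
    spark.sql(
      """SELECT miss_count.miss AS miss, total_count.total AS total,
        |(total_count.total - miss_count.miss) AS matched
        |FROM miss_count FULL JOIN total_count""".stripMargin)
      .show()   // miss=2, total=3, matched=1

    spark.stop()
  }
}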
For profiling, the request is always some aggregation over the data. The rule is essentially SQL, restricted to the select, from, where, group-by, having, order-by and limit clauses, which covers most profiling requests; anything more complex can be expressed directly as a SQL rule.
For example, the DSL rule "source.cntry, source.count(), source.age.max() group by source.cntry" expresses a profiling request. It translates into a single group-by aggregation; after translation, the metric is saved in table profiling.
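Reusing the local Spark session from the accuracy sketch above, a hedged guess at the translated profiling SQL (the cntry/age columns, values, and output aliases are my assumptions; Griffin's generated aliases may differ):

// a source view carrying the profiled columns
Seq(("us", 25), ("us", 30), ("cn", 40)).toDF("cntry", "age").createOrReplaceTempView("source")

spark.sql(
  """SELECT source.cntry, COUNT(*) AS cnt, MAX(source.age) AS age_max
    |FROM source GROUP BY source.cntry""".stripMargin)
  .createOrReplaceTempView("profiling")   // the metric is then read from table profiling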
Uniqueness, also called duplicate, finds the duplicated items in the data and rolls the item counts up grouped by how many times each item is duplicated.
For example, the DSL rule "name, age" expresses a duplicate request; in this case, source and target are the same data set. After translation, the SQL rules are as follows:
Total count of the data set: SELECT COUNT(*) AS total FROM source, save as table total_count.
Group by the given fields: SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist FROM source GROUP BY name, age, save as table dup_count.
Distinct metric: SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist, save as table distinct_metric.
Join source with the distinct-metric table: SELECT source.*, dup_count.dup AS dup, dup_count.dist AS dist FROM source LEFT JOIN dup_count ON source.name = dup_count.name AND source.age = dup_count.age, save as table dist_joined.
Add row numbers: SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY dist) row_num FROM dist_joined, save as table row_numbered.
Duplicate records: SELECT name, age, dup FROM row_numbered WHERE NOT dist OR row_num > 1, save as table dup_records.
Duplicate metric: SELECT name, age, dup, COUNT(*) AS num FROM dup_records GROUP BY name, age, dup, save as table dup_metric.
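Still in the same local Spark session, the whole chain can be replayed on a toy (name, age) data set; with one duplicated pair, dup_metric ends up with a single row (the data and session are my assumptions, the SQL is taken verbatim from the steps above):

Seq(("a", 1), ("a", 1), ("b", 2)).toDF("name", "age").createOrReplaceTempView("source")

spark.sql("SELECT COUNT(*) AS total FROM source").createOrReplaceTempView("total_count")
spark.sql(
  """SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist
    |FROM source GROUP BY name, age""".stripMargin).createOrReplaceTempView("dup_count")
spark.sql("SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist")
  .createOrReplaceTempView("distinct_metric")
spark.sql(
  """SELECT source.*, dup_count.dup AS dup, dup_count.dist AS dist
    |FROM source LEFT JOIN dup_count
    |ON source.name = dup_count.name AND source.age = dup_count.age""".stripMargin)
  .createOrReplaceTempView("dist_joined")
spark.sql(
  """SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY dist) row_num
    |FROM dist_joined""".stripMargin).createOrReplaceTempView("row_numbered")
spark.sql(
  """SELECT name, age, dup FROM row_numbered
    |WHERE NOT dist OR row_num > 1""".stripMargin).createOrReplaceTempView("dup_records")
spark.sql(
  """SELECT name, age, dup, COUNT(*) AS num FROM dup_records
    |GROUP BY name, age, dup""".stripMargin)
  .show()   // one row: (a, 1, 1, 1) -- the pair (a, 1) occurs twice, i.e. one duplicate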
For completeness, NULL values are what need checking: records whose measured columns are NULL are incomplete.
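The guide gives no SQL for completeness, but a minimal sketch of the idea, continuing the session above with its (name, age) source view (the choice of measured columns is my assumption):

spark.sql(
  """SELECT COUNT(*) AS incomplete FROM source
    |WHERE name IS NULL OR age IS NULL""".stripMargin).show()
spark.sql("SELECT COUNT(*) AS total FROM source").show()   // complete = total - incomplete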
For timeliness, the latency of each item is measured and statistics are computed over those latencies.
For example, the DSL rule is "ts, out_ts": the first column is the item's input time and the second its output time; if the second is not set, "__tmst" is used as the default output-time column. After translation, the SQL rules are as follows: