Apache Griffin Core Source Code: DSL-to-SQL Translation in the measure Module

萧鹏云
2023-12-01


1 Introduction

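In the measure module's source code, when BatchDQApp executes its run method (taking batch processing of a Hive data source as the example here), it creates a data-quality check job. In doing so, the DSL rules configured by the user are translated in code into the corresponding SQL statements, which are then executed.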

// Create the data-quality check job
// build job
val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)

Step into the buildDQJob method of DQJobBuilder:

def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
    // build steps by datasources
    val dsSteps = context.dataSources.flatMap { dataSource =>
      DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
    }
    // build steps by rules
    /**
     * Example of the steps generated for an accuracy rule:
     * SeqDQStep(List(SparkSqlTransformStep(__missRecords,SELECT `source`.* FROM `source` LEFT JOIN `target` ON coalesce(`source`.`user_id`, '') = coalesce(`target`.`user_id`, '') AND upper(`source`.`first_name`) = upper(`target`.`first_name`) AND coalesce(`source`.`last_name`, '') = coalesce(`target`.`last_name`, '') AND coalesce(`source`.`address`, '') = coalesce(`target`.`address`, '') AND coalesce(`source`.`email`, '') = coalesce(`target`.`email`, '') AND coalesce(`source`.`phone`, '') = coalesce(`target`.`phone`, '') AND coalesce(`source`.`post_code`, '') = coalesce(`target`.`post_code`, '') WHERE (NOT (`source`.`user_id` IS NULL AND `source`.`first_name` IS NULL AND `source`.`last_name` IS NULL AND `source`.`address` IS NULL AND `source`.`email` IS NULL AND `source`.`phone` IS NULL AND `source`.`post_code` IS NULL)) AND (`target`.`user_id` IS NULL AND `target`.`first_name` IS NULL AND `target`.`last_name` IS NULL AND `target`.`address` IS NULL AND `target`.`email` IS NULL AND `target`.`phone` IS NULL AND `target`.`post_code` IS NULL),Map(),true), SparkSqlTransformStep(__missCount,SELECT COUNT(*) AS `miss` FROM `__missRecords`,Map(),false), SparkSqlTransformStep(__totalCount,SELECT COUNT(*) AS `total` FROM `source`,Map(),false), SparkSqlTransformStep(accu,
             SELECT A.total AS `total`,
                    A.miss AS `miss`,
                    (A.total - A.miss) AS `matched`,
                    coalesce( (A.total - A.miss) / A.total, 1.0) AS `matchedFraction`
             FROM (
               SELECT `__totalCount`.`total` AS total,
                      coalesce(`__missCount`.`miss`, 0) AS miss
               FROM `__totalCount` LEFT JOIN `__missCount`
             ) AS A
         ,Map(),false), MetricWriteStep(accu,accu,DefaultFlattenType,None), RecordWriteStep(__missRecords,__missRecords,None,None)))
     */
    val ruleSteps = ruleParams.flatMap { ruleParam =>
      DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
    }
    // metric flush step
    val metricFlushStep = MetricFlushStep()
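    /**
     * ++ concatenates two collections
     * :+ appends a single element to the end of a collection
     * +: prepends a single element to the head of a collection
     */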
    DQJob(dsSteps ++ ruleSteps :+ metricFlushStep)
  }
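
The collection operators used on the last line of buildDQJob can be tried in isolation. The following is a minimal sketch; the step names are hypothetical plain strings standing in for Griffin's DQStep values, not the real types.

```scala
object SeqOperators {
  // Hypothetical stand-ins for DQStep values; strings keep the sketch self-contained.
  val dsSteps: Seq[String] = Seq("readSource", "readTarget")
  val ruleSteps: Seq[String] = Seq("missRecords", "missCount")
  val flushStep: String = "metricFlush"

  // ++ concatenates the two sequences, then :+ appends one element at the end.
  // Operators starting with '+' bind tighter than those starting with ':',
  // so this parses as (dsSteps ++ ruleSteps) :+ flushStep.
  val allSteps: Seq[String] = dsSteps ++ ruleSteps :+ flushStep

  // +: prepends one element at the head (the collection is on the right-hand side).
  val withInit: Seq[String] = "init" +: allSteps
}
```

In this sketch, the flush step always ends up last, which matches the intent of the original line: the data source steps run first, then the rule steps, then the metric flush.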

Executing

DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)

eventually leads into the buildSteps method of GriffinDslDQStepBuilder:

def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
    val name = getStepName(ruleParam.getOutDfName())
    val rule = ruleParam.getRule
    val dqType = ruleParam.getDqType
    try {
      val result = parser.parseRule(rule, dqType)
      if (result.successful) {
        val expr = result.get
        val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceOutDfName(name))
        expr2DQSteps.getDQSteps()
      } else {
        warn(s"parse rule [ ${rule} ] fails: \n${result}")
        Nil
      }
    } catch {
      case e: Throwable =>
        error(s"generate rule plan ${name} fails: ${e.getMessage}", e)
        Nil
    }
  }
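
The control flow of buildSteps (parse the rule, turn a successful parse into steps, otherwise log and return an empty list) can be illustrated without Griffin's parser. The sketch below is an assumption-laden toy: parseRule is a hypothetical regex-based stand-in that only accepts "a.x = b.y and ..." rules, and the steps are plain strings rather than DQStep objects.

```scala
import scala.util.control.NonFatal

object RuleStepSketch {
  // Hypothetical parser: Right(clauses) on success, Left(message) on failure.
  // It only understands equality clauses joined by "and".
  def parseRule(rule: String): Either[String, Seq[String]] = {
    val clauses = rule.split("(?i)\\s+and\\s+").map(_.trim).toSeq
    if (clauses.forall(_.matches("""\w+\.\w+\s*=\s*\w+\.\w+"""))) Right(clauses)
    else Left(s"parse rule [ $rule ] fails")
  }

  // Mirrors the shape of buildSteps: a failed parse or an exception yields Nil.
  def buildSteps(rule: String): Seq[String] =
    try {
      parseRule(rule) match {
        case Right(clauses) => clauses.map(c => s"step for: $c")
        case Left(msg) =>
          Console.err.println(msg)
          Nil
      }
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"generate rule plan fails: ${e.getMessage}")
        Nil
    }
}
```

Returning Nil instead of rethrowing means that one bad rule does not abort the whole job; the remaining rules still contribute their steps.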

2 Apache Griffin DSL

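Official documentation: Apache Griffin DSL Guide
Griffin DSL is designed for DQ (data quality) measurement. It is a SQL-like language for describing problems in the DQ domain.
In practice, Griffin takes the Griffin DSL rules and translates them into Spark SQL rules, which are then computed by the Spark SQL engine.
The DQ domain has several dimensions, and each one has to be translated in a different way.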

Accuracy (consistency)

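For accuracy, we need the count of matching records between source and target; the rule describes the mapping between the two data sources. Griffin has to translate the DSL rule into multiple SQL rules.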

For example, the DSL rule "source.id = target.id and source.name = target.name" expresses the match condition for accuracy. After translation, the SQL rules are as follows:

  • Get the missing items from the source: SELECT source.* FROM source LEFT JOIN target ON
    coalesce(source.id, '') = coalesce(target.id, '') and
    coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT
    (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL
    AND target.name IS NULL), save as table miss_items.

  • Get the miss count: SELECT COUNT(*) AS miss FROM miss_items, save as table
    miss_count.

  • Get the total count from the source: SELECT COUNT(*) AS total FROM source, save as
    table total_count.

  • Get the accuracy metric: SELECT miss_count.miss AS miss, total_count.total AS total,
    (total_count.total - miss_count.miss) AS matched FROM miss_count FULL
    JOIN total_count, save as table accuracy.

After translation, the metric is saved in the table accuracy.
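
To make the first translation step concrete, here is a minimal sketch of how the "missing items" query could be assembled from a list of column pairs. This is not Griffin's actual translator (which works on a parsed expression tree); missRecordsSql and its parameter are hypothetical names used only for illustration.

```scala
object AccuracySqlSketch {
  // Builds the "missing items" query for pairs of (source column, target column).
  // Hypothetical helper, not part of Griffin.
  def missRecordsSql(pairs: Seq[(String, String)]): String = {
    // coalesce(..., '') lets NULL compare equal to NULL in the join condition.
    val onClause = pairs.map { case (s, t) =>
      s"coalesce(source.$s, '') = coalesce(target.$t, '')"
    }.mkString(" AND ")
    val sourceAllNull = pairs.map { case (s, _) => s"source.$s IS NULL" }.mkString(" AND ")
    val targetAllNull = pairs.map { case (_, t) => s"target.$t IS NULL" }.mkString(" AND ")
    // Keep source rows that are not entirely NULL and found no partner in target.
    s"SELECT source.* FROM source LEFT JOIN target ON $onClause " +
      s"WHERE (NOT ($sourceAllNull)) AND ($targetAllNull)"
  }
}
```

Calling AccuracySqlSketch.missRecordsSql(Seq(("id", "id"), ("name", "name"))) produces the same query as the first bullet above, with the join keyword "AND" in upper case. The LEFT JOIN keeps every source row, and the "target columns are all NULL" filter then selects exactly those source rows for which no target row matched.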

Profiling (statistical analysis)

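For profiling, the request is always an aggregation over the data. The rule is largely the same as SQL, but only the select, from, where, group-by, having, order-by and limit clauses are supported, which is enough to describe most profiling requests. For more complex requests, you can use a SQL rule directly.
For example, the DSL rule "source.cntry, source.count(), source.age.max() group by source.cntry" expresses a profiling request. After translation, the SQL rule is as follows:

  • SELECT source.cntry, count(source), max(source.age) FROM source GROUP
    BY source.cntry, save as table profiling.

After translation, the metric is saved in the table profiling.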
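
What the profiling query computes can be reproduced on an in-memory collection. The sketch below is only an illustration of the semantics (count and maximum age per country); the Person case class and the profile function are hypothetical, and no Spark is involved.

```scala
object ProfilingSketch {
  // Hypothetical row type with the two columns used by the example rule.
  final case class Person(cntry: String, age: Int)

  // For each country: (number of rows, maximum age), like
  // SELECT cntry, count(*), max(age) FROM source GROUP BY cntry.
  def profile(rows: Seq[Person]): Map[String, (Int, Int)] =
    rows.groupBy(_.cntry).map { case (cntry, group) =>
      (cntry, (group.size, group.map(_.age).max))
    }
}
```

Each group produced by groupBy is non-empty, so taking max inside the map is safe here.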

Distinctness (duplication)

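Distinctness, also called duplication, is about finding the duplicate items in the data and rolling up the item counts grouped by the number of times each item is duplicated.
For example, the DSL rule "name, age" expresses a distinctness request; in this case the source and target are the same data set. After translation, the SQL rules are as follows: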

  • Total count of the data set: SELECT COUNT(*) AS total FROM source, save as table total_count.

  • Group by the specified fields: SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist FROM source GROUP BY name, age, save as table dup_count.

  • Distinct metric: SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist, save as table distinct_metric.

  • Join the source with the duplicate counts: SELECT source.*, dup_count.dup AS dup, dup_count.dist AS
    dist FROM source LEFT JOIN dup_count ON source.name = dup_count.name
    AND source.age = dup_count.age, save as table dist_joined.

  • Add row numbers: SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY
    dist) row_num FROM dist_joined, save as table row_numbered.

  • Duplicate records: SELECT name, age, dup FROM row_numbered WHERE NOT dist OR row_num > 1, save as table dup_records.

  • Duplicate metric: SELECT name, age, dup, COUNT(*) AS num FROM dup_records GROUP BY name, age, dup, save as table dup_metric.
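
The core of these steps (total count, distinct count, and duplicates per key) can be sketched on a plain Scala collection. This only mirrors the first three queries plus the "dup > 0" idea behind the duplicate records; the distinctness function and its tuple-based rows are hypothetical and do not reproduce the join and row-number steps.

```scala
object DistinctnessSketch {
  // rows are (name, age) pairs.
  // Returns (total rows, number of distinct keys, duplicate count per duplicated key).
  def distinctness(rows: Seq[(String, Int)]): (Int, Int, Map[(String, Int), Int]) = {
    // Same idea as (COUNT(*) - 1) AS dup ... GROUP BY name, age.
    val dupByKey = rows.groupBy(identity).map { case (key, group) => (key, group.size - 1) }
    // Keys that occur only once have dup = 0 and are not duplicates.
    val duplicated = dupByKey.filter { case (_, dup) => dup > 0 }
    (rows.size, dupByKey.size, duplicated)
  }
}
```

For instance, with the rows ("a", 1), ("a", 1), ("b", 2), ("a", 1) the sketch reports 4 rows in total, 2 distinct keys, and the key ("a", 1) with 2 extra copies.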

Completeness

For completeness, we need to check for NULL. If the columns you measure are NULL, they are incomplete.

  • Total count of the source: SELECT COUNT(*) AS total FROM source, save as table total_count.
  • Incomplete metric: SELECT count(*) as incomplete FROM source WHERE NOT (id IS NOT NULL), save as table incomplete_count.
  • Complete metric: SELECT (source.total - incomplete_count.incomplete) AS complete FROM source LEFT JOIN incomplete_count, save as table complete_count.
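
The three completeness numbers are simple to reproduce in memory. In the sketch below, a NULL id is modelled as None; the completeness function is a hypothetical illustration of the arithmetic, not Griffin code.

```scala
object CompletenessSketch {
  // ids: the measured column, with None standing in for SQL NULL.
  // Returns (total, incomplete, complete).
  def completeness(ids: Seq[Option[Long]]): (Int, Int, Int) = {
    val total = ids.size
    // Same filter as WHERE NOT (id IS NOT NULL).
    val incomplete = ids.count(_.isEmpty)
    (total, incomplete, total - incomplete)
  }
}
```

With the input Seq(Some(1L), None, Some(3L), None) the sketch yields 4 rows in total, 2 incomplete and 2 complete.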

Timeliness

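Timeliness measures the latency of each item and collects statistics on that latency.
For example, in the DSL rule "ts, out_ts" the first column is the input time of an item and the second column is its output time; if the second column is not set, "__tmst" is used as the default output time column. After translation, the SQL rules are as follows: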

  • Get the input and output time columns: SELECT *, ts AS _bts, out_ts AS _ets FROM source, save as table origin_time.
  • Get the latency between input and output time: SELECT *, (_ets - _bts) AS latency FROM origin_time, save as table lat.
  • Get the timeliness metric: SELECT CAST(AVG(latency) AS BIGINT) AS avg, MAX(latency) AS max, MIN(latency) AS min FROM lat, save as table time_metric.
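
The latency statistics can likewise be sketched on a plain collection of (ts, out_ts) pairs. LatencyMetric and timeliness are hypothetical names; the integer division below matches CAST(AVG(latency) AS BIGINT) only under the assumption that latencies are non-negative.

```scala
object TimelinessSketch {
  final case class LatencyMetric(avg: Long, max: Long, min: Long)

  // rows are (input time, output time) pairs, e.g. epoch milliseconds.
  // Returns None for an empty input, since no statistics exist in that case.
  def timeliness(rows: Seq[(Long, Long)]): Option[LatencyMetric] =
    if (rows.isEmpty) None
    else {
      // Same as (_ets - _bts) AS latency.
      val latencies = rows.map { case (ts, outTs) => outTs - ts }
      Some(LatencyMetric(latencies.sum / latencies.size, latencies.max, latencies.min))
    }
}
```

For the pairs (100, 150), (200, 230), (300, 400) the latencies are 50, 30 and 100, so the sketch reports an average of 60, a maximum of 100 and a minimum of 30.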