翻译(zh-CN):Apache Griffin DSL

许博易
2023-12-01

目的

Griffin DSL 是为 DQ 测量而设计的,作为一种类似 SQL 的语言,它试图描述 DQ 请求。

包括各种测量类型:

  • a Accuracy
  • profiling
  • uniqueness
  • timeliness

包括各种数据源类型:

  • batch
    • hive
    • avro
  • streaming
    • kafka

包括各种数据格式类型:

  • Structured data
    • hive table
    • json string
    • avro
  • Unstructured data

     

设计理念

Griffin DSL 类似 SQL,是为适应 Apache Griffin 用户情况而定制的。

基本上,要计算数据精度(accuracy)质量度量,用户只需要提供像 WHERE 子句这样的比较规则

source.uid = target.uid and source.itemid = target.itemid and source.timestamp =target.timestamp

Apache Griffin 将为用户计算出度量结果。

要计算数据质量分析(profiling)度量,用户只需要提供类似 SQL 的规则(其中关键字 “select” 和 “from” 子句可以被忽略)

source.id.count(), source.age.max() where source.age > 10

 

示例

例1。 在不同的源下计算(source & target)

source.id = target.id AND source.name = target.name AND source.age = target.age

例2。 单个源下计算(source)

SELECT source.country, source.id.count() AS count, source.age.avg() AS avg_age FROM source GROUP BY source.country SORT BY count DESC LIMIT 5

 

语法范式(Syntax BNF)

-- literal --
<literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
<literal-string> ::= <any-string>
<literal-number> ::= <integer> | <double>
<literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
<literal-boolean> ::= true | false
<literal-null> ::= null
<literal-nan> ::= nan

-- selection --
<selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
<selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
<field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
<index-sel> ::= "[" <arg> "]"
<function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
<arg> ::= <math-expr>

-- as alias --
<as-alias> ::= <as> <field-name>

-- math expr --
<math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
<unary-math-expr> ::= [<unary-opr>]* <math-factor>
<binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
<math-expr> ::= <binary-math-expr>

-- logical expr --
<in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
<between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
<range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
<like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
<is-null-expr> ::= <math-expr> <is> [<not>]? <null>
<is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
<logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
<unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
<binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
<logical-expr> ::= <binary-logical-expr>

-- expression --
<expr> = <math-expr> | <logical-expr>

-- function expr --
<function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
<function-name> ::= ("function name registered")
<arg> ::= <expr>

-- clauses --
<select-clause> = <expr> [, <expr>]*
<where-clause> = <where> <expr>
<from-clause> = <from> ("data source name registered")
<having-clause> = <having> <expr>
<groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
<orderby-item> = <expr> [ <DESC> ]?
<orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
<limit-clause> = <limit> <expr>

-- combined clauses --
<combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+

 

语法描述

A 支持过程

逻辑操作:not, and, or, in, between, like, rlike, is null, is nan, =, !=, <>, <=, >=, <, >
数学运算:+, -, *, /, %
SQL 语句:as, where, group by, having, order by, limit

B 关键字

- null, nan, true, false
- not, and, or
- in, between, like, is
- select, distinct, from, as, where, group, by, having, order, desc, asc, limit

C 算子

- !, &&, ||, =, !=, <, >, <=, >=, <>
- +, -, *, /, %
- (, )
- ., [, ]

D 字词

- string: 用一对“” 或者 ‘’ 包围,如果有包含字符串的请求,则使用转义字符 \  
	e.g. "test", 'string 1', "hello \" world \" "
- number: double 或者 integer 数字.  
	e.g. 123, 33.5
- time: 字符串中单位的整数将被转换为以毫秒为单位的整数  
	e.g. 3d, 5h, 4ms
- boolean: 真 或者 假.  
	e.g. true, false

E 选择

- selection head: 数据源名称.
	e.g. source, target, `my table name`
- all field selection: *或前面有数据源名称.
	e.g. *, source.*, target.*
- field selection: 字段名称 或者 数据源名称.字段名称
	e.g. source.age, target.name, user_id
- index selection: 字段名称.[] ,方括号之内填写整数,.
	e.g. source.attributes[3]
- function selection: 带括号“()”的函数名,字段名称在前面或后面.
	e.g. count(*), *.count(), source.user_id.count(), max(source.age)
- alias: 声明别名
	e.g. source.user_id as id, target.user_name as name

F 数学表达式

- math factor: 带括号的文字或功能或选择或数学表达.
	e.g. 123, max(1, 2, 3, 4), source.age, (source.age + 13)
- unary math expression: 有因子的一元数学运算符.
	e.g. -(100 - source.score)
- binary math expression: 二进制数学运算符的数学因子.
	e.g. source.age + 13, score * 2 + ratio

G 逻辑表达式

- in: 像sql一样的 in 子句.
	e.g. source.country in ("USA", "CHN", "RSA")
- between:像sql一样的between子句.
	e.g. source.age between 3 and 30, source.age between (3, 30)
- like:像sql一样的 like 子句.
	e.g. source.name like "%abc%"
- is null:sql一样的 is null 子句
	e.g. source.desc is not null
- is nan: 检查是否为数字,语法类似 is null
	e.g. source.age is not nan
- logical factor: 括号内书写 数学表达式,逻辑表达式或其他的合法表达式。
	e.g. (source.user_id = target.user_id AND source.age > target.age)
- unary logical expression: 具有因子的一元逻辑算子.
	e.g. NOT source.has_data, !(source.age = target.age)
- binary logical expression: 具有二元逻辑运算符的逻辑因子,包括和,或和比较运算符.
	e.g. source.age = target.age OR source.ticket = target.tck

h 表达式

- expression:逻辑表达和数学表达.

i 函数 

- argument: 表达.
- function: 括号内带参数的函数.
	e.g. max(source.age, target.age), count(*)

J 条款

- select clause: 结果列如sql select子句,我们可以忽略Griffin DSL中的“select”这个词。.
	e.g. select user_id.count(), age.max() as max, source.user_id.count() as cnt, source.age.min()
- from clause: 表名如sql from子句,其中数据源名称必须是数据源名称之一或前一规则步骤的输出表名称,我们可以通过配置数据源名称来忽略此子句。
	e.g. from source, from `target`
- where clause: 过滤条件如sql where子句,可选.
	e.g. where source.id = target.id and source.age = target.age
- group-by clause: 像sql中的group-by子句一样,可选。 可选的having子句可以遵循.
	e.g. group by cntry, group by gender having count(*) > 50
- order-by clause: 类似 order-by 语句, 可选.
	e.g. order by name, order by first_name desc, age asc
- limit clause: 类似sql中 limit 语句, 可选.
	e.g. limit 5

K 精度规则 

- Griffin DSL中的准确性规则表达是一种逻辑表达式,它告诉数据源之间的映射关系.
	e.g. source.id = target.id and source.name = target.name and source.age between (target.age, target.age + 5

L 型剖面法 

- Griffin DSL中的分析规则表达式是一个类似sql的表达式,前面有select子句,后跟可选的from子句,where子句,group-by子句,order-by子句,limit子句顺序.
	e.g. source.gender, source.id.count() where source.age > 20 group by source.gender, select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5

M 唯一性规则 

- Griffin DSL中的唯一性规则表达式是由逗号分隔的选择表达式列表,表示要测量的重复列.
	e.g. name, (age + 1) as next_age

n 时效性规则 

- Griffin DSL中的时效性规则表达式是由逗号分隔的选择表达式列表,表示输入时间和输出时间(如果未设置则计算时间为默认值).
	e.g. ts

 

Griffin DSL 到 SQL 的翻译

Griffin DSL 是用于 DQ 测量的,用来描述 DQ 域问题。
实际上,在 Griffin 中,我们得到了 Griffin DSL 规则,将它们转换为 Spark-SQL 规则,以便在 Spark-SQL 引擎中进行计算。
在 DQ 域中,有多个维度,我们需要以不同的方式翻译它们。

准确度

为了准确起见,我们需要得到源和目标之间的匹配计数,规则描述了数据源之间的映射关系。Griffin 需要将 DSL 规则转换为多个 SQL 规则。
例如,DSL 规则是“ source.id = target.id and source.name = target.name ",表示精度的匹配条件。翻译后,SQL 规则如下:

  • 从源获取丢失的项目:SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL), save as table miss_items
  • 数到小姐的数目:SELECT COUNT(*) AS miss FROM miss_items, save as table miss_count.
  • 从源获取总计数:SELECT COUNT(*) AS total FROM source, save as tabletotal_count.
  • 获取精度度量:SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count, save as table accuracy.

解析后,指标将保持在tableaccuracy中.

B 型剖面法

对于概要分析,请求始终是数据的聚合函数,该规则主要与 SQL 相同,但只是支持select,from,where,group-by,having,order-by,limit子句,它可以描述大多数剖析请求。如果有复杂的请求,可以直接使用 SQL 规则来描述它。
例如,DSL 规则是source.cntry, source.count(), source.age.max() group by source.cntry"表示分析请求。翻译后,SQL 规则如下:

  • profiling sql rule:SELECT source.cntry, count(source), max(source.age) FROM source GROUP BY source.cntry,save as profiling.

翻译之后,度量将被保存在表中profiling.

C 唯一性

对于唯一性,或称为重复,是为了找出重复的数据项,并将项目计数组按重复的时间卷积。
例如,DSL 规则是 “name,age”,它表示重复的请求,在本例中,源和目标是相同的数据集。翻译后,SQL 规则如下:

  • total count of source: SELECT COUNT(*) AS total FROM source, save as table total_count.
  • group by fields: SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist FROM source GROUP BY name, age, save as table dup_count.
  • distinct metric: SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist, save as table distinct_metric.
  • source join distinct metric: SELECT source.*, dup_count.dup AS dup, dup_count.dist AS dist FROM source LEFT JOIN dup_count ON source.name = dup_count.name AND source.age = dup_count.age, save as table dist_joined.
  • add row number: SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY dist) row_num FROM dist_joined, save as table row_numbered.
  • duplicate records: SELECT name, age, dup FROM row_numbered WHERE NOT dist OR row_num > 1, save as table dup_records.
  • duplicate metric: SELECT name, age, dup, COUNT(*) AS num FROM dup_records GROUP BY name, age, dup, save as table dup_metric.

翻译之后,度量将保存在表 dup _ metrics 中。

时效性

对于及时性,是度量每个项目的延迟,并获得延迟的统计信息。
例如,DSL 规则是 “ts,out _ ts”,第一列表示项的输入时间,第二列表示项的输出时间,如果不设置,“_ _ tmst” 将是默认的输出时间列。翻译后,SQL 规则如下:

  • total count of source: SELECT COUNT(*) AS total FROM source, save as table total_count.
  • incomplete metric: SELECT count(*) as incomplete FROM source WHERE NOT (id IS NOT NULL), save as table incomplete_count.
  • complete metric: SELECT (source.total - incomplete_count.incomplete) AS complete FROM source LEFT JOIN incomplete_count, save as table complete_count.

翻译之后,度量将保存在 table time _ metrics 中。

 

替代规则

您可以简单地使用 Griffin DSL 规则来描述 DQ 域中的问题,对于某些复杂的需求,还可以使用 Griffin 支持的其他规则。

Spark SQL
Griffin 直接支持 Spark-SQL,您可以这样用 SQL 编写规则:

{
	"dsl.type": "spark-sql",
	"name": "source",
	"rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM source"
}

Griffin 将直接在 Spark-SQL 引擎中计算它。

B 数据帧操作
Griffin 支持在 Spark 中的数据框架上的一些其他操作,比如将 JSON 字符串数据框架转换为提取出的对象模式的数据框架。例如:

{
	"dsl.type": "df-opr",
	"name": "ext_source",
	"rule": "from_json",
	"details": {
		"df.name": "json_source"
	}
}

Griffin 将执行提取 JSON 字符串的操作。
实际上,您还可以在 Griffin 中扩展 DF-OPR 引擎和 DF-OPR 适配器,以支持更多类型的数据帧操作。

 

小贴士

Griffin 引擎在 Spark 上运行,它可能工作在两个阶段,Pre-Proc 阶段和 Run 阶段。

前期

Griffin 直接计算数据源,以获得适当的数据格式,作为 DQ 计算的准备工作。在此阶段,您可以使用 df-opr 和 spark-sql 规则。
准备好之后,为了支持流 DQ 计算,将在每一行数据中添加一个时间戳列,因此运行阶段的数据帧包含一个额外的列,名为 “_ _ tmst”。

B 运行阶段

Griffin 用准备好的数据来计算 DQ 度量。在此阶段,您可以使用 Griffin-DSL、Spark-SQL 规则和部分 DF-OPR 规则。
对于 Griffin-DSL 规则,Griffin 将其转换为 Spark-SQL 规则,其中包含列 “_ _ tmst” 的分组条件,这对于流 DQ 计算特别有用。但是对于 Spark-SQL 规则,Griffin 直接使用它,您需要在 Spark-SQL 规则中显式地添加 “_ _ tmst” 列,否则计算后无法得到正确的度量结果。

 

 

 类似资料: