Our department recently set out to build data quality management. After evaluating several of the data quality components commonly used in the community, we settled on Apache Griffin as the data quality component on the big data side.
This document covers integrating Griffin into a CDH cluster, so the base services Griffin depends on, such as HDFS, Hive, and Spark, are assumed to be in place and are not covered further below.
Place the Livy CSD jar in the /opt/cloudera/csd/ directory, change the file's owner and group to cloudera-scm, and restart Cloudera Manager with systemctl restart cloudera-scm-server.
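A minimal sketch of those steps (the CSD jar name here is a placeholder; use the one you actually downloaded):
```
# Copy the Livy CSD into Cloudera Manager's CSD directory
# (LIVY-0.5.0.jar is a placeholder name)
cp LIVY-0.5.0.jar /opt/cloudera/csd/
chown cloudera-scm:cloudera-scm /opt/cloudera/csd/LIVY-0.5.0.jar
# Restart Cloudera Manager so it picks up the new CSD
systemctl restart cloudera-scm-server
```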
The official Elasticsearch documentation is thorough (and available in Chinese), so deployment can follow it directly. For reference, here are my config/elasticsearch.yml settings:
```
# ES cluster name
cluster.name: enbrands-sky
# Bind address. With the default (localhost) the node is only reachable via localhost:9200
network.host: 192.168.1.xxx
# Recommended ES setting: indices that may be auto-created
action.auto_create_index: .monitoring*,.watches,.triggered_watches,.watcher-history*,.ml*
# Data directory; the elasticsearch user needs read/write access
path.data: /data/var/lib/elasticsearch/
# Log directory; the elasticsearch user needs read/write access
path.logs: /data/var/log/elasticsearch/
# My deployment is a single node, so use the single-node discovery type
discovery.type: single-node
# Enable HTTP CORS
http.cors.enabled: true
http.cors.allow-origin: "*"
# Disable security checks; ES is internal only, so no authentication for now
xpack.security.enabled: false
```
After the configuration is done, start Elasticsearch:
```
./bin/elasticsearch -d -p pid
```
Then check that the service is running normally:
```
curl -X GET "http://192.168.1.xxx:9200/?pretty"
```
The response should look like:
```
{
  "name" : "qTMjIqp",
  "cluster_name" : "enbrands-sky",
  "cluster_uuid" : "Ci_zl6s9SWiYlBR1PSUz-w",
  "version" : {
    "number" : "6.4.1",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "e36acdb",
    "build_date" : "2018-09-13T22:18:07.696808Z",
    "build_snapshot" : false,
    "lucene_version" : "7.4.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}
```
Create the Elasticsearch index and mapping that Griffin writes its metrics to:
```
curl -H "Content-Type: application/json" -XPUT http://192.168.1.xxx:9200/griffin -d '
{
  "aliases": {},
  "mappings": {
    "accuracy": {
      "properties": {
        "name": {
          "fields": {
            "keyword": {
              "ignore_above": 256,
              "type": "keyword"
            }
          },
          "type": "text"
        },
        "tmst": {
          "type": "date"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_replicas": "2",
      "number_of_shards": "5"
    }
  }
}
'
```
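To verify that the index and mapping were created as expected, the mapping can be echoed back:
```
curl -X GET "http://192.168.1.xxx:9200/griffin/_mapping?pretty"
```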
Create the database and tables Griffin needs in MySQL: a new database named quartz with utf-8 encoding; the DDL is in Init_quartz_mysql_innodb.sql.
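For example (a sketch, assuming the SQL script sits in the current directory and root has the needed privileges):
```
mysql -u root -p -e "CREATE DATABASE quartz DEFAULT CHARACTER SET utf8;"
mysql -u root -p quartz < Init_quartz_mysql_innodb.sql
```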
Next, the Griffin service configuration, application.properties (my settings; adjust hosts and credentials for your environment):
```
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Apache Griffin port
server.port = 8123
spring.application.name=griffin_service
# MySQL datasource
spring.datasource.url=jdbc:mysql://192.168.1.xxx:3306/quartz?autoReconnect=true&useSSL=false
spring.datasource.username=root
spring.datasource.password=xxx
spring.jpa.generate-ddl=true
#spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# auto create/update table
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
# Hive metastore
hive.metastore.uris=thrift://192.168.1.xxx:9083
hive.metastore.dbname=metastore
hive.hmshandler.retry.attempts=15
hive.hmshandler.retry.interval=2000ms
#Hive jdbc
hive.jdbc.className=org.apache.hive.jdbc.HiveDriver
hive.jdbc.url=jdbc:hive2://192.168.1.xxx:10000/
hive.need.kerberos=false
hive.keytab.user=xxx@xx.com
hive.keytab.path=/path/to/keytab/file
# Hive cache time
cache.evict.hive.fixedRate.in.milliseconds=900000
# Kafka schema registry
kafka.schema.registry.url=
# Update job instance state at regular intervals
jobInstance.fixedDelay.in.milliseconds=60000
# Job instance expiry: 7 days (604800000 ms); the only supported time unit is milliseconds
jobInstance.expired.milliseconds=604800000
# Schedule the predicate job every 5 minutes, repeating at most 12 times
# Interval time units: s (second), m (minute), h (hour), d (day); only these four are supported
predicate.job.interval=5m
predicate.job.repeat.count=12
# external properties directory location
external.config.location=
# external BATCH or STREAMING env
external.env.location=
# login strategy ("default" or "ldap")
login.strategy=default
# ldap
ldap.url=ldap://hostname:port
ldap.email=@example.com
ldap.searchBase=DC=org,DC=example
ldap.searchPattern=(sAMAccountName={0})
# hdfs default name
fs.defaultFS=hdfs://sky-ns
# elasticsearch
elasticsearch.host=192.168.1.xxx
elasticsearch.port=9200
elasticsearch.scheme=http
# elasticsearch.user = user
# elasticsearch.password = password
# livy
livy.uri=http://192.168.1.xxx:8998/batches
livy.need.queue=false
livy.task.max.concurrent.count=20
livy.task.submit.interval.second=3
livy.task.appId.retry.count=3
livy.need.kerberos=false
livy.server.auth.kerberos.principal=livy/kerberos.principal
livy.server.auth.kerberos.keytab=/path/to/livy/keytab/file
# yarn url
yarn.uri=http://192.168.1.xxx:8088
# griffin event listener
internal.event.listeners=GriffinJobEventHook
logging.file=./logs/griffin-service.log
```
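Before moving on, it is worth confirming that Livy is reachable at the configured livy.uri; a quick smoke test:
```
# Should return a JSON list of batch sessions (empty on a fresh Livy)
curl http://192.168.1.xxx:8998/batches
```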
Next, quartz.properties, the Quartz scheduler settings (the JDBC job store is backed by the quartz database created earlier):
```
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.quartz.scheduler.instanceName=spring-boot-quartz
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.threadCount=5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# If you use PostgreSQL as your database, set this property to org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# If you use MySQL as your database, set this property to org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# If you use H2 as your database, either StdJDBCDelegate, PostgreSQLDelegate, or others will work
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000
```
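Since the QRTZ_ table prefix must match the tables created from Init_quartz_mysql_innodb.sql, a quick sanity check that they exist:
```
mysql -u root -p quartz -e "SHOW TABLES LIKE 'QRTZ_%';"
```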
Then sparkProperties.json, which defines the Spark job that Griffin submits through Livy for each measure:
```
{
  "file": "hdfs:///griffin/griffin-measure.jar",
  "className": "org.apache.griffin.measure.Application",
  "queue": "default",
  "numExecutors": 2,
  "executorCores": 2,
  "driverMemory": "1g",
  "executorMemory": "4g",
  "conf": {
    "spark.yarn.dist.files": "hdfs:///home/griffin_spark_conf/hive-site.xml"
  },
  "files": []
}
```
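The spark.yarn.dist.files entry above points at a copy of hive-site.xml on HDFS, so it has to be uploaded first; for example (assuming the usual CDH client config path):
```
hadoop fs -mkdir -p /home/griffin_spark_conf
hadoop fs -put /etc/hive/conf/hive-site.xml /home/griffin_spark_conf/
```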
Finally, the batch environment configuration (env_batch.json in the service resources), which lists the sinks that measure results are written to:
```
{
  "spark": {
    "log.level": "INFO"
  },
  "sinks": [
    {
      "name": "console",
      "type": "CONSOLE",
      "config": {
        "max.log.lines": 10
      }
    },
    {
      "name": "hdfs",
      "type": "HDFS",
      "config": {
        "path": "hdfs:///griffin/persist",
        "max.persist.lines": 10000,
        "max.lines.per.file": 10000
      }
    },
    {
      "name": "elasticsearch",
      "type": "ELASTICSEARCH",
      "config": {
        "method": "post",
        "api": "http://192.168.1.xxx:9200/griffin/accuracy",
        "connection.timeout": "1m",
        "retry": 10
      }
    }
  ],
  "griffin.checkpoint": []
}
```
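After a measure job has run, the ELASTICSEARCH sink can be spot-checked by querying the index directly:
```
curl "http://192.168.1.xxx:9200/griffin/accuracy/_search?pretty"
```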
The build section of the service module's pom.xml after my changes (the stock plugins I disabled are left commented out):
```
<build>
    <plugins>
        <plugin>
            <groupId>com.ethlo.persistence.tools</groupId>
            <artifactId>eclipselink-maven-plugin</artifactId>
            <version>2.7.0</version>
            <executions>
                <execution>
                    <phase>process-classes</phase>
                    <goals>
                        <goal>weave</goal>
                    </goals>
                </execution>
            </executions>
            <dependencies>
                <dependency>
                    <groupId>org.eclipse.persistence</groupId>
                    <artifactId>org.eclipse.persistence.jpa</artifactId>
                    <version>${eclipselink.version}</version>
                </dependency>
            </dependencies>
        </plugin>
        <!-- <plugin>-->
        <!--     <groupId>org.apache.maven.plugins</groupId>-->
        <!--     <artifactId>maven-jar-plugin</artifactId>-->
        <!--     <version>3.1.1</version>-->
        <!--     <executions>-->
        <!--         <execution>-->
        <!--             <phase>package</phase>-->
        <!--             <goals>-->
        <!--                 <goal>jar</goal>-->
        <!--             </goals>-->
        <!--             <configuration>-->
        <!--                 <classifier>lib</classifier>-->
        <!--             </configuration>-->
        <!--         </execution>-->
        <!--     </executions>-->
        <!-- </plugin>-->
        <!-- <plugin>-->
        <!--     <groupId>org.springframework.boot</groupId>-->
        <!--     <artifactId>spring-boot-maven-plugin</artifactId>-->
        <!--     <version>${spring-boot-maven-plugin.version}</version>-->
        <!--     <executions>-->
        <!--         <execution>-->
        <!--             <goals>-->
        <!--                 <goal>build-info</goal>-->
        <!--             </goals>-->
        <!--         </execution>-->
        <!--     </executions>-->
        <!--     <configuration>-->
        <!--         <executable>false</executable>-->
        <!--     </configuration>-->
        <!-- </plugin>-->
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>${spring-boot-maven-plugin.version}</version>
            <configuration>
                <mainClass>org.apache.griffin.core.GriffinWebApplication</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <appendAssemblyId>false</appendAssemblyId>
                <skipAssembly>false</skipAssembly>
                <outputDirectory>../target</outputDirectory>
                <descriptors>
                    <descriptor>src/main/resources/assembly/assembly.xml</descriptor>
                </descriptors>
            </configuration>
            <executions>
                <execution>
                    <id>assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
    </plugins>
</build>
```
Two small source changes are also needed. First, in the initHiveMetastoreClient method, add:
```
// Disable the metastore set-UGI (user/group) call; otherwise
// HiveMetaStoreClient#open fails while resolving UserGroupInformation
// and the connection to the metastore cannot be established.
hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, false);
```
Second, change the length of the config field:
```
// @Column(length = 20480)
@Column(length = 20000)
private String config;
```
Build the project:
```
mvn -Dmaven.test.skip=true clean install
```
Upload measure/target/measure-0.6.0.jar to HDFS:
```
hadoop fs -put measure-0.6.0.jar /griffin/griffin-measure.jar
```
When Spark runs a task on the YARN cluster, it loads griffin-measure.jar from the /griffin directory on HDFS; skipping this step leads to a class-not-found error for org.apache.griffin.measure.Application.
Copy the service jar to the server and start the service:
```
nohup java -jar service-0.6.0.jar > service.out 2>&1 &
```
Then open http://192.168.1.xxx:8123/#/measures in a browser.
Login username: admin
Login password: admin
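If the UI does not come up, the service can be checked from the command line (the version endpoint here is per Griffin's REST API guide; treat it as an assumption):
```
# Watch the service log for startup errors
tail -f service.out
# Griffin's REST API should answer once the service is up
curl http://192.168.1.xxx:8123/api/v1/version
```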
For usage instructions, see the official user-guide.md.
2022-05-19: When using a Measure to compute count and distinct count, the distinct count sometimes comes out larger than the count, which is clearly wrong (a deduplicated result should be less than or equal to the raw count). Reading Griffin's source code shows that distinct count is computed with Spark's estimation function approx_count_distinct, which performs better than an exact distinct count but can produce inaccurate results.
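A quick way to see the difference (hypothetical table and column names; run against any Hive table):
```
# Compare exact and approximate distinct counts on a column.
# approx_count_distinct is faster but only an estimate, so on data
# with many distinct values it can even exceed COUNT(c).
spark-sql -e "SELECT COUNT(c) AS cnt, COUNT(DISTINCT c) AS exact_dc, approx_count_distinct(c) AS approx_dc FROM t"
```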