在做数据ETL的时候,可能会遇到这样的情景: 需要对某个每天新增原始数据的表,进行数据清理,提取出有价值的数据内容,然后重新下沉到另一张表中,也就是一个初步的ETL过程。由于需要处理的历史数据可能是多天的,所以需要我们编写脚本来去指定日期,自动运行。而不用我们每天都要去手动执行一遍。
本次使用的数据库是Clickhouse,因此会借助一些ck中的函数来方便我们的处理。具体的相关CK函数的内容,可以参考:
首先列出整体的代码内容:
#!/bin/bash
startDate=$1
endDate=$2
####log_correct函数打印正确的输出到日志文件
function log_correct () {
DATE=`date "+%Y-%m-%d %H:%M:%S"` ####显示打印日志的时间
USER=$(whoami) ####那个用户在操作
echo "${DATE} ${USER} execute $0 [INFO] $@" ######($0脚本本身,$@将参数作为整体传输调用)
}
function log_error ()
{
DATE=`date "+%Y-%m-%d %H:%M:%S"`
USER=$(whoami)
echo "${DATE} ${USER} execute $0 [ERROR] $@"
}
log_correct "执行的开始日期为: ${startDate}"
log_correct "执行的结束日期为: ${endDate}"
if [ $# -eq 2 ];
then
############ execute sink clickhouse #############
while :
do
start_date="$startDate 00:00:00"
end_date="$startDate 23:59:59"
log_correct "正在执行${startDate}日期的数据"
sql="
insert into countly_device_info_i_r_local
select app_key as appKey,
device_id as deviceId,
log_time as nginxTime,
visitParamExtractRaw(data, 'rooted') as rooted,
visitParamExtractString(data, 'sys_name') as sys_name,
visitParamExtractString(data, 'carrier') as carrier,
visitParamExtractString(data, 'sys_version') as sys_version,
visitParamExtractString(data, 'name') as name,
visitParamExtractString(data, 'phone_no') as phone_no,
from
(
select app_key, device_id, log_time, trim(BOTH '\"' FROM extract(data, '\"{.*}\"')) as data
from
(
select app_key, device_id, log_time, data from countly_simple where log_time>="\'${start_date}\'" and log_time<="\'${end_date}\'" and data like '%devInfo%'
) tmp where and isValidJSON(data)=1
) tmp2;
"
clickhouse-client --multiline -u default -h 10.105.220.210 --password ch20482048 --query "${sql}" 2>>/home/webedit/sinkLog.txt
############ Check whether the SQL statement is successfully executed #############
exitCode=$?
if [ $exitCode -ne 0 ];
then
log_error "sink execute is failed!!! ${sql}"
exit $exitCode
else
log_correct "${startDate}日期的数据输入导入完成}"
fi
if [[ ${startDate} -eq ${endDate} ]];
then
break
fi
startDate=$(date -d "${startDate} 1 days" "+%Y-%m-%d")
done
else
log_error "请输入执行的开始时间和结束时间,输入的两个日期参数格式必须是yyyy-MM-dd"
fi
首先利用定义了log函数来对脚本运行过程中的信息进行输出
利用while循环来遍历需要处理的日期区间。由于我数据表的日期字段是DateTime类型,所以需要添加时分秒的信息。
接下来,编写SQL。
首先内层SQL限定了所需要处理的时间日期区间的数据,以及筛选出data字段具有特定格式的数据条目。
然后外层SQL借助了Clickhouse的字符串查找函数extract
,利用了正则表达式,匹配出JSON格式的数据部分。然后利用trim
函数,做数据清理,删除掉JSON格式开头和结尾的大括号前后的"
。同时,做了一些限定,比如isValidJSON(data)=1
表示只选取清理后的data字段是JSON格式的数据条目。
保证了得到的每条数据条目都是JSON格式的数据后,利用Clickhouse的JSON函数提取出需要获取的字段,value为String类型的字段使用visitParamExtractString
,由于rooted
字段的value是Boolean类型,所以使用visitParamExtractRaw
函数。
最后利用insert into [table] select ...
的方式,将etl清理好的数据导入到新表中。
执行过程如下所示:
[webedit@bigdata-client-m220-106 ~]$ sh shelltest.sh 2021-12-17 2021-12-20
2021-12-20 17:44:45 webedit execute shelltest.sh [INFO] 执行的开始日期为: 2021-12-17
2021-12-20 17:44:45 webedit execute shelltest.sh [INFO] 执行的结束日期为: 2021-12-20
2021-12-20 17:44:45 webedit execute shelltest.sh [INFO] 正在执行2021-12-17日期的数据
2021-12-20 17:44:48 webedit execute shelltest.sh [INFO] 2021-12-17日期的数据输入导入完成}
2021-12-20 17:44:48 webedit execute shelltest.sh [INFO] 正在执行2021-12-18日期的数据
2021-12-20 17:44:50 webedit execute shelltest.sh [INFO] 2021-12-18日期的数据输入导入完成}
2021-12-20 17:44:50 webedit execute shelltest.sh [INFO] 正在执行2021-12-19日期的数据
2021-12-20 17:44:51 webedit execute shelltest.sh [INFO] 2021-12-19日期的数据输入导入完成}
2021-12-20 17:44:51 webedit execute shelltest.sh [INFO] 正在执行2021-12-20日期的数据
2021-12-20 17:44:52 webedit execute shelltest.sh [INFO] 2021-12-20日期的数据输入导入完成}
查看对应的Clickhouse数据库,可以发现数据导入到对应的数据表中。Clickhouse本身的速度确实很快,千万级别的数据也是几秒就能实现查询和插入了。