___ ____ _____ _ _ __ _ _ _
/ __| |_ / |_ _| ___ | |__ (_) / _` | __| | __ _ | |_ __ _
\__ \ / / | | |___| | '_ \ | | \__, | / _` | / _` | | _| / _` |
|___/ /___| _|_|_ _____ |_.__/ _|_|_ |___/ \__,_| \__,_| _\__| \__,_|
_|"""""|_|"""""|_|"""""|_| |_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|_|"""""|
"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'"`-0-0-'
原图 .file/.doc/SZT-bigdata-2.png
数字标记不分先后顺序,对应代码:
1-cn.java666.sztcommon.util.SZTData
2-cn.java666.etlflink.app.Jsons2Redis
3-cn.java666.etlspringboot.controller.RedisController#get
4-cn.java666.etlflink.app.Redis2ES
5-cn.java666.etlflink.app.Redis2Csv
6-Hive sql 脚本(开发维护成本最低)
7-Saprk 程序(开发维护成本最高,但是功能更强)
8-HUE 方便查询和展示 Hive 数据
9-cn.java666.etlflink.app.Redis2HBase
10、14-cn.java666.szthbase.controller.KafkaListen#sink2Hbase
11-cn.java666.etlflink.app.Redis2HBase
12-CDH HDFS+HUE+Hbase+Hive 一站式查询
13-cn.java666.etlflink.app.Redis2Kafka
15-cn.java666.sztflink.realtime.Kafka2MyCH
16-cn.java666.sztflink.realtime.sink.MyClickhouseSinkFun
下一步,计划开发数据湖中台解决方案
我发现越来越多的国产开源软件用户体验值得肯定。。。
以下是我的开发环境,仅作参考:
如果你选用原版 Apache 组件搭建大数据集群,那么你会有踩不完的坑。我的头发不够掉了,所以我选 CDH!!!
以上软件分开部署在我的三台电脑上,Win10 笔记本 VMware + Win10 台式机 VMware + 古董笔记本 CentOS7。物理机全都配置 SSD + 千兆以太网卡,HDFS 需要最快的网卡。好马配好鞍,当然你得有个千兆交换机配合千兆网线,木桶原理警告!!!
如果你想避免网线牵来牵去,可以采用电力猫实现分布式家庭组网方案;
理论上可以当作实时数据,但是这个接口响应太慢了,如果采用 kafka 队列方式,也可以模拟出实时效果。
本项目采用离线 + 实时思路 多种方案处理。
准备好 java、scala、大数据开发常用的环境,比如 IDEA、VMware 虚拟机、CDH等,然后手机静音盖上,跟我一起左手画个龙,右手划一道彩虹,开始表演吧
��
https://opendata.sz.gov.cn/data/api/toApiDetails/29200_00403601
cn.java666.etlspringboot.source.SZTData#saveData
获取原始数据存盘 /tmp/szt-data/szt-data-page.jsons
,核对数据量 1337,注意这里每条数据包含1000条子数据;cn.java666.etlflink.sink.RedisSinkPageJson#main
实现 etl 清洗,去除重复数据,redis 天然去重排序,保证数据干净有序,跑完后核对 redis 数据量 1337。hget szt:pageJson 1
cn.java666.etlspringboot.EtlSApp#main
启动后,也可以用 knife4j 在线调试 REST API:cn.java666.etlflink.source.MyRedisSourceFun#run
清洗数据发现 133.7 万数据中,有小部分源数据字段数为9,缺少两个字段:station、car_no;丢弃脏数据。合格源数据示例:
{
"deal_date": "2018-08-31 21:15:55",
"close_date": "2018-09-01 00:00:00",
"card_no": "CBHGDEEJB",
"deal_value": "0",
"deal_type": "地铁入站",
"company_name": "地铁五号线",
"car_no": "IGT-104",
"station": "布吉",
"conn_mark": "0",
"deal_money": "0",
"equ_no": "263032104"
}
不合格的源数据示例:
{
"deal_date": "2018-09-01 05:24:22",
"close_date": "2018-09-01 00:00:00",
"card_no": "HHAAABGEH",
"deal_value": "0",
"deal_type": "地铁入站",
"company_name": "地铁一号线",
"conn_mark": "0",
"deal_money": "0",
"equ_no": "268005140"
}
cn.java666.etlflink.app.Redis2Kafka#main
根据需求推送满足业务要求的源数据到 kafka,topic-flink-szt-all
保留了所有源数据 1337000 条, topic-flink-szt
仅包含清洗合格的源数据 1266039 条。ksql 命令查询: select * from "topic-flink-szt" where "partition" in (0) limit 1000
cn.java666.etlflink.app.Redis2Csv#main
实现了 flink sink csv 格式文件,并且支持按天分块保存。cn.java666.etlflink.app.Redis2ES#main
实现了 ES 存储源数据。实现实时全文检索,实时跟踪深圳通刷卡数据。这个模块涉及技术细节比较多,如果没有 ES 使用经验,可以先做下功课,不然的话会很懵。
我之前在处理 ES 各种问题踩了不少坑,熬了不少通宵,掉了很多头发。
遇到问题心态要稳,因为你今天处理了一个问题,明天接触新的版本新的框架大概率又会出现新的问题。。
所以最佳实践很重要!!!
�� �� �� 这部分内容有更新:修正了上一个版本时区问题。
2018-09-01 00:00:00.000~2018-09-01 23:59:59.999
,看看当天深圳通刷卡记录的统计图曲线走向是否科学,间接验证数据源的完整性。
修正时区后统计数量,字段完整的合格源数据 1266039 条,2018-09-01全天 1229180 条。
图中可以看出 2018-09-01 这一天刷卡记录集中在上午6点~12点之间,早高峰数据比较吻合,虽然这一天是周六,高峰期不是特别明显。我们继续缩放 kibana 时间轴看看更详细的曲线:
回顾一下本项目 ETL 处理流程:
1337000 条源数据清洗去除字段不全的脏数据,剩余的合格数据条数 1266039 已经进入 ES 索引
szt-data
在 1266039 条合格数据中,有 1227234 条数据集中在 2018-09-01 这一天的上午时段;
我们暂且相信上午时段的数据是真实的,那么是否说明官方提供的数据并不是全部的当天完整刷卡数据???
如果按照上午的刷卡量来估测全天的刷卡量,考虑到是周六,那么深圳通全天的刷卡记录数据应该在 122万 X 2 左右,当然这么武断的判断方式不是程序员的风格,接下来我们用科学的大数据分析方式来研究这些数据背后的意义。
注意,ES 大坑:
{
"properties": {
"deal_date": {
"format": "yyyy-MM-dd HH����ss",
"type": "date"
}
}
}
这里并没有指定时区信息,但是 ES 默认使用 0 时区,这个软件很坑,无法设置全局默认时区。但是很多软件产生的数据都是默认机器所在时区,国内就是东八区。因为我们的源始数据本身也没有包含时区信息,这里我不想改源数据,那就假装自己在 ES 的 0 时区。同时需要修改 kibana 默认时区为 UTC,才可以保证 kibana 索引图表时间轴正确对位。不过这并不是一个科学的解决方案。
如果是企业项目,必须要用数据质量监控软件!!!要不然得有多少背锅侠要杀去祭天
日志当中卡号脱敏字段密文反解猜想:
由脱敏的密文卡号反推真实卡号,因为所有卡号密文当中没有J开头的数据,但是有A开头的数据,A != 0,而且出现了 BCDEFGHIJ 没有 K,所以猜想卡号映射关系如图!!!
类似摩斯电码解密。。。我现在还不确定这个解密方式是否正确
cn.java666.sztcommon.util.ParseCardNo#parse
实现了支持自动识别卡号明文和密文、一键互转功能。 cn.java666.etlspringboot.controller.CardController#get
实现了卡号明文和密文互转 REST API。确定业务流程 ---> 声明粒度 ---> 确定维度 ---> 确定事实
参考行业通用的数仓分层模式:ODS、DWD、DWS、ADS,虽然原始数据很简单,但是我们依然使用规范的流程设计数据仓库。
ods/ods_szt_data/day=2018-09-01/
# szt_szt_page/day=2018-09-01/
dwd_fact_szt_in_detail 进站事实详情表
dwd_fact_szt_out_detail 出站事实详情表
dwd_fact_szt_in_out_detail 地铁进出站总表
dws_card_record_day_wide 每卡每日行程记录宽表【单卡单日所有出行记录】
【体现进站压力】 每站进站人次排行榜
ads_in_station_day_top
【体现出站压力】 每站出站人次排行榜
ads_out_station_day_top
【体现进出站压力】 每站进出站人次排行榜
ads_in_out_station_day_top
【体现通勤车费最多】 每卡日消费排行
ads_card_deal_day_top
【体现线路运输贡献度】 每线路单日运输乘客总次数排行榜,进站算一次,出站并且联程算一次
ads_line_send_passengers_day_top
【体现利用率最高的车站区间】 每日运输乘客最多的车站区间排行榜
ads_stations_send_passengers_day_top
【体现线路的平均通勤时间,运输效率】 每条线路单程直达乘客耗时平均值排行榜
ads_line_single_ride_average_time_day_top
【体现深圳地铁全市乘客平均通勤时间】 所有乘客从上车到下车间隔时间平均值
ads_all_passengers_single_ride_spend_time_average
【体现通勤时间最长的乘客】 单日从上车到下车间隔时间排行榜
ads_passenger_spend_time_day_top
【体现车站配置】 每个站点进出站闸机数量排行榜
每个站点入站闸机数量 ads_station_in_equ_num_top
每个站点出站闸机数量 ads_station_out_equ_num_top
【体现各线路综合服务水平】 各线路进出站闸机数排行榜
各线路进站闸机数排行榜 ads_line_in_equ_num_top.png
各线路出站闸机数排行榜 ads_line_out_equ_num_top
【体现收入最多的车站】 出站交易收入排行榜
ads_station_deal_day_top
【体现收入最多的线路】 出站交易所在线路收入排行榜
ads_line_deal_day_top
【体现换乘比例、乘车体验】 每天每线路换乘出站乘客百分比排行榜
ads_conn_ratio_day_top
【体现每条线的深圳通乘车卡普及程度 9.5 折优惠】 出站交易优惠人数百分比排行榜
ads_line_sale_ratio_top
【体现换乘的心酸】 换乘耗时最久的乘客排行榜
ads_conn_spend_time_top
【体现线路拥挤程度】 上车以后还没下车,每分钟、小时每条线在线人数
ads_on_line_min_top
hdfs 关闭权限检查。hive 设置保存目录 /warehouse;
hue 创建 hue 用户,赋予超级组。hue 切换到 hue 用户,执行 hive sql 建库 szt;
库下面建目录 ods dwd dws ads;
上传原始数据到 /warehouse/szt.db/ods/
szt-etl-data.csv szt-etl-data_2018-09-01.csv szt-page.jsons
查看: hdfs dfs -ls -h hdfs://cdh231:8020/warehouse/szt.db/ods/
接下来使用 HUE 按照 sql/hive.sql
依次执行 HQL 语句.....
也可以使用 IDEA Database 工具栏操作,附送idea cdh hive 完美驱动 https://github.com/timveil/hive-jdbc-uber-jar/releases:
也可以使用 DBeaver (我只想说, 上古产品 Sqlyog、navicat、heidisql、workbench 全都是战五渣),因为有时候复杂的查询可以一边执行一边在另一个客户端工具查看结果,这对于复杂的嵌套查询 debug 非常有助于分析和跟踪问题。DBeaver 客户端自带图表,不过没有 HUE 好看:
已经完成的指标分析:
2018-09-01,当天依次为:五和、布吉、丹竹头,数据说明当天这几个站点进站人数最多。
2018-09-01,当天出站乘客主要去向分别为:深圳北高铁站、罗湖火车站、福田口岸。
2018-09-01,当天车站吞吐量排行榜:
五和站???、布吉站(深圳东火车站)、罗湖站(深圳火车站)、深圳北(深圳北高铁站)。。。
五和站为什么这么秀???
2018-09-01,当天车费最高的乘客花了 48 元人民币
2018-09-01,当天五号线客运量遥遥领先,龙岗线碾压一号线,心疼龙岗人民!
2018-09-01当天前三名分别是:赤尾>华强北,福民>福田口岸,五和>深圳北
2018-09-01,当天五号线单程直达乘客平均耗时1500s,约合25分钟,平均值最长的是 11号线,平均耗时 40 分钟
2018-09-01,当天所有乘客通勤时间平均值 1791 s,约合 30 分钟
2018-09-01,当天所有乘客通勤时间排行榜,站内滞留最久的乘客间隔 17123 秒,约合 4.75 小时,实际情况只需要 20 分钟车程,难道是进站搞事情???
2018-09-01,当天福田站双项第一
2018-09-01,当天深圳地铁一号线长脸了@_@,两个指标都是第一,港铁四号线全部垫底,后妈养的???
2018-09-01,当天上午深圳北站收入 4 万元人民币,排名第一
2018-09-01,数据显示一号线依然是深圳地铁最多收入的线路,1号线上午收入 30 万元人民币,其次是五号线紧随其后
换乘后从五号线出来的乘客是占比最高的 15.6%,从九号线出站的乘客,换乘比例最低,仅 9.42%
目前可以确定的是,持有深圳通地铁卡可以享受9.5折优惠乘坐地铁,从统计结果看,2018-09-01当天,七号线使用地铁卡优惠的乘客人次占比最高,达到 90.36%,排名最低的是五号线,占比 84.3%
统计过程发现难以理解的现象,有几个乘客进站以后,没有刷卡出站就换乘了公交车,于是出现了同一个地铁站进出站,但是标记为联程的记录
SZT-kafka-hbase project for Spring Boot2
看过开源的 spring-boot-starter-hbase、spring-data-hadoop-hbase,基础依赖过于老旧,长期不更新;引入过程繁琐,而且 API 粒度受限;数据库连接没有复用,导致数据库服务读写成本太高。
于是自己实现了 hbase-2.1 + springboot-2.1.13 + kafka-2.0 的集成,一个长会话完成 hbase 连续的增删改查
主要特色:
knife4j 在线调试,点击鼠标即可完成 hbase 写入和查询,再也不用记住繁琐的命令
hbase 列族版本历史设置为 10,支持配置文件级别的修改。可以查询某卡号最近 10 次交易记录。
hbase rowkey 设计为卡号反转,使得字典排序过程消耗的服务器算力在分布式环境更加均衡。
全自动的建库建表【本项目的 hbase 命名空间为 szt】,实现幂等操作,无需担心 hbase 数据库的污染。
效果展示:
api-debug,随便写点东西进去,狂点发送。能写多快就考验你的手速了
hbase-shell 命令:
全表扫描,返回十个版本格式化为字符串显示,压榨服务器性能的时候到啦!!!
scan 'szt:data', {FORMATTER => 'toString',VERSIONS=>10}
cn.java666.etlflink.app.Redis2Kafka
,生产消息,适当调慢生产速度,以免机器崩溃。如果 hbase 崩溃了,看看内存够不够,我就直接怼上 2GB X 3 个节点
SZT-flink
模块新增 cn.java666.etlflink.app.Json2HBase
实现了从 redis 或者其他数据源取出 json 串,保存到 hbase 表。本项目中从 redis 获取 json(当然更推荐 kafka),通过 flink 清洗存到 hbase flink:flink2hbase 表中。用于实时保存深圳通刷卡记录,通过卡号查询可以获取卡号最近10次(如果有10次)交易记录。
简化了上一版 hbase 写入 bean 的方式,JSON 再一次赢得掌声
�� �� �� 。
val keys = jsonObj.keySet().toList
val size = keys.size()
for (i <- 0 until size) {
val key = keys.get(i)
val value = jsonObj.getStr(key)
putCell(card_no_re, cf, key, value)
}
完成 flink 读取 kafka,存到 clickhouse 功能。
...继续开发中
2020-05-25:
2020-05-22:
2020-05-14:
RedisSinkPageJson
从 package cn.java666.etlflink.sink
移到 package cn.java666.etlflink.app
更名为 Jsons2Redis
,方便归类,该模块用于解析原始数据多行json到redis;2020-05-01:
2020-04-30:
2020-04-27:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
######################### 实时热部署 ###################################
#"关闭缓存, 即时刷新"
spring.freemarker.cache=false
spring.thymeleaf.cache=false
#热部署生效
spring.devtools.restart.enabled=true
#是否支持livereload
spring.devtools.livereload.enabled=true
#设置重启的目录,添加那个目录的文件需要restart
spring.devtools.restart.additional-paths=src/main/*
#设置不需要重启的目录
#spring.devtools.restart.exclude=static/**,public/**
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
2020-04-24:
2020-04-23:
2020-04-22:
2020-04-21:
2020-04-20:
2020-04-19:
rm -rf /usr/
2020-04-18:
2020-04-17
2020-04-16
2020-04-15
2020-04-14
2020-04-13
百度和谷歌能找到的问题就不要再问了!很累的
�� �� ��
坚持原则和底线。
比心
程序员这辈子一定会遇到的三个问题:
我的100米大小,量化数据: 期望的结果:
我只想用dask保存CSV文件。我得到了超过30GB的数据。读取csv没有问题。但下班后我需要将其保存为CSV文件。它不起作用。帮助我 有如下错误消息。。 文件"熊猫/_libs/parsers.pyx",第894行,在熊猫中。_libs.parsers.TextReader.read文件"熊猫/_libs/parsers.pyx",第916行,在熊猫中。_libs.parsers.TextRea
> 用例:需要将Talend的bigdata组件即tSqoopImport连接到MapR集群上的MySQL DB。 Talend Open studio for Big-Data(5.6.2)位于我的工作站上。MySQL(5.5)数据库安装在5节点MapR(M3-edition)集群上。集群是物理设置的,可以通过像putty和MobaX这样的远程客户端访问 -->已经附加了代表我的用例的talen
BIG DATA ECOSYSTEM COM DOCKER Ambiente para estudo dos principais frameworks big data em docker. Esse setup vai criar dockers com os frameworks HDFS, HBase, Hive, Presto, Spark, Jupyter, Hue, Mongodb,