做数据仓库的同学会面临三大问题:性能、稳定性、准确性,归根结底还是性能问题;框架的天花板以及sql复杂度、计算资源的紧张都会导致数据仓库的性能受到挑战,随着业务的积累,性能的问题变的越来越明显,性能差直接导致整个数仓集群的稳定性差,经常出问题的数仓自然也就会数据不准,所以解决上述3个问题,应优先解决性能问题。
SnappyData是一个基于内存的数据库,和redis不同的是SnappyData以数据分析为主,可以处理一定的并发量,但不建议太高,100%支持hive sql,但聚合速度比hive快数百上甚至千倍,自带spark(指定的spark版本) ,是一款支持delete/update(性能不是很高)操作的OLAP库。
随着易企秀业务的发展,大量的分析需求&报表需求接踵而至,产品和运营同学对数据分析的要求越来越高、对数据分析延迟越来越低,之前通过由数据产品梳理业务指定分析指标,再由研发进行完成计算的方式已不能胜任,需要重新重视数据仓库的重要性。
这里采用的优化方案是SnappyData + CBoard,通过jdbc连接SnappyData进行业务数据实时交互分析。其中CBoard是一个开源的自助分析BI系统,简单的分组排序需求,产品和运营同学可以在上面托拉拽实现,从此研发同学不用加班加点疲于完成各种定制化报表了、大量释放了研发同学的宝贵时间;SnappyData是一个开源的内存型OLAP分析数据库,支持列式与行式存储,目前我们绝大部分数据分析业务都能秒出,支持adhoc, 100%兼容spark算子与sql操作,可通过spark 任务快速实现HDFS或者hive中的数据导入到SnappyData。
数据库 | 响应时间 | 并发能力 | 社区 | 处理能力 | 分析能力 | 缺点 |
---|---|---|---|---|---|---|
Impala | 慢 | 低 | 适中 | 支持的数据规模大 | 支持标准SQL以及多表join和窗口函数 | 性能差、不实时 |
Kylin | 快 | 高 | 活跃 | 支持的数据规模大 | 性能高,支持标准SQL | 需要预计算、不支持多表关联 |
Druid | 快 | 高 | 活跃 | 支持的数据规模大 | 性能高,但SQL支持弱 | 不支持adhoc、吃内存 |
ES | 特快 | 中 | 活跃 | 支持的数据规模小 | 性能高,但SQL支持弱 | 仅支持单表的分组聚合排序、统计结果会有<1%的误差 |
ClickHouse | 中 | 中 | 不活跃 | 支持的数据规模一般 | 性能中,但SQL支持弱 | 扩展性弱、多表关联弱 |
Doris | 快 | 中 | 适中 | 支持的数据规模大 | 除update和delete外的标准SQL | 还在孵化阶段、不兼容hadoop生态 |
SnappyData | 快 | 中 | 不活跃 | 支持的数据规模中等 | 完全兼容spark sql,支持update与delete操作 | 稳定性差、存在OOM的风险 |
个人测试:在亿级别单表查询分析 ES都是最快的,基本都是毫秒级响应,但数据是抽样预估出来的不是很准;SnappyData是综合表现最好的,虽然snappydata的社区并不活跃,但个人觉得官网提供的一手资料已经足够了,即便snappydata稳定性稍差但易用性和性能做的真是太诱人了
这里以一个简单例子讲解SnappyData的使用:将现有日志数据导入到SnappyData,实现用户多维度自助分析。
1、官网下载最新的SnappyData文件 ,
2、分发在20台hadoop nodemanager节点上
3、
启动 locator(任意两台):
./sbin/snappy-locator.sh start -peer-discovery-address=hadoop011 -peer-discovery-port=8888 -dir=/data/work/snappydata-1.1.0-bin/data/locator_1 -heap-size=1024m -locators=hadoop010:8888
启动 lead(任意两台):
./sbin/snappy-lead.sh start -dir=/data/work/snappydata-1.1.0-bin/data/lead_1 -heap-size=4096m -spark.ui.port=2480 -locators=hadoop011:8888,hadoop010:8888 -spark.executor.cores=20
启动 server(所有节点):
./sbin/snappy-server.sh start -dir=/data/work/snappydata-1.1.0-bin/data/server_1 -heap-size=8096m -locators=hadoop010:8888,hadoop011:8888
4、监控
浏览器输入如下地址
http://hadoop010.eqxiu.com:2480/dashboard/
val snappy = new org.apache.spark.sql.SnappySession(spark.sparkContext)
val df = spark.read.parquet("/data/merge/tracker_view/page_view/201906/01")
#如果数据在hive表中 则
val df = spark.read.table("tab_name")
val sn = snappy.createDataFrame(df.rdd, df.schema)
#第一次执行saveAsTable 会自动在SnappyData中创建对应的表
sn.write.format("column").saveAsTable("tracker_view")
#之后每天增量执行
sn.write.format("column").insertInto("tracker_view")
也可以手动先创建表 ,然后再导入数据
CREATE TABLE CUSTOMER (
C_1 INTEGER NOT NULL,
...
...
)
USING COLUMN OPTIONS (BUCKETS '10', PARTITION_BY 'C_1')
其它参数说明
1、COLOCATE_WITH:COLOCATE_WITH {exist_table}语法的含义是对于新建的表,与exist_table具有相同的分区键,且相同的BUCKETS数据量;数据存储本地化,这样做的好处是当2个表发生基于key的join时,那些非常耗资源的hash join就不用跨节点进行数据传输(广播),而是在本地进行join。这个设计思路非常像关系型数据库Oracle中的cluster存储。这种数据存储本地化的特点,也是SnappyData在做join时比Spark快很多的原因之一。
2、PARTITION_BY:PARTITION_BY {COLUMN}语法的含义是按某列进行分区,当然也可以指定多个列作为组合。行表如果没有指定分区键,那么将是一张全局复制表;列表如果没有指定,那么内部也会有个默认的分区。列表中的分区遵循Spark Catalyst的hash分区,使得join时最小化shuffle。
3、BUCKETS:分区的个数。默认是128个,最小的数据存储单元。本地存储,这个值可以设置为集群core数量的2倍。
4、REDUNDANCY:分区的副本数,如果设置为0,表示没有副本;如果设置大于0,则会为partition创建对应的副本数,以防止member失败,达到数据的高可用性的目的。
5、EVICTION_BY:驱逐,很像Flink window中的eviction设置。列表上默认的参数值是LRUHEAPPERCENT,根据LRU算法达到阀值时,开始将内存中的较“冷”的数据溢出到本地磁盘:SnappyStore存储。
6、PERSISTENCE:持久化。默认是允许持久化的,数据会从内存中持久化到本地SnappyStore存储中,当重启memeber时,SnappyData会自动从本地的SnappyStore存储中恢复数据。
7、OVERFLOW:溢出,默认是true,即允许溢出。如果没有指定PERSISTENCE,且将OVERFLOW设置为false,那么当失败时,内存中的数据将被丢失。
8、DISKSTORE:为持久化的数据或溢出的数据提供持久化目录。可以通过CREATE DISKSTORE为表提前创建出本地文件目录,可以指定文件、配置数据收缩、配置数据异步到磁盘的频率等等.
9、EXPIRE:过期时间。为了提高内存使用率,对于很老的历史数据,可以通过设置过期时间使得超过阀值的行数据过期。但是过期参数只适合行表。
10、COLUMN_BATCH_SIZE:刚才提到了,delta row buffer的batch大小,默认24MB。超过阀值就会写到列表。
11、COLUMN_MAX_DELTA_ROWS:delta row buffer的最大行数,默认10000行。超过阀值会写到列表。
select pro,count(distinct c_i) cn from tracker_view group by pro order by cn desc limit 11
在做SnappyData搭建过程中也遇到了不少坑,这里就不一一列举了,因为这不是一篇填坑的文章,大家使用过程中有遇到问题也欢迎留言。
经测试,我们最大的两张业务表 作品(上亿)+用户(千万) 关联分析,可秒出;使用过程中发现灵活性足够了,但稳定性稍差一些,建议在一些对稳定性要求不高、时效性要求很高的内部分析场景中使用。