数据的存储格式主要分为两大类,一类是行式存储,一类是列式存储
行式存储:TextFile,SequenceFile。
列式存储:Parquet ,Orc。
第一种文件处处格式:textFile 行式存储
第四种:sequenceFile 二进制的行式存储
第二种存储格式:orc,一个orc文件,由多个stripe组成。一个stripe由三部分构成
indexData:存储了row data里面的索引的数据
row data:数据都存在row data里面
stripe footer:stripe的元数据信息,第多少个stripe,上一个stripe是哪一个,下一个stripe是哪一个
第三种存储格式:parquet twitter + cloudera公司 合作开发 列式存储
主流文件存储格式的对比
原始文件18.1M
textFile存储格式 18.1M
orc存储格式 2.8M 两方面的原因 第一方面数据格式更加紧凑,占用磁盘空间比较少。第二方面:orc自带了一种压缩算法
parquet 存储格式:13.1M
存储文件的压缩比总结:
ORC > Parquet > textFile
文件的查询速度
log_text 9.756 seconds
log_orc 9.513 seconds
log_parquet 10.628 seconds
查询速度总结:log_orc > log_text > log_parquet
建表的时候既要指定我们的存储格式,也要指定我们的压缩方式
如果建表的时候指定存储格式为orc,不指定任何的压缩算法,18.1M的文件变成了7.7M
如果文件存储格式指定为orc,压缩方式知道个为snappy,18.1M的文件变成了3.8M
实际工作当中,一般自己使用建立的内部表数据,存储格式都是使用orc,压缩方式都是使用的snappy
hive当中能够避免不走mr的,就尽量不要走mr
hive.fetch.task.conversion 默认值就是more,
如果给调整成了node,所有hql语句都需要走mr
如果输入的数据量 足够小,并且输入文件的个数足够少,那么我们就可以启用本地运行模式
所有的数据全部都在一个maptask里面处理,所有的数据全部都在一个reduceTask里面处理
set hive.exec.mode.local.auto.inputbytes.max=51234560; 调整输入数据量的大小
set hive.exec.mode.local.auto.input.files.max=10; 设置文件的输入的个数
select count(distinct s_id) from score;
select count(1) from (select count(1) from score group by s_id) temp;
多表关联的时候,尽量拆成多个sql段
大表join大表:
两个表的数据非常大
尽量额过滤无效的数据
不过滤空 id
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;
nullidtable 这个表有空id
ori 这个表没有空id
如果空id没有什么意义,可以过滤掉,过滤掉之后,nullidtable 数据会变少
过滤掉空id
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
如果空key不让过滤怎么办???
空key的转换
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;
INSERT OVERWRITE TABLE jointable
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN ‘hive’ ELSE a.id END = b.id;
空key的转换成随机字符串
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;
INSERT OVERWRITE TABLE jointable
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat(‘hive’, rand()) ELSE a.id END = b.id;
hive里面有一个优化器组件,简单的优化我们的sql语句
hive当中大表join小表,以及小表join大表:已经没有效率的区别
设置map端join操作
set hive.auto.convert.join = true;
set hive.mapjoin.smalltable.filesize=25123456;
启动了两个mapreducetask,第一个mapreducetask先去找小表的数据,将小表的数据加载到内存
表的group by优化:
能够在map端进行聚合的数据,就先在map端进行一次聚合
(1)是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
(2)在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
(3)有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
mapreduce当中的去重操作:
去重只能在一个reducetask里面完成,去重都可以转换成group by
SELECT count(DISTINCT id) FROM bigtable;
SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;
笛卡尔积:任何时候都要避免笛卡尔积
100 * 100 =10000
使用分区裁剪,列裁剪:
如果是分区表,一定要指定分区字段,如果不需要的列,一定不要写到select里面去
select * 不要写了,select 字段
SELECT a.id
FROM bigtable a
LEFT JOIN ori b ON a.id = b.id
WHERE b.id <= 10;
可以先过滤,再进行left join操作
SELECT a.id
FROM ori a
LEFT JOIN bigtable b ON (a.id <= 10 AND a.id = b.id);
或者直接写成子查询:
SELECT a.id FROM bigtable a RIGHT JOIN (SELECT id FROM ori WHERE id <= 10 ) b ON a.id = b.id;
5、动态分区调整
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.exec.max.dynamic.partitions = 1000;
set hive.exec.max.dynamic.partitions.pernode = 100;
set hive.exec.max.created.files = 100000;
set hive.error.on.empty.partition = false;
create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)
PARTITIONED BY (p_time bigint)
row format delimited fields terminated by ‘\t’;
create table ori_partitioned_target(id bigint,
time bigint, uid string, keyword string, url_rank int, click_num int, click_url
string) PARTITIONED BY (p_time STRING) row format delimited fields terminated
by ‘\t’;
通过动态分区向目标表当中插入数据
INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)
SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time
FROM ori_partitioned;
如果使用动态分区,我们需要带上我们的分组字段,并且,分区字段一定要写在select 最后面
而且insert overwrite table 的时候,目标表不需要手动的指定分区数据了
主要研究如何控制maptask的个数以及reducetask的个数
maptask是不是越多越好???maptaks是不是越少越好???
reduceTask是不是越多越好??? reduceTask是不是越少越好???
合并小文件,减少hive的输入的maptask的个数
1)参数设置(下面的API属于hadoop低版本的API)
set mapred.max.split.size=112345600;
set mapred.min.split.size.per.node=112345600;
set mapred.min.split.size.per.rack=112345600;
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
减少maptask的个数
这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。
如何增加maptask的个数???
将文件全部打散成小文件
假设有这样的一个任务
Select data_desc,
count(1),
count(distinct id),
sum(case when …),
sum(case when …),
sum(…)
from a group by data_desc
文件127M 启动一个maptask去处理,但是处理逻辑太复杂了
设置多个recuetask将文件给打散
set mapreduce.job.reduces =10;
create table a_1 as
select * from a
distribute by rand(123);
估算reduceTask的个数
1)调整reduce个数方法一
(1)每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256123456
(2)每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
(3)计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
2**)调整reduce****个数方法二**
在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;
6、并行执行
select * from course union all select * from student;
7、hive的严格模式:
如果开启了严格模式:
第一个:order by 必须带limit字段
第二个:分区表必须带分区条件 只选择我们需要的分区的数据
第三个:笛卡尔积不能执行 笛卡尔积没有任何意义
set hive.mapred.mode=strict开启严格模式
set hive.mapred.mode=nonstrict开启非严格模式
8、jvm重用
设置我们的container最多可以跑10个task
set mapred.job.reuse.jvm.num.tasks=10;
9、推测执行
如果一个maptask长久的没有执行完成,会再启动一个maptask,去并行的执行,谁先执行完,使用谁的结果
一般都是直接关闭map端的推测执行,以及reduce端的推测执行
set mapreduce.map.speculative=false
set hive.mapred.reduce.tasks.speculative.execution=false
1、flume的基本介绍
flume是cloudera公司开源提供的一个数据采集的框架,可以从各个地方采集我们的数据,文件,文件夹,网络端口
flume是一个数据采集的框架,说白了就是搬运数据的
flume的基本架构:
source:对接数据源,获取数据。采集数据的组件
channel:source采集的数据全部都发送到channel里面来,管道,做一个缓冲,缓冲一下source发送过来的数据,然后批量的发送给sink
sink:决定我们的数据需要发送到哪里去
agent:一个flume的程序运行起来,就叫做一个agent
events:我们一条条的数据,一条数据就是一个event
收集文件夹下面的数据到hdfs上面去
注意:flume比较脆弱,一旦抛异常之后,再也不干活儿了
flume有没有做什么监控?????
第一个:数据源,数据一直在增多,但是没有改变后缀名。说白了就是.complete 的文件没有增多
第二个:数据的目的地,数据的目的地没有增多
依据以上两个现象:就可以判断flume有没有在干活儿。判断flume没有干活儿了???杀掉flume的进程,然后将文件夹下面的文件全部移走,然后重新启动flume即可
可以通过shell脚本来实现
需要自己通过shell脚本,或者java程序,定时的检测flume是否正常工作
第二个问题:flum多长时间滚动一次????问你的flume的配置,多长时间上传一次到hdfs上面去
500Gb数据 1小时42Gb数据 每隔10分钟产生768M数据
第一个控制文件的大小1Gb 第二个 每隔10分钟滚动一次 第三个:每隔200event 滚动一次
一天大概所有的数据在50-100Gb基本上你们可以驾驭 一般一到2个人可以搞定
flume还可以与kafka整合:做实时的数据的处理。不存在时间间隔的问题,将数据越快放入kafka越好
flume可不可以采集mysql里面的数据???如何通过flume采集数据库里面的数据
有没有mysql source
需要你们下去调研:如何实时的采集mysql里面的数据,canal streamSet
flume可以采集mysql里面的数据,自定义source组件、如何采集,最好能够实现,将mysql里面的数据采集到hdfs里面来
roll size 10000条 往hdfs上面穿
batch size 100条 100次
roll count 5M
设置我们文件生产的条数,多少条数据生成一次到hdfs上面去 100000
a1.sinks.k1.hdfs.rollCount = 0
生成的文件的时间长短
a1.sinks.k1.hdfs.rollInterval = 30
文件内容大小达到多大的时候生成到hdfs上面去
a1.sinks.k1.hdfs.rollSize = 500M
采用批量的方式,每批次发送多少条数据到hdfs上面去,直到我们积累的数据都发送完成为止
a1.sinks.k1.hdfs.batchSize = 10000
azkaban使用的是gradle 来进行编译。gradle功能 与maven类似