07-hive的存储格式及hive的调优及FLUME的基本介绍

刘瑞
2023-12-01

离线阶段第七天

hive当中的存储格式

数据的存储格式主要分为两大类,一类是行式存储,一类是列式存储

行式存储:TextFile,SequenceFile。

列式存储:Parquet ,Orc。

第一种文件处处格式:textFile 行式存储

第四种:sequenceFile 二进制的行式存储

第二种存储格式:orc,一个orc文件,由多个stripe组成。一个stripe由三部分构成

​ indexData:存储了row data里面的索引的数据

​ row data:数据都存在row data里面

​ stripe footer:stripe的元数据信息,第多少个stripe,上一个stripe是哪一个,下一个stripe是哪一个

第三种存储格式:parquet twitter + cloudera公司 合作开发 列式存储

主流文件存储格式的对比

原始文件18.1M

textFile存储格式 18.1M

orc存储格式 2.8M 两方面的原因 第一方面数据格式更加紧凑,占用磁盘空间比较少。第二方面:orc自带了一种压缩算法

parquet 存储格式:13.1M

存储文件的压缩比总结:

ORC > Parquet > textFile

文件的查询速度

log_text 9.756 seconds

log_orc 9.513 seconds

log_parquet 10.628 seconds

查询速度总结:log_orc > log_text > log_parquet

存储与压缩结合使用

建表的时候既要指定我们的存储格式,也要指定我们的压缩方式

如果建表的时候指定存储格式为orc,不指定任何的压缩算法,18.1M的文件变成了7.7M

如果文件存储格式指定为orc,压缩方式知道个为snappy,18.1M的文件变成了3.8M

实际工作当中,一般自己使用建立的内部表数据,存储格式都是使用orc,压缩方式都是使用的snappy

hive当中的调优

1、fetch的抓取

hive当中能够避免不走mr的,就尽量不要走mr

hive.fetch.task.conversion 默认值就是more,

如果给调整成了node,所有hql语句都需要走mr

2、本地模式运行

如果输入的数据量 足够小,并且输入文件的个数足够少,那么我们就可以启用本地运行模式

所有的数据全部都在一个maptask里面处理,所有的数据全部都在一个reduceTask里面处理

set hive.exec.mode.local.auto.inputbytes.max=51234560; 调整输入数据量的大小

set hive.exec.mode.local.auto.input.files.max=10; 设置文件的输入的个数

3、表的优化

select count(distinct s_id) from score;

select count(1) from (select count(1) from score group by s_id) temp;

多表关联的时候,尽量拆成多个sql段

大表join大表:

两个表的数据非常大

尽量额过滤无效的数据

不过滤空 id

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id;

nullidtable 这个表有空id

ori 这个表没有空id

如果空id没有什么意义,可以过滤掉,过滤掉之后,nullidtable 数据会变少

过滤掉空id

INSERT OVERWRITE TABLE jointable

SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;

如果空key不让过滤怎么办???

空key的转换

set hive.exec.reducers.bytes.per.reducer=32123456;

set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable

SELECT a.*

FROM nullidtable a

LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN ‘hive’ ELSE a.id END = b.id;

空key的转换成随机字符串

set hive.exec.reducers.bytes.per.reducer=32123456;

set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable

SELECT a.*

FROM nullidtable a

LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat(‘hive’, rand()) ELSE a.id END = b.id;

4、大小表的join

hive里面有一个优化器组件,简单的优化我们的sql语句

hive当中大表join小表,以及小表join大表:已经没有效率的区别

设置map端join操作

set hive.auto.convert.join = true;

set hive.mapjoin.smalltable.filesize=25123456;

启动了两个mapreducetask,第一个mapreducetask先去找小表的数据,将小表的数据加载到内存

表的group by优化:

能够在map端进行聚合的数据,就先在map端进行一次聚合

(1)是否在Map端进行聚合,默认为True

set hive.map.aggr = true;

(2)在Map端进行聚合操作的条目数目

​ set hive.groupby.mapaggr.checkinterval = 100000;

(3)有数据倾斜的时候进行负载均衡(默认是false)

​ set hive.groupby.skewindata = true;

mapreduce当中的去重操作:

去重只能在一个reducetask里面完成,去重都可以转换成group by

SELECT count(DISTINCT id) FROM bigtable;

SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;

笛卡尔积:任何时候都要避免笛卡尔积

100 * 100 =10000

使用分区裁剪,列裁剪:

如果是分区表,一定要指定分区字段,如果不需要的列,一定不要写到select里面去

select * 不要写了,select 字段

SELECT a.id

FROM bigtable a

LEFT JOIN ori b ON a.id = b.id

WHERE b.id <= 10;

可以先过滤,再进行left join操作

SELECT a.id

FROM ori a

LEFT JOIN bigtable b ON (a.id <= 10 AND a.id = b.id);

或者直接写成子查询:

SELECT a.id FROM bigtable a RIGHT JOIN (SELECT id FROM ori WHERE id <= 10 ) b ON a.id = b.id;

5、动态分区调整

set hive.exec.dynamic.partition = true;

set hive.exec.dynamic.partition.mode = nonstrict;

set hive.exec.max.dynamic.partitions = 1000;

set hive.exec.max.dynamic.partitions.pernode = 100;

set hive.exec.max.created.files = 100000;

set hive.error.on.empty.partition = false;

create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string)

PARTITIONED BY (p_time bigint)

row format delimited fields terminated by ‘\t’;

create table ori_partitioned_target(id bigint,
time bigint, uid string, keyword string, url_rank int, click_num int, click_url
string) PARTITIONED BY (p_time STRING) row format delimited fields terminated
by ‘\t’;

通过动态分区向目标表当中插入数据

INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)

SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time

FROM ori_partitioned;

如果使用动态分区,我们需要带上我们的分组字段,并且,分区字段一定要写在select 最后面

而且insert overwrite table 的时候,目标表不需要手动的指定分区数据了

5、数据的倾斜

主要研究如何控制maptask的个数以及reducetask的个数

maptask是不是越多越好???maptaks是不是越少越好???

reduceTask是不是越多越好??? reduceTask是不是越少越好???

合并小文件,减少hive的输入的maptask的个数

1)参数设置(下面的API属于hadoop低版本的API)

set mapred.max.split.size=112345600;

set mapred.min.split.size.per.node=112345600;

set mapred.min.split.size.per.rack=112345600;

set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

减少maptask的个数

这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。

如何增加maptask的个数???

将文件全部打散成小文件

假设有这样的一个任务

Select data_desc,

count(1),

count(distinct id),

sum(case when …),

sum(case when …),

sum(…)

from a group by data_desc

文件127M 启动一个maptask去处理,但是处理逻辑太复杂了

设置多个recuetask将文件给打散

set mapreduce.job.reduces =10;

create table a_1 as

select * from a

distribute by rand(123);

估算reduceTask的个数

1)调整reduce个数方法一

(1)每个Reduce处理的数据量默认是256MB

hive.exec.reducers.bytes.per.reducer=256123456

​ (2)每个任务最大的reduce数,默认为1009

hive.exec.reducers.max=1009

(3)计算reducer数的公式

N=min(参数2,总输入数据量/参数1)

2**)调整reduce****个数方法二**

在hadoop的mapred-default.xml文件中修改

设置每个job的Reduce个数

set mapreduce.job.reduces = 15;

6、并行执行

select * from course union all select * from student;

7、hive的严格模式:

如果开启了严格模式:

第一个:order by 必须带limit字段

第二个:分区表必须带分区条件 只选择我们需要的分区的数据

第三个:笛卡尔积不能执行 笛卡尔积没有任何意义

set hive.mapred.mode=strict开启严格模式

set hive.mapred.mode=nonstrict开启非严格模式

8、jvm重用

设置我们的container最多可以跑10个task

set mapred.job.reuse.jvm.num.tasks=10;

9、推测执行

如果一个maptask长久的没有执行完成,会再启动一个maptask,去并行的执行,谁先执行完,使用谁的结果

一般都是直接关闭map端的推测执行,以及reduce端的推测执行

set mapreduce.map.speculative=false

set hive.mapred.reduce.tasks.speculative.execution=false

大数据当中的辅助系统

1、flume的基本介绍

flume是cloudera公司开源提供的一个数据采集的框架,可以从各个地方采集我们的数据,文件,文件夹,网络端口

flume是一个数据采集的框架,说白了就是搬运数据的

flume的基本架构:

source:对接数据源,获取数据。采集数据的组件

channel:source采集的数据全部都发送到channel里面来,管道,做一个缓冲,缓冲一下source发送过来的数据,然后批量的发送给sink

sink:决定我们的数据需要发送到哪里去

agent:一个flume的程序运行起来,就叫做一个agent

events:我们一条条的数据,一条数据就是一个event

收集文件夹下面的数据到hdfs上面去

注意:flume比较脆弱,一旦抛异常之后,再也不干活儿了

flume有没有做什么监控?????

第一个:数据源,数据一直在增多,但是没有改变后缀名。说白了就是.complete 的文件没有增多

第二个:数据的目的地,数据的目的地没有增多

依据以上两个现象:就可以判断flume有没有在干活儿。判断flume没有干活儿了???杀掉flume的进程,然后将文件夹下面的文件全部移走,然后重新启动flume即可

可以通过shell脚本来实现

需要自己通过shell脚本,或者java程序,定时的检测flume是否正常工作

第二个问题:flum多长时间滚动一次????问你的flume的配置,多长时间上传一次到hdfs上面去

500Gb数据 1小时42Gb数据 每隔10分钟产生768M数据

第一个控制文件的大小1Gb 第二个 每隔10分钟滚动一次 第三个:每隔200event 滚动一次

一天大概所有的数据在50-100Gb基本上你们可以驾驭 一般一到2个人可以搞定

flume还可以与kafka整合:做实时的数据的处理。不存在时间间隔的问题,将数据越快放入kafka越好

flume可不可以采集mysql里面的数据???如何通过flume采集数据库里面的数据

有没有mysql source

需要你们下去调研:如何实时的采集mysql里面的数据,canal streamSet

flume可以采集mysql里面的数据,自定义source组件、如何采集,最好能够实现,将mysql里面的数据采集到hdfs里面来

roll size 10000条 往hdfs上面穿

batch size 100条 100次

roll count 5M

设置我们文件生产的条数,多少条数据生成一次到hdfs上面去 100000

a1.sinks.k1.hdfs.rollCount = 0

生成的文件的时间长短

a1.sinks.k1.hdfs.rollInterval = 30

文件内容大小达到多大的时候生成到hdfs上面去

a1.sinks.k1.hdfs.rollSize = 500M

采用批量的方式,每批次发送多少条数据到hdfs上面去,直到我们积累的数据都发送完成为止

a1.sinks.k1.hdfs.batchSize = 10000

azkaban使用的是gradle 来进行编译。gradle功能 与maven类似

 类似资料: