当前位置: 首页 > 面试经验 >

大数据(MapReduce)面试题及答案

优质
小牛编辑
104浏览
2023-03-28

大数据(MapReduce)面试题及答案

  1. 介绍下MapReduce

● 1.1 MapReduce定义

○ MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。

○ MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个 Hadoop 集群上。

  1. MapReduce优缺点

● 1.2 MapReduce优缺点

○ 1.2.1 优点

■ 1)MapReduce 易于编程-MR

  ● 简单地实现一些接口,就可以完成一个分布式的程序。

■ 2)良好的扩展性

  ● 分布式程序可以分不到大量的廉价PC机器上运行,计算资源不够时,可以通过简单的增加机器数量来拓展集群的计算能力

■ 3)高容错性

  ● 当集群中的节点宕机后,Hadoop自动完成将宕机节点的计算存储任务重新分配到正常运行的节点上。

■ 4)适合 PB 级以上海量数据的离线处理

  ● 可以实现上千台服务器集群并发工作,提供超强的数据处理能力

○ 1.2.2 缺点

■ 1)不擅长实时计算(I/O 操作多,落盘)

  ● MapReduce无法像Mysql一样,在毫秒级或者秒级内返回计算结果

■ 2)不擅长流式计算

  ● 流式计算的数据输入是动态的,而MapReduce的数据输入是静态的,不能动态变化。

■ 3)不擅长 类似于DAG(有向无环图)的串行计算

  ● 每个MapReduce作业的输出结果都会写到磁盘,会造成大量的磁盘IO,导致性能非常低下。
  1. MapReduce架构
    和HDFS一样,MapReduce也是采用Master/Slave的架构,其架构如下图所示:

MapReduce包含四个组成部分,分别为Client,JobTracker,TaskTracker,Task。
a)client客户端
每一个Job都会在用户端通过Client类将应用程序以及参数配置Configuration打包成Jar文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask),将它们分发到各个TaskTracker服务中去执行。

b)JobTracker
JobTracker负责资源监控和作业调度。JobTracker监控所有的TaskTracker与job的健康状况,一旦发现失败,就将相应的任务转移到其它节点;同时JobTracker会跟踪任务的执行进度,资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用于可以根据自己的需要设计相应的调度器。

c)TaskTracker
TaskTracker会周期性地通过HeartBeat将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时执行JobTracker发送过来的命令 并执行相应的操作(如启动新任务,杀死任务等)。TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(cpu,内存等) 。一个Task获取到一个slot之后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为MapSlot和ReduceSlot两种,分别提供MapTask和ReduceTask使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。

d)Task
Task分为MapTask和Reduce Task两种,均由TaskTracker启动。HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split的多少决定了MapTask的数目,因为每一个split只会交给一个MapTask处理。
————————————————
版权声明:本文为CSDN博主「墨梅寒香」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u010176083/article/details/53269317

  1. MapReduce工作原理
    mapreduce工作原理为:MapReduce是一种编程模型,用于大规模数据集的并行运算。MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。

MapReduce就是”任务的分解与结果的汇总”,它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

  1. MapReduce哪个阶段最费时间

● 3.2 MapReduce工作流程

○ 简易版:

■ InputFormat --> Mapper --> shuffle --> Reducer --> OutputFormat

○ 详细版:

■ InputFormat -->标记分区 --> 环形缓冲区内进行排序、溢写 ---> 将溢写文件进行归并排序 --> Reduce:不同的Reduce节点copy Mapper阶段对应的文件 -->将copy的文件进行归并 -> 执行reduce方法 --> OutputFormat

Shuffle阶段涉及到的操作最多,并且包含多次磁盘IO,所以最费时间。

  1. MapReduce中的Combine是干嘛的? 有什么好外?

● Combiner合并

○ 1)Combiner是MR程序汇总Mapper和Reducer之外的一种组件

○ 2)Combiner组件的父类就是Reducer

○ 3) Combiner和Reducer组件的区别局在于运行的位置

■ Combiner是在每一个MapTask所在的节点运行

■ Reducer是接受全局的所有Mapper的输出结果,然后进行运算。

○ 4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。

○ 5)Combiner能够应用的前提是不能影响最终的业务逻辑。

○ 自定义Combiner实现步骤:

■ 1.自定义一个Combiner并继承Reducer,重写Reduce方法

■ 2.在Job驱动中配置使用。
  1. 环型缓冲区是什么?

环形缓冲区分为三块,空闲区、数据区、索引区。初始位置取名叫做“赤道”,就是圆环上的白线那个位置。初始状态的时候,数据和索引都为0,所有空间都是空闲状态。数据是从赤道的右边开始写入,索引(每次申请4kb)是从赤道是左边开始写,两个文件是独立的,执行期间互不干涉。

在数据和索引的大小到了mapreduce.map.sort.spill.percent参数设置的比例时(默认80%,这个是调优的参数),会有两个动作:
1、对写入的数据进行原地排序,并把排序好的数据和索引spill到磁盘上去;
2、在空闲的20%区域中,重新算一个新的赤道,然后在新赤道的右边写入数据,左边写入索引;
3、当20%写满了,但是上一次80%的数据还没写到磁盘的时候,程序就会panding一下,等80%空间腾出来之后再继续写。
如此循环往复,永不停歇,直到所有任务全部结束。整个操作都在内存,形状像一个环,所以才叫环形缓冲区。

  1. MapReduce为什么一定要有环型缓冲区
    环形缓冲区不需要重新申请新的内存,始终用的都是这个内存空间。大家知道MR是用java写的,而Java有一个最讨厌的机制就是Full GC。Full GC总是会出来捣乱,这个bug也非常隐蔽,发现了也不好处理。环形缓冲区从头到尾都在用那一个内存,不断重复利用,因此完美的规避了Full GC导致的各种问题,同时也规避了频繁申请内存引发的其他问题。

另外呢,环形缓冲区同时做了两件事情:1、排序;2、索引。在这里一次排序,将无序的数据变为有序,写磁盘的时候顺序写,读数据的时候顺序读,效率高非常多!

在这里设置索引区也是为了能够持续的处理任务。每读取一段数据,就往索引文件里也写一段,这样在排序的时候能加快速度。
————————————————
版权声明:本文为CSDN博主「大数据架构师Evan」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_52346300/article/details/116021803

  1. MapReduce为什么一定要有Shuffle过程
    因为不同的 Map 可能输出相同的 Key,相同的 Key 必须发送到同一个 Reduce 端处理,因此需要Shuffle进行排序分区,减少跨节点数据传输的资源消耗。将数据完整的从map task端拉取数据到reduce task端减少磁盘IO对task的影响。

  2. MapReduce的Shuffle过程及其优化
    shuffle优化
    1)Map阶段

(1)增大环形缓冲区大小。由100m扩大到200m
(2)增大环形缓冲区溢写的比例。由80%扩大到90%
(3)减少对溢写文件的merge次数。(10个文件,一次20个merge)
(4)不影响实际业务的前提下,采用Combiner提前合并,减少 I/O。

2)Reduce Shuffle阶段

(1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。
(2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。
(3)规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)增加每个Reduce去Map中拿数据的并行数
(5)集群性能可以的前提下,增大Reduce端存储数据内存的大小。

3)IO传输

采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。

压缩:
(1)map输入端主要考虑数据量大小和切片,支持切片的有Bzip2、LZO。注意:LZO要想支持切片必须创建索引;
(2)map输出端主要考虑速度,速度快的snappy、LZO;
(3)reduce输出端主要看具体需求,例如作为下一个mr输入需要考虑切片,永久保存考虑压缩率比较大的gzip。
————————————————
版权声明:本文为CSDN博主「牧码文」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_46429290/article/details/121429795

  1. Reduce怎么知道去哪里拉Map结果集?
    map任务成功后,它们会使用心跳机制通知它们的application master。因此,对于指定作业,application master 知道map输出和主机位置之间的映射关系。reduce中的一个线程定期询问master以便于获取map输出主机的位置,直到获得所有输出位置。

由于第一个reducer可能失败,因此主机并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们。相反,主机会等待,直到application master 告知它删除map输出,这是作业完成后执行的。
————————————————
版权声明:本文为CSDN博主「七月流火_2567」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zhy_2117/article/details/88145845

  1. Reduce阶段都发生了什么,有没有进行分组

● Reduce在接收到数据之后,对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
Reduce端需要对数据进行分组,将key相同的放在一起规约。

  1. MapReduce Shuffle的排序算法
    shuffle阶段分为:

     1. map shuffle也称为shuffle writer, 每个map 处理分配的split, 然后写入到环形缓冲区中,当缓冲区中的数据达到 一定比率,就会开启线程将缓冲区中的数据写入文件,称为spill, spill 同时会对数据进行分区、排序、合并操作,然后写入到文件,这是一个边写缓冲区,边spill的过程,中间可能会产生多个文件,只到map 读取数据完毕会将spill 的所有小文件进行分区、排序、合并成为最终一个数据文件和一个索引文件。
    1. reduce shuffle 也称为shuffle reader, 待map阶段执行完成,每个reducer开启若干线程 从所有的map阶段输出的索引文件与数据文件获取对应的分区数据,若内存足够则存放在内存中,否则输出到磁盘,在这个过程中还会同时对内存、 磁盘数据进行合并(merge)、排序,最终形成一个有序的大文件,提供给reduce执行。

      在shuffle writer 与shuffle reader阶段都发生按照数据的key进行排序,spill 过程对内存缓冲区的数据进行快速排序,map最终合并小文件。
      归并排序,shuffle reader 拉取map端的数据并归排序。用到两种排序算法:快速排序与并归排序。


©著作权归作者所有:来自51CTO博客作者mb5fd340813ba80的原创作品,请联系作者获取转载授权,否则将追究法律责任
mapreduce中shuffle中两种排序算法
https://blog.51cto.com/u_15054047/2621043

  1. shuffle为什么要排序?
    map端排序:
    map端排序是为了减轻reduce端排序的压力。

reduce端排序:
Reduce端需要对数据进行分组,将key相同的放在一起规约。
为达到目的,有两种算法:hashmap和sort,前者太耗内存,而排序可以通过归并对任意数据量分组,只要磁盘数据量够大就行。

  1. 说一下map是怎么到reduce的?

● 3.2 MapReduce工作流程

○ 简易版:

■ InputFormat --> Mapper --> Reducer --> OutputFormat

○ 详细版:

■ InputFormat -->标记分区 --> 环形缓冲区内进行排序、溢写 ---> 将溢写文件进行归并排序 --> Reduce:不同的Reduce节点copy Mapper阶段对应的文件 -->将copy的文件进行归并 -> 执行reduce方法 --> OutputFormat
  1. MR中的三次排序
    mr在Map任务和Reduce任务的过程中,一共发生了3次排序

1)当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阀值,在刷写磁盘之前,后台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键进行内排序

2)在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序,所有合并文件只需要再做一次排序即可使输出文件整体有序。

3)在reduce阶段,需要将多个Map任务的输出文件copy到ReduceTask中后合并,由于经过第二次排序,所以合并文件时只需再做一次排序即可使输出文件整体有序

在这3次排序中第一次是内存缓冲区做的内排序,使用的算法使快速排序,第二次排序和第三次排序都是在文件合并阶段发生的,使用的是归并排序。
————————————————
版权声明:本文为CSDN博主「小涛手记」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_47699191/article/details/111372266

  1. MapReduce的数据处理过程
    mapreduce流程

1、Inputformat会将切片中每一行的数据表达成kv形式,k是这一行数据的偏移量(longwritable),v是每一行数据内容类型(text)
2、map端会将每一行的数据进行处理,生成若干对kv作为中间结果,经过hashpartition分区后进入环形缓冲区,当环形缓存区满80%后会将数据溢写磁盘(这里会有溢写文件的合并叫做merge)
3、然后reduce端通过http的形式请求task tracker获取map task输出的文件,reduce端接收到数据后会进行一系列的合并,排序操作然后输出最终结果.

————————————————
版权声明:本文为CSDN博主「小涛手记」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_47699191/article/details/111372266

  1. mapjoin的原理(实现)?应用场景?
    MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。
    mapJoin原理:适用于小表join大表,使用DistributedCache机制将小表存储到各个Mapper进程所在机器的磁盘空间上,各个Mapper进程读取不同的大表分片,将分片中的每一条记录与小表中所有记录进行合并。
    合并后直接输出map结果即可得到最终结果。
    (Distribut : 分配、分发)

  2. reducejoin如何执行(原理)
    ● 3.7.1 Reduce Join

    ○ Map端的主要工作:为来自不同表或者文件的key/value对打标签,以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标记作为value,最后进行输出。

    ○ Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组中将那些来源于不同文件的记录分开,最后进行合并就OK了。

    ○ 缺点:这种方式中, 合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源的利用率不够,且在Reduce阶段及易产生数据倾斜问题。

    ○ 结论:推荐使用MapJoin避免使用ReduceJoin
    ○ 解决:在Map端实现数据合并,增加Shuffle过程

  3. MapReduce为什么不能产生过多小文件
    ● 2.4.1 Hadoop小文件弊端

    ○ HDFS 上每个文件都要在 NameNode 上创建对应的元数据,这个元数据的大小约为150byte,这样当小文件比较多的时候,就会产生很多的元数据文件,一方面会大量占用NameNode 的内存空间,另一方面就是元数据文件过多,使得寻址索引速度变慢。

    ○ 小文件过多,在进行 MR 计算时,会生成过多切片,需要启动过多个 MapTask。每个MapTask 处理的数据量小,导致 MapTask 的处理时间比启动时间还小,白白消耗资源。

● 1) 小文件优化的方向:

○ 1)在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS。

○ 2)在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并。

○ 3)在 MapReduce 处理时,可采用 CombineTextInputFormat 提高效率

○ 4)开启 uber 模式,实现 jvm 重用

  1. MapReduce分区及作用
    一、Mapreduce中Partition(分区)的概念
    Hadoop采用的派发方式默认是根据散列值来派发,当数据进行map转换后,根据map后数据的key值进行散列派发,这样的一个弊端就是当数据key的值过于相似且集中时,大部分的数据就会分到同一个reducer中,从而造成数据倾斜,影响程序的运行效率。
    所以需要我们自己定制partition来根据自己的要求,选择记录的reducer。自定义partitioner很简单,只要自定义一个类,并且继承Partitioner类,重写其getPartition方法就好了,在使用的时候通过调用Job的setPartitionerClass指定一下即可。
    Map的结果,会通过partition分发到Reducer上。如果设置了Combiner,Map的结果会先送到Combiner进行合并,再将合并后数据发送给Reducer。

    Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,
    getPartition(Text key, Text value, int numPartitions)
    系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样就保证如果有相同的key值,肯定被分配到同一个reducre上。

二、Mapreduce中分组的概念
分组,顾名思义,就是根据Map<key,value>中的key进行分组。在同一个分区呢,相同key的值记录是属于同一个分组的,相当于groupby key的功能。
————————————————
版权声明:本文为CSDN博主「Dzhantao」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/m0_37065162/article/details/80881520
22. ReduceTask数量和分区数量关系
● 分区总结:

○ 如果ReduceTask的数量 > getPartition的结果数

■ 则会多产生几个空的输出文件

○ 如果ReduceTask的数量 < getPartition的结果数

■ 会有一部分分区数据无处安放,会报错Exception

○ 如果ReduceTask的数量 = 1

■ 不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也只会产生一个结果文件

○ 分区号是从0开始,逐一累加。

  1. Map的分片有多大
    Hadoop中在计算一个JOB需要的map数之前首先要计算分片的大小。计算分片大小的公式是:

goalSize (推荐值)= totalSize(job总输入数据量) / mapred.map.tasks(推荐个数)
minSize = max {mapred.min.split.si***SplitSize(默认为1)}
splitSize = max (minSi***(goalSize, dfs.block.size(文件块大小)))

totalSize是一个JOB的所有map总的输入大小,即Map input bytes。
参数mapred.map.tasks的默认值是2,我们可以更改这个参数的值。
计算好了goalSize之后还要确定上限和下限。

下限是max {mapred.min.split.si***SplitSize} 。
参数mapred.min.split.size的默认值为1个字节,minSplitSize随着File Format的不同而不同。
上限是dfs.block.size,它的默认值是64兆。
举几个例子,例如Map input bytes是100兆,mapred.map.tasks默认值为2,那么分片大小就是50兆;如果我们把mapred.map.tasks改成1,那分片大小就变成了64兆。

具体例子:
https://blog.csdn.net/weixin_33724659/article/details/92944605
24. MapReduce join两个表的流程?

  1. Reduce Join
      map 端的主要工作:为来自不同表(文件)的key/value 打标记以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标记作为 value,然后进行输出。
      reduce 端的主要工作:在 reduce 端,以连接字段作为key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 map 阶段已经打标记)分开,最后进行合并就 ok 了。
      通俗的说,就是在map 阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce 阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。。
      但是,此方法有缺点:
      1、map 阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
      2、reduce 端对2个集合做乘积计算,很耗内存,容易导致OOM。

  2. Map Join
      两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为map的输入文件,对map()方法的每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的,并且连接好了的数据。
      此方法的特点:
      1、要使用hadoop中的DistributedCache(分布式缓存)把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。
      2、有明显的局限性:数据中有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。
    ————————————————
    版权声明:本文为CSDN博主「迷茫君」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_41610418/article/details/81488342

  3. 手撕一段简单的MapReduce程序

  4. reduce任务什么时候开始?
    hadoop MapReduce在reduce阶段分为三步,即:shuffle,sort,reduce。在mapred-site.xml中有个参数可以调整什么时候开始执行reduce操作,mapred.reduce.slowstart.completed.maps ,默认值是0.95,即在mapper执行完95%时开始执行reduce操作,我们可以根据自己的需要调整,0.0到1.00之间。
    ————————————————
    版权声明:本文为CSDN博主「书凡世界」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/shufanshijie/article/details/8893856

  5. MapReduce的reduce使用的是什么排序?

  6. 在MapReduce的shuffle阶段共有三次排序,分别是:
    Map的溢写(spill)阶段:根据分区及key进行快速排序;
    Map合并溢写文件阶段:将同一个分区的多个溢写文件经过多轮归并排序最终合并为一个文件;
    Reduce输入阶段:将同一个分区,不同map task的数据文件进行归并排序

  7. 在MapReduce的整个过程中,默认对输出的KV键值对按照key进行快速排序
    Map输出排序:即map溢写排序
    reduce输出排序:reduce输出时会对KV键值对按照key进行排序
    ————————————————
    版权声明:本文为CSDN博主「YF_Li123」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/YF_Li123/article/details/108498586

  8. MapReduce怎么确定MapTask的数量?
    在MapReduce当中,每个mapTask处理一个切片split的数据量,注意切片与block块的概念很像,但是block块是HDFS当中存储数据的单位,切片split是MapReduce当中每个MapTask处理数据量的单位。在介绍map task的数量及切片机制之前先了解这两个概念:

block块(数据块,物理划分)
block是HDFS中的基本存储单位,hadoop1.x默认大小为64M,而hadoop2.x默认块大小为128M。文件上传到HDFS,就要划分数据成块,这里的划分属于物理的划分(实现机制也就是设置一个read方法,每次限制最多读128M的数据后调用write进行写入到hdfs),块的大小可通过 dfs.block.size配置。block采用冗余机制保证数据的安全:默认为3份,可通过dfs.replication配置。
注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前的配置值。

split分片(数据分片,逻辑划分)
Hadoop中split划分属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的 InputFormat 接口中的getSplits()方法得到的。数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

一、MapTask个数

1、一个Job的Map阶段并行度由客户端在提交Job时的切片树决定。
2、每一个Split切片分配一个MapTask并行实例处理

在运行我们的MapReduce程序的时候,我们可以清晰的看到会有多个mapTask的运行,那么maptask的个数究竟与什么有关,是不是maptask越多越好,或者说是不是maptask的个数越少越好呢???我们可以通过MapReduce的源码进行查看mapTask的个数究竟是如何决定的,查看FileInputFormat的源码,里面getSplits的方法便是获取所有的切片,其中有个方法便是获取切片大小 :

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
切片大小的计算公式:
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
blockSize为128M

二、如何控制mapTask的个数
MapTask数量设置不当带来的问题:

Map Task数量过多的话,会产生大量的小文件, 过多的Mapper创建和初始化都会消耗大量的硬件资源 。
Map Task数量过少,就会导致并发度过小,Job执行时间过长,无法充分利用分布式硬件资源。

那么,如果需要控制maptask的个数,我们只需要调整maxSize和minsize这两个值,那么切片的大小就会改变,切片大小改变之后,mapTask的个数就会改变:

如果想增加map个数,则通过FileInputFormat.setMaxInputSplitSize(job,long bytes)方法设置最大数据分片大小为一个小于默认blockSize的值,越小map数量越多。
如果想减小map个数,则FileInputFormat.setMinInputSplitSize(job,long bytes) 方法设置最小数据分片大小为一个大于默认blockSize的值,越大map数量越少。
当然,如果输入中有很多小文件,依然想减少map个数,则需要将小文件使用HDFS提供的API合并为大文件,然后使用方法2。

三、关于分片确定的临界问题
分片大小的数量一定是按照公式Math.max(minSize, Math.min(maxSize, blockSize))计算的吗?
可做以下试验:文件大小 297M(311349250),块大小128M
测试代码
FileInputFormat.setMinInputSplitSize(job, 301349250);
FileInputFormat.setMaxInputSplitSize(job, 10000);

由上面分片公式算出分片大小为301349250, 比 311349250小, 理论应该为2个map, 但实际测试后Map个数为1, 这是为什么呢?
分片的计算中,会考虑空间利用问题,每次分出一个分片后,都会判断剩下的数据能否在一定的比率(slop变量,默认10%)内压缩到当前分片中,如果不大于默认比率1.1,则会压缩到当前分片中。源码如下:

四、ReduceTask的数量如何确定?
Reduce任务是一个数据聚合的步骤,数量默认为1。使用过多的Reduce任务则意味着复杂的shuffle,并使输出文件数量激增。而reduce的个数设置相比map的个数设置就要简单的多,只需要设置在驱动程序中通过job.setNumReduceTasks(int n)即可。
————————————————
版权声明:本文为CSDN博主「皮哥四月红」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_43230682/article/details/107861581

  1. Map数量由什么决定
    在执行MapReduce作业时,在map阶段读取数据前,FileInputFormat会根据一定的规则将将输入文件split成数据块进行分布式读取。
    split的个数决定了map的个数。
  1. MapReduce的task数目划分

● 3.1.1 切片与MapTask并行度决定机制

○ 数据块:Block 是 HDFS 物理上把数据分成一块一块。数据块是 HDFS 存储数据单位。

○ 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个MapTask。

● 3.3.2 Partition分区

○ 问题:要求将统计结果按照条件输出到不同的文件中(分区)。

○ 默认的分区规则是根据key的hashCode对ReduceTasks的个数取模得到的。用户无法控制那个key的数据存储到哪个分区,一个分区将对应一个ReduceTask。

○ 解决:自定义分区器(Partitioner)

■ 1)自定义类继承Partitioner,重写getPartition()方法

■ 2)在job驱动中,设置自定义的Partitioner

■ 3)自定义Partition之后,要根据自定义的Oartitioner的逻辑设置相应数量的ReduceTask。

○ 分区总结:

■ 如果ReduceTask的数量 > getPartition的结果数

  ● 则会多产生几个空的输出文件

■ 如果ReduceTask的数量 < getPartition的结果数

  ● 会有一部分分区数据无处安放,会报错Exception

■ 如果ReduceTask的数量 = 1 

  ● 不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也只会产生一个结果文件

■ 分区号是从0开始,逐一累加。
  1. MapReduce作业执行的过程中,中间的数据会存在什么地方?不会存在内存中么?
    map输出结果会先写到缓存中(如环形缓冲区),缓存写满后会产生溢写,把中间结果归并写到本地磁盘中。

  2. Mapper端进行combiner之后,除了速度会提升,那从Mapper端到Reduece端的数据量会怎么变?
    数据在进入Combine处理后会进行局部汇总操作,根据具体的业务,数据会发生对应的变化。(不影响最终结果的情况下,对数据做了局部的汇总)

  3. map输出的数据如过超出它的小文件内存之后,是落地到磁盘还是落地到HDFS中?
    中间结果在内存中保存不下,会进行落盘操作。只有MapReduce程序执行完成后才会将计算结果写到HDFS,所以,中间结果落盘应该是落盘到本地磁盘中。

  4. Map到Reduce默认的分区机制是什么?

● 默认的分区规则是根据key的hashCode对ReduceTasks的个数取模得到的。用户无法控制那个key的数据存储到哪个分区,一个分区将对应一个ReduceTask。

  1. 结合wordcount述说MapReduce,具体各个流程,map怎么做,reduce怎么做 ?
    WordCount是最常见、最基本的一个需求,例如进行词频统计、用户访问记录统计。如果数据量非常小的情况下,使用单机、批处理的方式就可以很快得到结果。但是如果数据量非常大,数据量可能是10G、100G等等。这种情况下使用单机、批处理的方式就非常低效率。所以这个时候就需要借助于分布式的思想进行处理——使用集群进行处理。就拿词频统计来说,处理的过程步骤如下图。

Input就是将需要进行处理的数据输入,输入后会经过Spliting操作,将输入的数据进行切分,将众多的数据划分成不同的小块然后发送至不同的节点上,在节点上进行Mapping操作。Mapping操作后得到每个word以及value值,需要将这些结果再Shuffling操作。此操作后每个关键词会在一组,然后又发送到不同的节点进行统计(Reducing),统计后由一节点进行汇总。得到最后的wordcount结果。整个过程充分体现了分而治之的思想。

由上面的图片可知,WordCount需要我们自己实现的只有Mapping和Reducing两个部分,即对应Map和Reduce两个部分【这也是MapReduce框架的由来】。所以在实践时,需要先准备这两个部分的代码。
————————————————
版权声明:本文为CSDN博主「黑白键的约定」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_35363507/article/details/116594029

  1. MapReduce数据倾斜产生的原因及其解决方案
    发生数据倾斜的根本原因在于,shuffle之后,key的分布不均匀,使得大量key集中在某个reduce节点,导致此节点处理的数据量过大。要解决此问题,主要可以分为两大块:一是尽量不shuffle;二是shuffle之后,在reduce节点上的key分布尽量均匀。
    具体来说,在实际开发过程中,以下几个业务过程可能会导致数据倾斜:
  2. 不可拆分的大文件引发的数据倾斜
  3. 业务无关的数据引发的数据倾斜
  4. 多维聚合计算数据膨胀引起的数据倾斜
  5. 无法削减中间结果的数据量引发的数据倾斜
  6. 两个hive数据表连接时引发的数据倾斜

1.不可拆分的大文件引发的数据倾斜
当对文件使用GZIP压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压缩文件很大,则处理该文件的Map需要花费的时间会远多于读取普通文件的Map时间,该Map任务会成为作业运行的瓶颈。这种情况也就是Map读取文件的数据倾斜。

2.业务无关的数据引发的数据倾斜
实际业务中可能有大量的null值或者一些无意义的数据参与到计算作业中,这些数据可能来自业务为上报或者数据规范将某类数据进行归一化变成空值或空字符串等形式,这些与业务无关的数据引入导致在分组聚合或者在执行表连接时发生数据倾斜。
倾斜原因:
key相同的太集中,导致倾斜(很多和业务无关的null值)
解决方案:
对于group by操作,有两种解决思路:1.直接对null值进行过滤,2.对null值添加随机前缀。
对于join操作,有两种思路:1.手工分割,2.对null值添加随机前缀。

3.多维聚合计算数据膨胀引起的数据倾斜
在多维聚合计算时,如果进行分组聚合的字段较多,如下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:对于最后的with rollup关键字不知道大家用过没,with rollup是用来在分组统计数据的基础上再进行统计汇总,即用来得到group by的汇总信息。
如果上面的log表的数据量很大,并且Map端的聚合不能很好地起到数据压缩的情况下,会导致Map端产出的数据急速膨胀,这种情况容易导致作业内存溢出。如果log表含有数据倾斜key,会加剧Shuffle过程的数据倾斜。
注:对于最后的with rollup关键字拆分为如下几个sql:
SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;

SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;

SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;

SELECT NULL, NULL, NULL, COUNT(1)
FROM log;
但是,上面这种方式不太友好,因为现在是对3个字段进行分组聚合,如果是5个或者10个,则需要拆解的SQL语句会更多。
在Hive中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
a. 确实无法减少数据量引发的数据倾斜
在一些操作中无法削减中间结果,例如使用collect_list聚合函数,存在如下SQL:
select s_age,collect_list(s_score) list_score
from student
group by s_age
s_age有数据倾斜,但如果数据量大到一定的数量,会导致处理倾斜的Reduce任务产生内存溢出的异常。
是否可以开启hive.groupby.skewindata参数来优化。我们接下来分析下:
开启该配置会将作业拆解成两个作业,第一个作业会尽可能将Map的数据平均分配到Reduce阶段,并在这个阶段实现数据的预聚合,以减少第二个作业处理的数据量;第二个作业在第一个作业处理的数据基础上进行结果的聚合。
hive.groupby.skewindata的核心作用在于生成的第一个作业能够有效减少数量。但是对于collect_list这类要求全量操作所有数据的中间结果的函数来说,明显起不到作用,反而因为引入新的作业增加了磁盘和网络I/O的负担,而导致性能变得更为低下。
解决方案:
这类问题最直接的方式就是调整reduce所执行的内存大小。
调整reduce的内存大小使用mapreduce.reduce.memory.mb这个配置。
5.表连接时引发的数据倾斜
两表进行join时,如果表连接的键存在倾斜,那么在shuffle阶段必然会引起数据倾斜。
解决方案:
5.1 MapJoin
通常做法是将倾斜的数据存到分布式缓存中,分发到各个Map任务所在节点。在map阶段完成了join操作,即MapJoin,这避免了Shuffle,从而避免了数据倾斜。
set hive.auto.convert.join = true;
set hive.mapjoin.smalltable.filesize=25000000;
优点: 运行时转换为mapjoin,无reduce阶段,运行时间极短
缺点: 适用场景有限,需要占用分布式内存。由于是将小表加载进内存所以需要注意小表的大小。
5.2 手工分割
适用场景有限,因为倾斜的key,除了key=A之外,还有其他key也可能会发生倾斜。
5.3大表添加N中随机前缀,小表膨胀N倍数据
适用场景:小表不是很小,不太方便用mapjoin。

总结
以上就是面对hive数据倾斜时,经常采用的一些方法。我们在遇到实际问题时,可以参考上述的方案,但是不需要完全照搬。因为处理数据倾斜是一个综合性的事情,考察的不仅是技术能力,更是对业务的熟悉程度。在处理数据倾斜问题时,我们首先要对表中的字段的意思有所了解。
以上面的五种情况来说,对于null值,除了剔除以及添加随机前缀,我们可以进一步了解业务,在源表中是否可能将null值进行补全,或者在源表中是否可以给null值赋予一个随机值。再比如进行join操作时,如果有数据倾斜,可以先看是否有数据发散的情况,如果有数据发散,可以考虑先处理数据发散的问题。可以对代码进行拆分,先保证数据不发散,再进行join。
我们不管处理什么问题,不管使用的工具是hive、spark或者python,首先要做到熟悉业务,了解数据的含义。如果抛开业务,只是照搬书本上的所讲的技术,对代码进行调优,有时候也能解决问题,但是对自己的提升较为片面。

作者:checurry
链接:https://www.jianshu.com/p/9ca83f63a7e1
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

  1. Map Join为什么能解决数据倾斜
    什么是数据倾斜?
    在大数据处理过程中,不怕数据大,就怕数据倾斜。
    数据倾斜就是在mapreduce过程中,一个或几个reduce端处理的数据量过大,明显远大于平均值,导致少数的reduce端的任务长时间无法完成,而其他reduce端又无事可做,明显的效率低下。

数据倾斜的一些原因:
1、关联查询时,有一个较小的表的key比较集中key的分布不均就导致在分区时,某一个或几个分区的数量过多。
2、使用group by但没有用聚合函数,导致维度过小,某值的数量过多使用map join解决大小表关联造成的数据倾斜

map join 概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从而避免了 reduceTask,前提要求是内存足以装下该全量数据。

map join通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数 hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。 一般默认就够了,无须修改。

使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。

每个MapTask载入了大表的一个数据块做处理,载入小表的所有数据做处理,省去了ReduceTask,避免了分区不均,提高了效率。

大表放硬盘,小表放内存。

在 hive 中,直接提供了能够在 HQL 语句指定该次查询使用 map join,map join 的用法是 在查询/子查询的SELECT关键字后面添加/*+ MAPJOIN(tablelist) */提示优化器转化为map join(早期的 Hive 版本的优化器是不能自动优化 map join 的)。 其中 tablelist 可以是一个 表,或以逗号连接的表的列表。tablelist 中的表将会读入内存,通常应该是将小表写在 这里。

MapJoin 具体用法:

select /* +mapjoin(movies) */
a.title, b.rating
from movies a
join ratings b
on a.movieid =b.movieid;

这里 /* +mapjoin(movies) */的意思就是强制规定小表为movies,并强制使用mapjoin。

如果两张都是大表,能不能使用mapjoin?
可以。
把其中一张大表切分成小表,然后分别 map join

select /+mapjoin(x)/* from log a
left outer join (
select /+mapjoin(c)/ d.*
from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id
) x
on a.user_id = x.user_id;
————————————————
版权声明:本文为CSDN博主「白修修」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_41639302/article/details/107235828

  1. MapReduce运行过程中会发生OOM,OOM发生的位置?
    本文主要讲述了在使用MapReduce计算框架时发生Java.lang.OutOfMemory(OOM)的故障及其处理方式。
    二、概括回顾
    先概括下,Hive中出现OOM的异常原因大致分为以下几种:
  2. Map阶段OOM。
  3. Reduce阶段OOM。
  4. Driver提交Job阶段OOM。

Map阶段OOM:

  1. 发生OOM的几率很小,除非你程序的逻辑不正常,亦或是程序写的不高效,产生垃圾太多。

Reduce阶段OOM:

  1. data skew 数据倾斜
    data skew是引发这个的一个原因。
    key分布不均匀,导致某一个reduce所处理的数据超过预期,导致jvm频繁GC。

  2. value对象过多或者过大
    某个reduce中的value堆积的对象过多,导致jvm频繁GC。
    解决办法:

  3. 增加reduce个数,set mapred.reduce.tasks=300,。

  4. 在hive-site.xml中设置,或者在hive shell里设置 set mapred.child.java.opts = -Xmx512m,或者只设置reduce的最大heap为2G,并设置垃圾回收器的类型为并行标记回收器,这样可以显著减少GC停顿,但是稍微耗费CPU。
    set mapred.reduce.child.java.opts=-Xmx2g -XX:+UseConcMarkSweepGC;

  5. 使用map join 代替 common join. 可以set hive.auto.convert.join = true

  6. 设置 hive.optimize.skewjoin = true 来解决数据倾斜问题

Driver提交job阶段OOM:
job产生的执行计划的条目太多,比如扫描的分区过多,上到4k-6k个分区的时候,并且是好几张表的分区都很多时,这时做join。
究其原因,是 因为序列化时,会将这些分区,即hdfs文件路径,封装为Path对象,这样,如果对象太多了,而且Driver启动的时候设置的heap size太小,则会导致在Driver内序列化这些MapRedWork时,生成的对象太多,导致频繁GC,则会引发如下异常:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at sun.nio.cs.UTF_8.newEncoder(UTF_8.java:53)
at java.beans.XMLEncoder.createString(XMLEncoder.java:572)

总结:
遇到这种问题:
一是HiveQL的写法上,尽量少的扫描同一张表,并且尽量少的扫描分区。扫太多,一是job数多,慢,二是耗费网络资源,慢。
二是Hive的参数调优和JVM的参数调优,尽量在每个阶段,选择合适的jvm max heap size来应对OOM的问题。

  1. MapReduce用了几次排序,分别是什么?
    在Map任务和Reduce任务的过程中,一共发生了3次排序

1)当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阀值,在刷写磁盘之前,后台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键进行内排序

2)在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序,所有合并文件只需要再做一次排序即可使输出文件整体有序。

3)在reduce阶段,需要将多个Map任务的输出文件copy到ReduceTask中后合并,由于经过第二次排序,所以合并文件时只需再做一次排序即可使输出文件整体有序

在这3次排序中第一次是内存缓冲区做的内排序,使用的算法使快速排序,第二次排序和第三次排序都是在文件合并阶段发生的,使用的是归并排序。
————————————————
版权声明:本文为CSDN博主「数仓大山哥」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/panfelix/article/details/107246662
40. MapReduce压缩方式

● 1.3 压缩方式选择

○ 1.3.1 Gzip压缩

■ 优点:压缩率比较高,并且压缩/解压的速度比较快,(压缩率第二,解压缩速率倒数第二)

■ 缺点:不支持对文件Split(切分)

○ 1.3.2 Bzip2压缩

■ 优点:支持对文件进行Split;具有很高的压缩率,比Gzip压缩率都高(压缩率第一,解压缩速率倒数第一)

■ 缺点:压缩/解压速度慢

○ 1.3.3 Lzo压缩

■ 优点:合理的压缩率,压缩/解压速度比较快,支持Split,是Hadoop中最流行的压缩格式。

■ 缺点:压缩率比Gzip要低一些,Hadoop本身不支持,需要单独安装

○ 1.3.4 Snappy压缩

■ 优点:高速的压缩速度和合理的压缩率

■ 缺点:不支持Split,压缩率比Gzip要低,Hadoop本省也不支持,需要单独安装。
  1. MapReduce中怎么处理一个大文件
    对文件进行切分,生成多个Split切片,作为多个MapTask交给Hadoop集群处理。
#大数据求职##实习##笔试题目##技术栈#
 类似资料: