MapReduce问答题

一、描述一下手写MR的大概流程和规范

写mapper类

  • 继承Hadoop提供的Mapper类
  • 重写map()方法
  • 在map()方法中写业务逻辑
  • context对象输出<k,v>

写reducer类

  • 继承Hadoop提供的Reducer类
  • 重写reduce()方法
  • 在reduce()方法中写业务逻辑
  • context对象输出<k,v>到结果文件

写driver类

  • 编写MR作业提交流程和自定以配置项目

二、如何实现Hadoop中的序列化,以及Hadoop的序列化和Java的序列化有什么区别?

实现:

  1. 在具体bean对象中实现Writable接口
  2. 写无参构造
  3. 重写序列化、反序列化方法下

区别:

​ Java的序列化比较重量级,会对类数据及类的继承结构等信息全部序列化传递

​ 而Hadoop只序列化类的数据,轻量高效

三、概述一下MR程序的执行流程

​ InputFormat读取HDFS中的数据文件,并在内部切片,每一个切片对应搞一个MT任务,MT按照文件逐行处理数据,每一行数据调用一次Mapper里我们自定义的map()方法,而map()方法里是我们自己写的处理逻辑,但主要任务还是把数据通过context对象输出到磁盘里

​ 在MT处理完数据、RT开始前,数据会走一趟shuffle

​ 数据走完shuffle之后,RT会先把MT处理完的数据拷贝一份,每一组相同key的values调用一次Reducer类里边我们自定义的reduce()方法,数据处理完之后通过OutputFormat把结果数据再写到磁盘上

四、InputFormat负责数据写的时候要进行切片,为什么切片大小默认是128M

​ 切片大小是由minSize,MaxSize,blockSize三个参数调整决定的,默认切片大小为blocksize大小=128M,而如果读数据的时候切片大小不等于块大小,在集群环境下可能会造成跨机器读数据的情况,这样会造成额外的资源消耗,拖慢整体MR程序执行效率,因此如果切片大小为块大小,一次读数据读一块就会避免上述情况。

五、描述一下切片的逻辑(从源码角度描述)

​ FileInputFormat----->getSplits()

  1. 找到数据存储的目录,把待处理的输入数据的元数据信息放入一个集合

  2. 开始遍历待处理目录下的每一个文件,默认不忽略处理子目录

  3. 切片计算规则:minSize、MaxSize和blocksize共同决定切片大小,调整minSize>blocksize切片变大,调整maxSize<blockSize切片变小,默认大小为blocksize

    计算切片大小:

    ​ protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
    }

  4. 遍历第一个文件

    • 判断当前文件是不是空文件,能不能切分(是否压缩文件)
      • 非空,
        • 能切:获取文件真实数据(位置,大小),计算切片大小,默认切片大小=blocksize
        • 不能(压缩文件):一片
      • 空:一片
  5. 每次切片时,都要判断切完剩下的部分是否大于切块大小的 1.1 倍, 不大于 1.1 倍就划分一块切片(合理利用计算资源)

  6. 将切片信息写到一个切片规划文件中

六、CombineTextInputFormat机制是怎么实现的

​ 先设置切片最大值

​ 虚拟文件存储过程:

​ 当前文件和最大值比较:

​ 小于最大值,单独分一块

​ 大于最大值且小于2倍最大值,一分为二

​ 大于最大值2倍,先以最大值分一块,剩下的小于2倍最大值则一分为二

​ 切片过程:

​ 虚拟文件大小和最大值比较,如果小于最大值则和下一个虚拟文件合并共同成为一个切片,如果大于等于最大值则单独成为一个切片

七、阐述一下 Shuffle机制流程?

​ 在Map端的shuffle过程是对Map的结果进行分区、排序、合并,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件

​ 在排序前,kv数据是先写入环形缓冲区的,当这些数据达到缓存区内存的80%,缓冲区就会按照写入的索引反向把这些数据溢写出去,每溢写一次,这些数据就会按照分区排序(快排),生成临时文件储存索引和数据,如果配置了combiner,还会把这些临时文件归并排序,最终按照分区合并输出为有序的结果文件落盘

​ 在Reducde段的shuffle过程是对Map端输出的数据进行拷贝、排序、分组等

拷贝数据时,先把Map方法输出的数据加载到内存缓冲区中,当内存缓冲区达到一定阈值,数据落盘,而后对其归并排序,最后按照相同key分组,交给reduce()方法处理输出。

八、在MR程序中由谁来决定分区的数量,哪个阶段环节会开始往分区中写数据?

​ 分区数量由用户需求决定,默认通过hash函数来分区

​ 在map shuffle阶段 ,环形缓冲区内存达80%以后会开始溢写数据到分区中

九、阐述MR中实现分区的思路(从源码角度分析)

​ 先获取RT的数量

​ 如果RT数量大于1,说明用户指定了RT数量,则通过Job的配置文件中获取一个Partitioner类的实现的类的class,再通过反射获取最终实现分区的实现类(自定义或者默认),从而得到分区号

​ 如果RT数量不大于1,说明用户没有指定RT数量,这时默认为1,直接new一个默认的分区器对象,getPartition()方法返回RT数量-1=0,也就是只有一个0号分区

​ 这时来到getPartitionerClass()方法中,通过配置项 mapreduce.job.partitioner.class 获取实现分区的类的class,如果没有获取到就使用默认的 HashPartitioner.class,如果获取到了就走用户自定义的分区逻辑

​ 其中HashPartitioner中的实现是通过getPartition()方法,返回当前key的hashcode值和RT数量取余得到当前key所属分区号

十、描述一下Hadoop中实现排序比较的规则(源码角度分析)

​ 排序比较的实现,简单一句话就是给参与比较的对象获取一个比较器对象,然后调用这个比较器对象里的compareTo()方法就能达到具体的效果。那么问题就在于这个比较器是如何获得的,下面就来分析一下。

​ 首先从Hadoop的MapTask里边的init方法里边呢,可以看到有一行代码是comparator=job.getOutputKeyComparator();从字面意思也能看出来这行代码就是用来获取比较器对象的,那往getOutputKeyComparator()方法里边跟进,源码会获取有没有指定自定义的比较器对象,如果有指定,那么就拿到设置好的比较器对象的class文件,然后利用反射把这个比较器对象实例化出来,到这也就拿到了比较器对象。

​ 但是如果没有指定自定义比较器对象的话,就没法通过反射的方式拿到比较器对象,接下来就走另外的逻辑:

​ 因为比较是根据Map端输出的key值来实现,所以先检验参与比较的对象有没有实现WritableComparable接口,如果没有实现,或者只实现了Writable或者Comparable其中一个接口,都会报错,因为源码里写死了只能实现WritableComparable才行。

​ 如果WritableComparable接口也实现了呢,接下来就通过当前参与比较的对象的class文件到 comparators 这个Map容器中获取比较器对象,如果在这里便能获取到比较器对象,那么当前参与比较的对象一定是Hadoop自身的数据类型,就比如Text类型,当Text类加载的时候,会将当前Text.class 作为key ,它的比较器对象作为value会放入到一个叫做comparators的Map容器中,这时因为当前参与比较的对象在类初始化的时候就在静态代码块里边给了一个比较器对象,并把这个比较器对象放到了Map容器中,所以以上可以直接去Map容器里边通过get(key)的方式拿比较器对象。

​ 考虑到GC垃圾回收啊或者是其他的故障可能导致静态代码块初始化比较器对象也失败了,所以去Map容器拿比较器对象的时候会用forceInit()强制加载一次,如果这还拿不到呢,就说明当前参与比较的对象不是Hadoop自身数据类型的而是我们自定义数据类型的对象,那这时候就只能通过Hadoop帮我们new一个比较器对象出来了。

十一、Hadoop中实现排序的两种方案分别是什么?

​ 第一种是让要参与比较的对象实现WritableComparable接口,指定泛型,重写compareTo()方法,该方法中编写具体规则

​ 第二种是自定义一个比较器对象,继承WritebleComparator类,重写compare()方法,在其无参构造器里super(比较器对象类.class) ,在Driver类中把自定义的比较器对象类.class 设置到Job里

十二、编写MR的时候什么情况下使用Combiner,具体实现流程是什么?

​ 在不影响业务逻辑的前提下,如果需要提升MR程序运行效率、减轻RT的压力、减少IO的开销时可以使用Combiner

​ 实现流程:

​ 自定义Combiner类继承Hadoop包下的Reducer

​ 在Job中setCombinerClass()方法中传入自定义的Combiner类.class

​ 因为Combiner流程相当于在Map阶段的Reducer,所以也可以把XxxReducer直接set到job中

十三、OutputFormat自定义实现流程描述一下

​ 自定义一个类XxxOutputFormat继承FileOutputFormat类

​ 重写返回类型为RecordWriter的getRecordWriter()方法,方法体return new XxxRecordWriter(job)

​ 自定义一个类XxxRecordWriter继承RecordWriter类

​ 重写RecordWriter中的write()方法和close()方法

​ 在Driver类中把自定义输出格式组件set到job中

十四、MR实现 ReduceJoin 的思路,以及ReduceJoin方案有哪些不足?

​ 先找到文件之间的相互关系,然后找到他们的关联字段

​ 在Map阶段对多个文件整合(注意标记数据来源),让关联字段作为key输出

​ 相同key对应的values进入reduce()方法中i以后,先把两个文件的数据分别放到各自的对象里

​ 把现在这组维护好的数据进行关联,最后得到期望的结果

不足:耗费资源过多,性能不高,如果处理数据倾斜就完犊子

十五、MR实现 MapJoin 的思路,以及MapJoin的局限性是什么?

​ 先找到文件之间的相互关系,然后找到他们的关联字段

​ 把小文件的数据映射到内存中的一个容器维护起来

​ MT处理大文件的时候,读一行数据就根据这行数据中的关联字段去内存里边找到对应的对象信息

​ 封装结果输出

局限性:MapJoin需要两个文件一个是大文件一个是小文件