HDFS之MapReduce过程

  • Post author:
  • Post category:其他


1.client在提交任务之前,根据参数配置形成任务分配的规划。(比如,切分数据块)

2.client端提交任务信息,计算出所需要启动的MapTask的数量,然后向Yarn申请执行任务所需要的资源。

3.Yarn启动计算任务,读取需要计算的文本数据,然后调用自定义的Mapper(你自己写的Mapper)进行逻辑运算,然后调用Context.write(k,v)写出数据,生成新的K,V值。

4.数据处理完之后,调用OutPutCollector.collect()输出结果,此函数内部会将Key/Value进行分区(调用Partitioner),数据写入到一个环形内存缓冲区。

5.利用快排对缓存区数据进行排序,排序规则如下:

  1. 先按照分区编号Partition进行排序
  2. 在一个分区内按照key进行排序。

6.把分区排序后的数据写入分区工作目录下的临时文件中,如果设置了Combiner,MapTask会以分区为单位对这些临时文件进行一次Combine操作(多轮递归合并,默认每次合并10个文件【io.sort.factor】),最终每个分区只会生成一个大文件(output/file.out)和该文件的索引文件(output/file.out.index)。(多轮递归合并:就是在待合并列表中先合并10个文件生成一个文件,然后把生成的文件重新放到待合并列表的中,依次循环,直至最后一个文件)

7.分区数据写到内存索引数据结构SpillRecord中,元信息包括临时文件的偏移量、压缩前数据大小和压缩后数据大小。当前索引大小超过1M,则将内存索引写入到文件中。

8.ReduceTask从各个MapTask上Copy一片数据,如果其大小超过一定得阈值,则写到磁盘上,否则放入内存中。同时ReduceTask会启动两个线程,同时对磁盘和内存中的数据文件进行合并,防止文件过多(每个MapTask中含有不同的数据,根据Partition,这些数据需要发送到不同的ReduceTask中。ReduceTask会自己拉取这些数据)

9.MapTask中的数据已经进行了排序,因此ReduceTask对数据进行一次归并排序即可让数据有序。

10.调用GroupingComparator组件进行分组操作,其目的是为了决定哪些数据作为一组。

11.调用Reduce函数将数据计算结果写入到HDFS上。至此完成MapReduce。



版权声明:本文为qq_37486639原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。