1.client在提交任务之前,根据参数配置形成任务分配的规划。(比如,切分数据块)
2.client端提交任务信息,计算出所需要启动的MapTask的数量,然后向Yarn申请执行任务所需要的资源。
3.Yarn启动计算任务,读取需要计算的文本数据,然后调用自定义的Mapper(你自己写的Mapper)进行逻辑运算,然后调用Context.write(k,v)写出数据,生成新的K,V值。
4.数据处理完之后,调用OutPutCollector.collect()输出结果,此函数内部会将Key/Value进行分区(调用Partitioner),数据写入到一个环形内存缓冲区。
5.利用快排对缓存区数据进行排序,排序规则如下:
- 先按照分区编号Partition进行排序
- 在一个分区内按照key进行排序。
6.把分区排序后的数据写入分区工作目录下的临时文件中,如果设置了Combiner,MapTask会以分区为单位对这些临时文件进行一次Combine操作(多轮递归合并,默认每次合并10个文件【io.sort.factor】),最终每个分区只会生成一个大文件(output/file.out)和该文件的索引文件(output/file.out.index)。(多轮递归合并:就是在待合并列表中先合并10个文件生成一个文件,然后把生成的文件重新放到待合并列表的中,依次循环,直至最后一个文件)
7.分区数据写到内存索引数据结构SpillRecord中,元信息包括临时文件的偏移量、压缩前数据大小和压缩后数据大小。当前索引大小超过1M,则将内存索引写入到文件中。
8.ReduceTask从各个MapTask上Copy一片数据,如果其大小超过一定得阈值,则写到磁盘上,否则放入内存中。同时ReduceTask会启动两个线程,同时对磁盘和内存中的数据文件进行合并,防止文件过多(每个MapTask中含有不同的数据,根据Partition,这些数据需要发送到不同的ReduceTask中。ReduceTask会自己拉取这些数据)
9.MapTask中的数据已经进行了排序,因此ReduceTask对数据进行一次归并排序即可让数据有序。
10.调用GroupingComparator组件进行分组操作,其目的是为了决定哪些数据作为一组。
11.调用Reduce函数将数据计算结果写入到HDFS上。至此完成MapReduce。