工作流程一:
工作流程二:
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调
2)MapTask:负责map阶段的整个数据处理流程
3)ReduceTask:负责reduce阶段的整个数据处理流程
工作全流程详解:
上面图一和图二中的流程是整个MapReduce最全工作流程,主要包括MapTask阶段、Shuffle阶段和ReduceTask阶段,而Shuffle阶段和MapTask阶段、ReduceTask阶段都存在交集,具体流程如下:
- 准备好待处理的文本
- 客户端submit()前,获取待处理数据的信息,然后根据参数配置形成一个任务分配的规划
- 客户端向Yarn集群提出请求创建Mr appmaster并提交切片等相关信息:job.split、wc.jar(集群模式才需要)、job.xml
-
Yarn调用ResourceManager来创建Mr appmaster,而Mr appmaster则会根据切片的个数来创建几个Map Task。
于是,MapTask进程开始工作。
- MapTask们从文件中读取各自需要处理的数据,默认是TextInputFormat格式(可以自定义)。其实是里面的RecorderReader来读取,每读取一行之后返回给Mapper
- 在Mapper中调用map()方法来对每一行数据进行相关的业务上的逻辑运算处理
-
在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。而在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中(环形缓冲区默认大小是100M,数据处理的特点:左侧写索引或者是元数据信息,右侧写数据,写满80%后溢写到磁盘,然后反向又开始写,如此反复)。
自数据进入到环形缓冲区后,Shuffle过程正式开始。
- 进入到环形缓冲区之后在溢写之前会对数据进行一次排序。排序的方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
- 排完序之后就溢出到文件(分区且区内有序),整个过程会多次溢出到多个文件
- 在所有数据都溢出到文件之后,开始Merge归并排序(对同一个分区内溢出的多个有序的结果文件合并成一个大的溢出文件且完成归并排序)
-
之后的Combiner合并为可选流程:分区内合并和压缩。之后,写入到磁盘。
至此MapTask的执行过程基本结束。
- 在所有Map Task任务都完成之后,根据分区的数量来启动相应数量的Reduce Task,并告知ReduceTask处理数据范围(数据分区)(有几个分区就启动几个Reduce Task,每个Reduce Task专门处理同一个分区的数据,比如处理MapTask1中partition0和MapTask2中partition0的数据)
- ReduceTask根据自己的分区号,去各个MapTask机器上拷贝相应分区内的数据到本地内存缓冲区,缓冲区不够的话就溢写到磁盘。待所有数据拷贝完毕之后,ReduceTask会将这些文件再进行归并排序
-
排好序之后按照相同的key分组。
至此Shuffle的过程基本结束。
- 在分组之后一次读取一组数据到Reducer,调用reduce()方法进行聚合处理
-
之后通过context.write默认以TextOutputFormat格式经RecordWriter下入到文件。
最后,ReduceTask过程结束。
注意:
Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,参数:
io.sort.mb
默认100M。
版权声明:本文为weixin_43230682原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。