6.824 lab1 MapReduce

  • Post author:
  • Post category:其他




6.824 Lab-1 MapReduce



1.实验内容



1.1内容概述

将经典的Word Counter任务使用MapReduce编程范式去实现,任务整体流程如下(假设两个Map节点和两个Reduce节点):

image-20220220165329877

每个Map Worker负责一个输入文件的Map处理,每个Map任务输出N份文件(N是Reduce Worker数目),这N份文件会送到N个Reduce Worker处理。等待所有Map任务完成后,Reduce工作才能开始(此时所有输入文件才准备好),每个Reduce Worker输出一份reduce结果。



1.2lab相关代码概述

与本实验相关的主要有三个包,分别是main、mrapps、mr包。main包调用mrapps包和mr包运行整个流程,mrapps包是运行和测试时使用的工具函数包,这两个包在实验过程中都不需要改动,自己写的代码都在mr包中,其中包含三个文件coordinator.go、rpc.go和worker.go,作用如下

mr
├── coordinator.go \\Master
├── rpc.go		   \\处理通信
└── worker.go	   \\worker,包含map和reduce



2.实验步骤



2.1定义通信内容(rpc.go)



2.1.1分配任务时的请求与响应

worker请求任务时,不区分map和reduce,让coordinator根据任务完成情况来决定分配任务类型,这里有个边界情况就是所有的任务都在运行中,这个状态既得不到任务又不能直接退出,只能进行下一次任务。

// 节点请求任务
type ReqArgs struct {
   
	ReqNumber int8 //占用一个字节表示请求 为1表示申请任务
}
// Master回应任务内容
type ReqReply struct {
   
	TypeName string   // map or reduce or allinprogress
	Idx      int      //worker idx
	Content  []string //file names for work content
	NReduce  int      //reduce worker number
}


2.1.2完成任务时worker汇报任务结果给Master

汇报完成情况时,需要说明任务类型(TypeName), 任务结果(Ret), 完成的任务编号(idx)

// 报告任务完成情况
type FinishReq struct {
   
	TypeName string //map or reduce
	Ret      []string //output files(map or reduce)
	Idx      int //worker idx (map or reduce)
}

//Master 回应worker
type FinishReply struct {
   
	Done bool //for reply
}



2.2实现Worker(Worker.go)



2.2.1Map worker

Map worker任务包括三步,第一步读取输入文件调用mapf生成key value对,第二步处理kv对,排序之后合并相同条目到同一行,第三步将结果写入输出文件。



1.读取文件内容生成Key Value对
intermediate := []KeyValue{
   }
filename := reply1.Content[0]
NReduce := reply1.NReduce

//打开文件
file, err := os.Open(filename)
if err != nil {
   
	log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
   
	log.Fatalf("cannot read %v", filename)
}

file.Close()

//生成KV对
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)


2.处理kv对生成中间结果
sort.Sort(ByKey(intermediate))
i := 0
reduceInput := make([][]ReduceKv, NReduce)
for i < len(intermediate) {
   
	j := i + 1
    for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
   
        j++
    }
    values := []string{
   }
    for k := i; k < j; k++ {
   
        values = append(values, intermediate[k



版权声明:本文为qq_41084473原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。