MIT-6.824 Lab1 MapReduce

  • Post author:
  • Post category:其他




前言

由于之前时间都比较分散,再加上拖延了一段时间之后,终于下定决心利用周末来完成lab1了,这篇blog将记录我完成lab1的过程,以及在这个过程中所学习到的知识点,希望可以帮助到大家~(后续会将完整代码上传至github中)



准备阶段

在正式开始完成自己的mapreduce之前,进行了一定的准备

  • 阅读mapreduce论文,

    该论文思维导图

    ,并观看mit-6.824的前两节视频
  • 阅读

    lab1

    实验说明
  • 进行mrsequential.go代码的理解



mrsequential.go

mrsequential.go中实现了一个mapreduce的简易版本,在这个版本中是顺序执行,没有包含并发的操作。流程如下:

首先读取提前生成的wc.so链接库,在这个链接库中读取map以及reduce函数为我们调用。

// 读取wc.so中的map以及reduce函数
mapf, reducef := loadPlugin(os.Args[1]) 

接着,遍历要进行统计单词个数的文件列表,并通过map函数生成一连串的 (key, value) 对。

将生成的 (key, value)进行排序,方便后续进行reduce的统计。

sort.Sort(ByKey(intermediate))

通过将相同key值的value组合在一起,变为一个新的(key, value)然后将其传送到reduce函数,最终返回单词个数,并将其写入文件。



正式实验

【实验概述】



实验步骤

首先需要完成的是works与coordinator之间通过rpc的交互,works由coordinator分配执行对应的map以及reduce任务。

关于两者间的rpc交互,源代码中给出了示例,分别是worker.go中的CallExample(),以及coordinator.go中的Example,work通过调用CallExample()中的call函数进行与coordinator的交互。

func call(rpcname string, args interface{}, reply interface{}) bool

call函数中一共有三个参数,第一个为rpcname即为Coordinator.Example,Example为coordinator.go中的函数名,第二个参数args为传入的参数列表,reply为输出(即得到)的参数列表,这两个参数列表分别以结构体的形式在rpc.go中进行控制。

以上便是rpc交互的流程。

在mapreduce程序中我们需要完成以下交互操作。

  • coordinator为多个work节点分配map任务,并等待至任务完成。
  • work节点完成完指定的map任务,并将其以json键值对的形式写入中间文件(涉及json编解码,模仿分布式网络传输数据),并通知coordinator。
  • coordinator等待所有map任务完成之后,将为work节点分配reduce任务,并将map任务产生的中间文件作为reduce任务的输入。
  • work节点完成所有的reduce任务后,通知coordinator完成mapreduce任务。



RPC中结构体定义

work需要对coordinator进行任务的请求

// worker获取任务的请求
type GetTaskRequest struct {
	Id int
}

coordinator需要为work分配任务,因此需要有一个包含任务属性的结构体。

type JobType int32
const (
	MAPTASK    JobType = 0
	REDUCETASK JobType = 1
	SLEEP      JobType = 2
)
 // coordinator 任务分发的响应
type TaskReply struct {
	Job_type JobType
	Mfile_name string  // map 任务的文件名
	Task_id string  // 任务名字,全局唯一编号
	Rfile_name []string // reduce 任务的文件名
	Reduce_num int
}

work任务完成后,给coordinator的请求信息

// worker 完成任务后发送状态
type TaskFinishedRequest struct {
	Task_id   string   //任务id
	File_name []string // map生成的中间文件名
}

coordinator对work完成任务后的响应信息

// coordinator 对任务完成后的响应
type TaskFinishedReply struct {
	Reply int
}



coordinator实现

Coordinator的核心数据结构中记录了map以及reduce任务的完成情况,记录一些关于状态的元数据,用于调用work程序进行相应任务的执行。

type IsTimeOut int32

const (
	NORMAL  IsTimeOut = 0
	TIMEOUT IsTimeOut = 1
)

// 正在执行的任务
type Task struct {
	name       string    //任务名字
	job_type   JobType   // 任务类别
	status     IsTimeOut // 是否超时
	mfile_name string    // 如果是map任务,则记录分配给该任务的文件名字
	rfile_name int       // reduce任务的任务编号
}

// 任务执行情况
type Status int32

const (
	TODO  Status = 0
	DOING Status = 1
	DONE  Status = 2
)

type Coordinator struct {

	// 需要完成的map文件、map产生的中间文件、当前正在执行的任务、
	map_task          map[string]Status //map任务的记录
	reduce_task       map[int]Status    //reduce任务的记录
	intermediate_file map[int][]string  //中间文件
	task_list_map     map[string]*Task  // 当前正在执行的任务
	mcount            int               // 已经完成的map数量
	rcount            int               // 已经完成的reduce数量
	reduce_num        int               // 需要完成的reduce任务数量
	mutex             sync.Mutex        //锁
}

coordinator通过接收work的请求来为work分配具体的任务,它们两者间是通过rpc来完成的交互。

目前获取任务的方式是通过遍历Coordinator中map以及reduce的任务列表来进行的,时间复杂度较高,未来可考虑采用任务队列的形式来提高效率。

// 通过rpc获取任务
func (c *Coordinator) Get_task(req *GetTaskRequest, reply *TaskReply) error {
	// 多个work并发访问任务请求,需要加锁
	c.mutex.Lock()
	defer c.mutex.Unlock()

	reply.Mfile_name = ""
	reply.Rfile_name = make([]string, 0)
	reply.Reduce_num = c.reduce_num
	reply.Task_id = strconv.Itoa(taskID)
	taskID += 1
	// map任务是否全部完成
	if c.m_finished {
		// 遍历reduce任务,选择undo的任务进行执行
		for k, _ := range c.reduce_task {
			flag := c.reduce_task[k]
			if flag == DOING || flag == DONE {
				continue
			}
			c.reduce_task[k] = DOING
			for _, filename := range c.intermediate_file[k] {
				reply.Rfile_name = append(reply.Rfile_name, filename)
			}
			reply.Job_type = REDUCETASK
			tmp := &Task{reply.Task_id, REDUCETASK, NORMAL, "", k}
			c.task_list_map[reply.Task_id] = tmp
			// 超时任务处理
			go c.Handle_timeout(reply.Task_id)
			return nil
		}
		reply.Job_type = SLEEP
		return nil
	} else {
		// 遍历map任务,选择UNDO的任务进行执行
		for k, v := range c.map_task {
			flag := v
			if flag == DOING || flag == DONE {
				continue
			}
			c.map_task[k] = DOING
			reply.Mfile_name = k
			reply.Job_type = MAPTASK
			tmp := &Task{reply.Task_id, MAPTASK, NORMAL, reply.Mfile_name, -1}
			c.task_list_map[reply.Task_id] = tmp
			// 超时任务处理
			go c.Handle_timeout(reply.Task_id)
			return nil
		}
		// 若都没有则进行休眠
		reply.Job_type = SLEEP
		return nil
	}
	return nil
}

超时任务处理,如果10s没有响应,则将该任务状态设置为未做。

// 超时处理
func (c *Coordinator) Handle_timeout(task_id string) {
	time.Sleep(time.Second * 10)
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if t, ok := c.task_list_map[task_id]; ok {
		t.status = TIMEOUT
		if t.job_type == MAPTASK {
			f := t.mfile_name
			if c.map_task[f] == DOING {
				c.map_task[f] = TODO
			}
		} else if t.job_type == REDUCETASK {
			f := t.rfile_name
			if c.reduce_task[f] == DOING {
				c.reduce_task[f] = TODO
			}
		}
	}
}

work任务完成时会对coordinator发送响应,coordinator会根据任务类型进行一定的响应处理。

// 响应任务完成发来的请求
func (c *Coordinator) Report_task(req *TaskFinishedRequest, reply *TaskFinishedReply) error {
	reply.Reply = 1
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if t, ok := c.task_list_map[req.Task_id]; ok {
		flag := t.status
		// 任务超时,需要忽略该任务的请求,并需要将该任务删除
		if flag == TIMEOUT {
			delete(c.task_list_map, req.Task_id)
			return nil
		}
		if t.job_type == MAPTASK {
			filename := t.mfile_name
			c.map_task[filename] = DONE
			c.mcount += 1
			// 是否完成所有map任务
			if c.mcount == len(c.map_task) {
				c.m_finished = true
			}
			for _, v := range req.File_name {
				index := strings.LastIndex(v, "_")
				num, err := strconv.Atoi(v[index+1:])
				if err != nil {
					log.Fatal(err)
				}
				c.intermediate_file[num] = append(c.intermediate_file[num], v)
			}
			return nil
		} else if t.job_type == REDUCETASK {
			filename := t.rfile_name
			c.reduce_task[filename] = DONE
			c.rcount += 1
			delete(c.task_list_map, t.name)
			return nil

		} else {
			log.Fatal("任务类型错误")
			return nil
		}
	}
	log.Println("%s 任务不在Coordinator的任务列表中", req.Task_id)
	return nil
}



worker实现

work主要完成执行用户提供的map以及reduce这两个功能。

对于map功能的执行,核心功能与mesequential.go中的map执行差不多,将文件内容导入到mapf函数产生输出,并将输出json编码写入到中间文件中。

// 处理map任务
func handle_map(mapf func(string, string) []KeyValue, filename string, reduce_num int, task_id string) []string {
	intermediate := []KeyValue{}
	file, err := os.Open(filename)
	if err != nil {
		//		println("hello world ", filename)
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	// 执行用户自定义的mapfunction
	kva := mapf(filename, string(content))
	intermediate = append(intermediate, kva...)

	filenames := make([]string, reduce_num)
	files := make([]*os.File, reduce_num)
	// 新建中间文件
	for i := 0; i < reduce_num; i++ {
		oname := "mr"
		oname = oname + "_" + task_id + "_" + strconv.Itoa(i)
		ofile, _ := os.Create(oname)
		files[i] = ofile
		filenames[i] = oname
	}
	// 将数据以json格式写入到文件中
	for _, kv := range intermediate {
		index := ihash(kv.Key) % reduce_num
		enc := json.NewEncoder(files[index])
		enc.Encode(&kv)
	}
	return filenames
}

对于reduce任务则需要将中间文件读取出来并通过用户自定义的reduced函数合并之后写入最终输出文件即可。

//处理reduce任务
func handle_reduce(reducef func(string, []string) string, filenames []string) string {

	files := make([]*os.File, len(filenames))
	intermediate := []KeyValue{}
	for i := 0; i < len(filenames); i++ { //读取所有的文件信息
		files[i], _ = os.Open(filenames[i])
		kv := KeyValue{}
		dec := json.NewDecoder(files[i])
		for {
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
	}
	// 进行排序
	sort.Sort(ByKey(intermediate))
	oname := "mr-out-"

	index := filenames[0][strings.LastIndex(filenames[0], "_")+1:]
	oname = oname + index
	ofile, _ := os.Create(oname) //创建特定名字的输出文件

	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	return oname
}

Worker函数的主流程则为work不断的进行任务的请求与执行。

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// 循环获取任务
	for {
		req := GetTaskRequest{}
		req.Id = 0
		reply := TaskReply{}
		call("Coordinator.Get_task", &req, &reply)
		// map 任务的执行
		if reply.Job_type == MAPTASK {
			filenames := handle_map(mapf, reply.Mfile_name, reply.Reduce_num, reply.Task_id)
			task_req := TaskFinishedRequest{}
			task_req.Task_id = reply.Task_id
			task_req.File_name = filenames
			task_reply := TaskFinishedReply{}
			task_reply.Reply = 0
			call("Coordinator.Report_task", &task_req, &task_reply)
		} else if reply.Job_type == REDUCETASK {
			handle_reduce(reducef, reply.Rfile_name)
			task_req := TaskFinishedRequest{}
			task_req.Task_id = reply.Task_id
			task_req.File_name = make([]string, 0)
			task_reply := TaskFinishedReply{}
			task_reply.Reply = 0
			call("Coordinator.Report_task", &task_req, &task_reply)
		} else if reply.Job_type == SLEEP {
			time.Sleep(time.Millisecond * 10)
		} else {
			log.Fatal("任务类型错误")
		}
	}

}

以上便是lab1实现的简略版的核心代码了,若有不当之处,欢迎指出~



版权声明:本文为diyxuan原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。