MIT 6.824 - Lab 1 (5): 实现

概览

Lab 1 需要我们实现一个单机多线程、多进程的 MapReduce 程序,通过 test-mr.sh 可以看到该实验会先启动一个主节点进程,然后再启动多个工作节点进程。

主节点

主节点的入口是 mrcoordinator.go,通过 go run mrcoordinator.go pg*.txt 可运行一个主节点程序,其中 pg*.txt 是本次 MapReduce 程序的输入数据。同时,主节点和工作节点间通过 RPC 进行交互,本实验需要我们实现以下 RPC 接口:

  1. 实验中主节点的任务分发采用的是拉模式,工作节点会定期向主节点请求一个任务,这个任务可以是 map 任务也可以是 reduce 任务,由主节点根据当前任务的状态决定应该推送 map 任务还是 reduce 任务。由于 reduce 任务会不间断的拉取中间结果数据,这里为了方便处理,当某个工作节点正在处理 reduce 任务时,在向主节点请求任务时可指明要求分发对应的 reduce 任务
  2. map 任务完成后需要通知主节点生成的中间结果文件地址,主节点收到请求后需要将中间结果文件地址保存到内部数据结构中,供后续发送给对应 reduce 任务
  3. reduce 任务完成后同样需要通知主节点任务完成,否则主节点无法知晓所有 reduce 任务是否已完成,从而无法退出主节点

任务管理

主节点需要维护所有 map 任务和 reduce 任务的状态,由于工作节点有可能失败,所以主节点需要同时记录每个任务已执行了多久,如果超过一定时间还没有收到任务完成的通知,则认为执行这个任务的工作节点已失联,然后需要将该任务重新分配给其他工作节点,在本实验中主节点等待每个任务完成的时间为10秒。

工作节点

工作节点的入口是 mrworker.go,通过 go run mrworker.go wc.so 可运行一个工作节点程序,其中 wc.so 是本次 MapReduce 程序用到的用户自定义 mapreduce 函数。工作节点只有一个 Worker 主方法,需要不断向主节点轮询请求任务,那么工作节点什么时候结束轮询?根据实验建议有两种方法,一种是当本次 MapReduce 任务完成后,主节点进程退出,当工作节点再次请求主节点任务时,RPC 请求必然失败,此时工作节点可认为本次任务已完成主节点已退出,从而结束轮询并退出;另一种是当主节点完成后,在一定时间内,在收到工作节点新的任务请求时,返回一个要求工作节点退出的标识(或者也可抽象为一种任务类型),工作节点收到 RPC 响应后退出,主节点在等待时间到了之后也进行退出。

空任务

当工作节点向主节点申请任务,而此时主节点没有可分发的任务时,主节点可返回一个自定义任务类型的任务,来表示空任务,工作节点收到响应后则直接睡眠等待下次唤醒。

map 任务

map 任务负责调用用户自定义的 map 函数,生成一组中间结果数据,然后将中间结果数据保存为文件,实验建议的文件名是 mr-X-Y,其中 X 表示 map 任务的编号,Y 表示 reduce 任务的编号,map 任务编号的范围可简单使用 [1, 输入文件的数量] 来表示,reduce 任务编号的范围为 [1, nReduce]。每个 map 任务会生成 R 个中间结果文件,实验已提供了分片函数,对中间结果的每个键使用 ihash(key) % nReduce 来决定写入到哪个中间结果文件中。

从数据结构角度来说,所有的中间结果数据是一个 nReduce * nMap 的二维矩阵,每一行对应一个 reduce 任务。

由于 map 节点有可能执行失败,为避免 reduce 节点读取到的是未写入完成的中间结果文件,在写入中间结果文件时可以先写入一个临时文件,在写入完成后再重命名为最终的文件名。在本实验中,可以使用 ioutil.TempFile 来创建临时文件,以及使用 os.Rename 来原子性的重命名文件。

在当前 map 任务的中间结果文件写入完成后,需要通过 RPC 请求通知主节点所有中间结果文件的文件地址。在本实验中,各个工作节点运行在同一台机器上,实验要求保存 map 任务的中间结果文件到当前文件夹,这样 reduce 任务就能通过中间结果文件的文件名读取中间结果数据。

对中间结果数据的写文件可以借助 Goencoding/json 模块,例如:

1
2
3
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)

然后 reduce 任务读取文件:

1
2
3
4
5
6
7
8
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}

reduce 任务

reduce 任务负责将所有中间结果数据按照键排序后应用用户自定义的 reduce 函数,并生成最终输出文件,文件格式为 mr-out-X,其中 X 表示 reduce 任务的编号。文件中的每一行对应一个 reduce 函数调用的结果,需要按照 Go%v %v 形式对键值对进行格式化。

在收到所有的中间结果数据之前,reduce 任务无法开始执行,所以在这个期间当工作节点请求 reduce 任务时,如果主节点暂时没有新的 reduce 任务可分发,则可返回一个空任务,工作节点收到响应后则暂时等待一段时间再重新请求任务。reduce 每次收到中间结果数据后会暂存在内存中,如果暂存的中间结果数据的数量等于 map 任务的数量(这个值可以放在 RPC 响应中),则说明所有中间结果数据已经接收完毕,可以开始执行 reduce 任务。

注意

mrworker.go 中注释描述通过 go run mrworker.go wc.so 来运行工作节点,不过如果构建 wc.so 时开启了竞争检测(-race),则运行 mrworker.go 时也同样需要开启竞争检测,否则会提示 cannot load plugin wc.so,如果打印 err 则会显示 plugin.Open("wc"): plugin was built with a different version of package runtime

同样的,如果使用 GoLand 调试程序,由于在调试模式下 GoLand 编译 Go 文件时会增加 -gcflags="all=-N -l" 参数,所以也需要在打包 wc.so 时增加相同的参数。

活锁

在最后的 crash test 遇到个类似活锁的问题,在前文提到,如果某个工作节点开始了 reduce 任务但是还没有接收全部的中间结果数据,则该节点下次申请任务时会继续申请该 reduce 任务(普通工作节点对申请的任务类型没有要求)。在 crash test 下,工作节点在执行用户自定义的 mapreduce 函数时有一定概率结束进程,假设现在有4个工作节点,其中一个在执行 map 任务,另外三个各自在执行 reduce 任务(轮询获取中间结果数据),不幸的是
这个时候 map 节点挂了,此时 test-mr.sh 会自动再启动一个工作节点,然后更不幸的是挂掉的 map 任务在主节点看来还没有到超时时间,所以主节点此时不会分配 map 任务给新的节点(假设没有其他 map 任务),会再分配一个 reduce 任务给新的节点,至此所有工作节点都在执行 reduce 任务,又都在等待中间结果数据完成,却又不可能完成。

造成这个问题的主要原因是任务分配顺序,上述问题下的任务分配顺序是:

  1. 指定的 reduce 任务
  2. 空闲或超时的 map 任务
  3. 空闲或超时的 reduce 任务

解决方法就是把前两个换下顺序即可,即:

  1. 空闲或超时的 map 任务
  2. 指定的 reduce 任务
  3. 空闲或超时的 reduce 任务

参考