MIT 6.824 - Lab 1 (5): 实现
概览
Lab 1
需要我们实现一个单机多线程、多进程的 MapReduce
程序,通过 test-mr.sh
可以看到该实验会先启动一个主节点进程,然后再启动多个工作节点进程。
主节点
主节点的入口是 mrcoordinator.go
,通过 go run mrcoordinator.go pg*.txt
可运行一个主节点程序,其中 pg*.txt
是本次 MapReduce
程序的输入数据。同时,主节点和工作节点间通过 RPC
进行交互,本实验需要我们实现以下 RPC
接口:
- 实验中主节点的任务分发采用的是拉模式,工作节点会定期向主节点请求一个任务,这个任务可以是
map
任务也可以是reduce
任务,由主节点根据当前任务的状态决定应该推送map
任务还是reduce
任务。由于reduce
任务会不间断的拉取中间结果数据,这里为了方便处理,当某个工作节点正在处理reduce
任务时,在向主节点请求任务时可指明要求分发对应的reduce
任务 map
任务完成后需要通知主节点生成的中间结果文件地址,主节点收到请求后需要将中间结果文件地址保存到内部数据结构中,供后续发送给对应reduce
任务reduce
任务完成后同样需要通知主节点任务完成,否则主节点无法知晓所有reduce
任务是否已完成,从而无法退出主节点
任务管理
主节点需要维护所有 map
任务和 reduce
任务的状态,由于工作节点有可能失败,所以主节点需要同时记录每个任务已执行了多久,如果超过一定时间还没有收到任务完成的通知,则认为执行这个任务的工作节点已失联,然后需要将该任务重新分配给其他工作节点,在本实验中主节点等待每个任务完成的时间为10秒。
工作节点
工作节点的入口是 mrworker.go
,通过 go run mrworker.go wc.so
可运行一个工作节点程序,其中 wc.so
是本次 MapReduce
程序用到的用户自定义 map
和 reduce
函数。工作节点只有一个 Worker
主方法,需要不断向主节点轮询请求任务,那么工作节点什么时候结束轮询?根据实验建议有两种方法,一种是当本次 MapReduce
任务完成后,主节点进程退出,当工作节点再次请求主节点任务时,RPC
请求必然失败,此时工作节点可认为本次任务已完成主节点已退出,从而结束轮询并退出;另一种是当主节点完成后,在一定时间内,在收到工作节点新的任务请求时,返回一个要求工作节点退出的标识(或者也可抽象为一种任务类型),工作节点收到 RPC
响应后退出,主节点在等待时间到了之后也进行退出。
空任务
当工作节点向主节点申请任务,而此时主节点没有可分发的任务时,主节点可返回一个自定义任务类型的任务,来表示空任务,工作节点收到响应后则直接睡眠等待下次唤醒。
map 任务
map
任务负责调用用户自定义的 map
函数,生成一组中间结果数据,然后将中间结果数据保存为文件,实验建议的文件名是 mr-X-Y
,其中 X
表示 map
任务的编号,Y
表示 reduce
任务的编号,map
任务编号的范围可简单使用 [1, 输入文件的数量]
来表示,reduce
任务编号的范围为 [1, nReduce]
。每个 map
任务会生成 R
个中间结果文件,实验已提供了分片函数,对中间结果的每个键使用 ihash(key) % nReduce
来决定写入到哪个中间结果文件中。
从数据结构角度来说,所有的中间结果数据是一个 nReduce * nMap
的二维矩阵,每一行对应一个 reduce
任务。
由于 map
节点有可能执行失败,为避免 reduce
节点读取到的是未写入完成的中间结果文件,在写入中间结果文件时可以先写入一个临时文件,在写入完成后再重命名为最终的文件名。在本实验中,可以使用 ioutil.TempFile
来创建临时文件,以及使用 os.Rename
来原子性的重命名文件。
在当前 map
任务的中间结果文件写入完成后,需要通过 RPC
请求通知主节点所有中间结果文件的文件地址。在本实验中,各个工作节点运行在同一台机器上,实验要求保存 map
任务的中间结果文件到当前文件夹,这样 reduce
任务就能通过中间结果文件的文件名读取中间结果数据。
对中间结果数据的写文件可以借助 Go
的 encoding/json
模块,例如:
1 | enc := json.NewEncoder(file) |
然后 reduce
任务读取文件:
1 | dec := json.NewDecoder(file) |
reduce 任务
reduce
任务负责将所有中间结果数据按照键排序后应用用户自定义的 reduce
函数,并生成最终输出文件,文件格式为 mr-out-X
,其中 X
表示 reduce
任务的编号。文件中的每一行对应一个 reduce
函数调用的结果,需要按照 Go
的 %v %v
形式对键值对进行格式化。
在收到所有的中间结果数据之前,reduce
任务无法开始执行,所以在这个期间当工作节点请求 reduce
任务时,如果主节点暂时没有新的 reduce
任务可分发,则可返回一个空任务,工作节点收到响应后则暂时等待一段时间再重新请求任务。reduce
每次收到中间结果数据后会暂存在内存中,如果暂存的中间结果数据的数量等于 map
任务的数量(这个值可以放在 RPC
响应中),则说明所有中间结果数据已经接收完毕,可以开始执行 reduce
任务。
注意
mrworker.go
中注释描述通过 go run mrworker.go wc.so
来运行工作节点,不过如果构建 wc.so
时开启了竞争检测(-race
),则运行 mrworker.go
时也同样需要开启竞争检测,否则会提示 cannot load plugin wc.so
,如果打印 err
则会显示 plugin.Open("wc"): plugin was built with a different version of package runtime
。
同样的,如果使用 GoLand
调试程序,由于在调试模式下 GoLand
编译 Go
文件时会增加 -gcflags="all=-N -l"
参数,所以也需要在打包 wc.so
时增加相同的参数。
活锁
在最后的 crash test
遇到个类似活锁的问题,在前文提到,如果某个工作节点开始了 reduce
任务但是还没有接收全部的中间结果数据,则该节点下次申请任务时会继续申请该 reduce
任务(普通工作节点对申请的任务类型没有要求)。在 crash test
下,工作节点在执行用户自定义的 map
或 reduce
函数时有一定概率结束进程,假设现在有4个工作节点,其中一个在执行 map
任务,另外三个各自在执行 reduce
任务(轮询获取中间结果数据),不幸的是
这个时候 map
节点挂了,此时 test-mr.sh
会自动再启动一个工作节点,然后更不幸的是挂掉的 map
任务在主节点看来还没有到超时时间,所以主节点此时不会分配 map
任务给新的节点(假设没有其他 map
任务),会再分配一个 reduce
任务给新的节点,至此所有工作节点都在执行 reduce
任务,又都在等待中间结果数据完成,却又不可能完成。
造成这个问题的主要原因是任务分配顺序,上述问题下的任务分配顺序是:
- 指定的
reduce
任务 - 空闲或超时的
map
任务 - 空闲或超时的
reduce
任务
解决方法就是把前两个换下顺序即可,即:
- 空闲或超时的
map
任务 - 指定的
reduce
任务 - 空闲或超时的
reduce
任务