MIT 6.824 - Lab 1 (3): 示例程序

Lab 1 提供了一个串行化的示例 MapReduce 程序,整体分两部分,第一部分是用户自定义的 mapreduce 函数,第二部分是框架代码。

用户自定义 map 和 reduce 函数

以单词计数应用 wc.go 为例,对于 map 函数来说,它的输入键值对类型为 <string, string>,中间结果数据类型为框架定义的 KeyValue 类型,本质上也是个 <string, string> 类型。map 函数首先将文件内容拆分为单词,然后遍历每个单词,输出对应中间结果数据 <w, "1">

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }

// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)

kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}

reduce 函数的输出类型为 string,其逻辑较为简单,中间结果数组的长度就是单词的个数:

1
2
3
4
5
6
7
8
9
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}

框架代码

我们可以通过 go run -race mrsequential.go wc.so pg*.txt 来运行串行化的 MapReduce 程序,这里的 wc.so 内包含了用户自定义的 mapreduce 函数,pg*.txt 则是本次 MapReduce 程序的原始输入数据。

首先,根据入参提供的插件找到用户自定义的 mapreduce 函数:

1
mapf, reducef := loadPlugin(os.Args[1])

接着,依次读取输入文件的内容,并调用用户自定义的 map 函数,生成一组中间结果数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}

然后,对所有中间结果数据按照键进行排序,而在 MapReduce 论文中,中间结果数据会经过分片函数分发给不同的 reduce 节点,由 reduce 节点自行排序处理:

1
2
3
4
5
6
7
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//

sort.Sort(ByKey(intermediate))

最后,遍历所有中间结果数据,对同键的中间结果数据调用用户自定义的 reduce 函数,并将结果写入到最终输出文件中,同样的,这里也只有一个最终输出文件而不是多个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
oname := "mr-out-0"
ofile, _ := os.Create(oname)

//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

i = j
}

ofile.Close()

至此,一个串行化的 MapReduce 程序就完成了。

参考