// // The map function is called once for each file of input. The first // argument is the name of the input file, and the second is the // file's complete contents. You should ignore the input file name, // and look only at the contents argument. The return value is a slice // of key/value pairs. // funcMap(filename string, contents string) []mr.KeyValue { // function to detect word separators. ff := func(r rune)bool { return !unicode.IsLetter(r) }
// split contents into an array of words. words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{} for _, w := range words { kv := mr.KeyValue{w, "1"} kva = append(kva, kv) } return kva }
reduce 函数的返回值类型为 string，逻辑较为简单——对于每个单词，传入的中间结果数组 values 的长度就是该单词出现的次数：
// Reduce is called once for each key generated by the map tasks, with a
// list of all the values created for that key by any map task. Because the
// word-count Map emits one pair per word occurrence, the number of values
// is the word's frequency; it is returned as a decimal string.
func Reduce(key string, values []string) string {
	// Return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}
框架代码
我们可以通过 go run -race mrsequential.go wc.so pg*.txt 来运行串行版本的 MapReduce 程序。其中，wc.so 包含了用户自定义的 map 和 reduce 函数，pg*.txt 则是本次 MapReduce 程序的原始输入数据。
首先，根据命令行参数指定的插件文件，加载用户自定义的 map 和 reduce 函数：
// Load the user-defined map and reduce functions from the plugin file
// named by the first command-line argument (e.g. wc.so).
mapf, reducef := loadPlugin(os.Args[1])
接着，依次读取每个输入文件的内容，并对其调用用户自定义的 map 函数，生成一组中间结果数据：
// // read each input file, // pass it to Map, // accumulate the intermediate Map output. // intermediate := []mr.KeyValue{} for _, filename := range os.Args[2:] { file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v", filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", filename) } file.Close() kva := mapf(filename, string(content)) intermediate = append(intermediate, kva...) }
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//
// // call Reduce on each distinct key in intermediate[], // and print the result to mr-out-0. // i := 0 for i < len(intermediate) { j := i + 1 for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } values := []string{} for k := i; k < j; k++ { values = append(values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output. fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)