mrworker.go 中注释描述通过 go run mrworker.go wc.so 来运行工作节点,不过如果构建 wc.so 时开启了竞争检测(-race),则运行 mrworker.go 时也同样需要开启竞争检测,否则会提示 cannot load plugin wc.so,如果打印 err 则会显示 plugin.Open("wc"): plugin was built with a different version of package runtime。
// // The map function is called once for each file of input. The first // argument is the name of the input file, and the second is the // file's complete contents. You should ignore the input file name, // and look only at the contents argument. The return value is a slice // of key/value pairs. // funcMap(filename string, contents string) []mr.KeyValue { // function to detect word separators. ff := func(r rune)bool { return !unicode.IsLetter(r) }
// split contents into an array of words. words := strings.FieldsFunc(contents, ff)
kva := []mr.KeyValue{} for _, w := range words { kv := mr.KeyValue{w, "1"} kva = append(kva, kv) } return kva }
reduce 函数的输出类型为 string,其逻辑较为简单,中间结果数组的长度就是单词的个数:
1 2 3 4 5 6 7 8 9
// // The reduce function is called once for each key generated by the // map tasks, with a list of all the values created for that key by // any map task. // funcReduce(key string, values []string)string { // return the number of occurrences of this word. return strconv.Itoa(len(values)) }
框架代码
我们可以通过 go run -race mrsequential.go wc.so pg*.txt 来运行串行化的 MapReduce 程序,这里的 wc.so 内包含了用户自定义的 map 和 reduce 函数,pg*.txt 则是本次 MapReduce 程序的原始输入数据。
首先,根据入参提供的插件找到用户自定义的 map 和 reduce 函数:
1
mapf, reducef := loadPlugin(os.Args[1])
接着,依次读取输入文件的内容,并调用用户自定义的 map 函数,生成一组中间结果数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// // read each input file, // pass it to Map, // accumulate the intermediate Map output. // intermediate := []mr.KeyValue{} for _, filename := range os.Args[2:] { file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open %v", filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", filename) } file.Close() kva := mapf(filename, string(content)) intermediate = append(intermediate, kva...) }
// // a big difference from real MapReduce is that all the // intermediate data is in one place, intermediate[], // rather than being partitioned into NxM buckets. //
// // call Reduce on each distinct key in intermediate[], // and print the result to mr-out-0. // i := 0 for i < len(intermediate) { j := i + 1 for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } values := []string{} for k := i; k < j; k++ { values = append(values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output. fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
Hello, world ================== WARNING: DATA RACE Write at 0x00c00007e180 by goroutine 7: runtime.mapassign_faststr() /usr/local/go/src/runtime/map_faststr.go:203 +0x0 main.main.func1() /path/to/racy.go:10 +0x50
Previous read at 0x00c00007e180 by main goroutine: runtime.mapaccess1_faststr() /usr/local/go/src/runtime/map_faststr.go:13 +0x0 main.main() /path/to/racy.go:13 +0x16b
Goroutine 7 (running) created at: main.main() /path/to/racy.go:9 +0x14e ================== ================== WARNING: DATA RACE Write at 0x00c000114088 by goroutine 7: main.main.func1() /path/to/racy.go:10 +0x5c
Previous read at 0x00c000114088 by main goroutine: main.main() /path/to/racy.go:13 +0x175
Goroutine 7 (running) created at: main.main() /path/to/racy.go:9 +0x14e ================== Found 2 data race(s) exit status 66
这个例子中每次循环时会创建一个 goroutine,每个 goroutine 会打印循环计数器 i 的值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package main
import ( "fmt" "sync" )
funcmain() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { gofunc() { fmt.Println(i) // Not the 'i' you are looking for. wg.Done() }() } wg.Wait() }
我们想要的结果是能输出 01234 这5个值(实际顺序并不一定是 01234),但由于主 goroutine 对 i 的更新和被创建的 goroutine 对 i 的读取之间存在数据竞争,最终的输出结果可能是 55555 也可能是其他值。
如果要修复这个问题,每次创建 goroutine 时复制一份 i 再传给 goroutine 即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package main
import ( "fmt" "sync" )
funcmain() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { gofunc(j int) { fmt.Println(j) // Good. Read local copy of the loop counter. wg.Done() }(i) } wg.Wait() }
// ParallelWrite writes data to file1 and file2, returns the errors. funcParallelWrite(data []byte)chanerror { res := make(chanerror, 2) f1, err := os.Create("file1") if err != nil { res <- err } else { gofunc() { // This err is shared with the main goroutine, // so the write races with the write below. _, err = f1.Write(data) res <- err f1.Close() }() } f2, err := os.Create("file2") // The second conflicting write to err. if err != nil { res <- err } else { gofunc() { _, err = f2.Write(data) res <- err f2.Close() }() } return res }
// ParallelWrite writes data to file1 and file2, returns the errors. funcParallelWrite(data []byte)chanerror { res := make(chanerror, 2) f1, err := os.Create("file1") if err != nil { res <- err } else { gofunc() { // This err is shared with the main goroutine, // so the write races with the write below. _, err := f1.Write(data) res <- err f1.Close() }() } f2, err := os.Create("file2") // The second conflicting write to err. if err != nil { res <- err } else { gofunc() { _, err := f2.Write(data) res <- err f2.Close() }() } return res }
funcmain() { c := make(chanstruct{}) // or buffered channel
// The race detector cannot derive the happens before relation // for the following send and close operations. These two operations // are unsynchronized and happen concurrently. gofunc() { c <- struct{}{} }() close(c) }
所以在关闭 channel 前,增加对 channel 的读取操作来保证数据发送完成:
1 2 3 4 5 6 7 8 9
package main
funcmain() { c := make(chanstruct{}) // or buffered channel
不过上述配置不适合 Kotlin 项目,会提示 Missing: no javadoc jar found in folder '/com/example/username/awesomeproject',需要将 maven-javadoc-plugin 替换为 dokka-maven-plugin:
<name>ossrh-demo</name> <description>A demo for deployment to the Central Repository via OSSRH</description> <url>http://github.com/simpligility/ossrh-demo</url>
Central sync is activated for com.example.awesomeproject. After you successfully release, your component will be available to the public on Central https://repo1.maven.org/maven2/, typically within 30 minutes, though updates to https://search.maven.org can take up to four hours.
也就是30分钟内即可从 Maven 中央仓库下载 JAR 包,不过要想能在 search.maven.org 搜索到你的 JAR 包,需要等待至多4个小时。
Posted onWord count in article: 16kReading time ≈26 mins.
介绍
MapReduce: Simplified Data Processing on Large Clusters 是 6.824: Distributed Systems 中所介绍的第一篇论文。它提出了一种针对大数据处理的编程模型和实现,使得编程人员无需并行和分布式系统经验就可以轻松构建大数据处理应用。该模型将大数据处理问题拆解为两步,即 map 和 reduce,map 阶段将一组输入的键值对转化为中间结果键值对,reduce 阶段对中间结果键值对按照相同的键进行值的合并,从而得到最终的结果。
背景
对于 Google 来说,每天运行的系统会产生大量的原始数据,同时又要对这些原始数据进行加工产生各种衍生数据,虽然大部分数据加工的逻辑都较为简单,然而由于数据量过于庞大,为了在合理的时间内完成数据处理,通常需要将待处理的数据分发到几百或几千台机器上并行计算,这就存在几个问题:
map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1");
reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
存储使用的是廉价的 IDE 硬盘,并直接装载到了机器上。不过 Google 内部实现了一套分布式文件存储系统来管理这些硬盘上的数据,并通过数据冗余作为在不可靠的硬件上实现可用性和可靠性的手段。
用户向调度系统提交一组任务,每个任务包含多个子任务,调度系统会为每个任务分配一批集群内的机器执行。
执行概览
在 map 执行阶段,框架会自动将输入数据分为 M 片,从而将 map 任务分发到多台机器上并行执行,每台机器只处理某一片的数据。同样的,在 reduce 阶段,框架首先将中间结果数据根据分片函数(例如 hash(key) mod R)拆分为 R 片,然后分发给 reduce 任务执行,用户可自行指定 R 的值和实现具体的分片函数。
类似于游戏的自动存档,我们可以定期为主节点内部的数据结构保存检查点。如果主节点发生异常,则可以重新启动一个主节点程序并加载最新的检查点数据。然而对于单个主节点来说,主节点异常发生的概率较小,所以在 Google 的实现中,如果主节点发生异常,则会直接中断当次 MapReduce 任务。客户端可捕获到这种情况,并根据自身需要决定是否进行重试。
如前文所述 MapReduce 框架会根据输入数据拆分为 M 个 map 任务和 R 个 reduce 任务。理想情况下,M 和 R 的值应该远大于工作节点的数量。为什么需要远大于?首先,M 和 R 的数量比工作节点的数量少是不适合的,会造成资源空闲;其次,如果 M 和 R 的数量和工作节点相等,由于每台机器的处理能力有差异或者输入数据不同,每个工作节点完成某个任务需要的时间也不同,会存在部分机器先完成任务的情况,同样会造成资源空闲,另一方面 MapReduce 面向的是大数据处理,输入数据的数据量远大于工作节点的数量,M 和 R 数量较少的情况下单个工作节点需要处理的数据量则较大,单次处理时间可能较慢,而如果发生了异常,重新执行的成本也较高。所以 M 和 R 的数量设置较大能更好的应对负载均衡,性能好的机器在完成任务后可以继续处理其他任务,同时当某个任务执行异常时也能更快的恢复:该异常的工作节点已完成的 map 任务可分发给余下的工作节点执行。
当然 M 和 R 的数量也是存在上限的,对于 master 节点来说,它需要维护 M 个 map 任务和 R 个 reduce 任务,时间复杂度是 O(M + R),另一方面每个 map 任务会产出 R 份中间结果数据,对应 R 个 reduce 任务,所以 master 节点也需要 O(M * R) 的空间复杂度来维护这样的对应关系(Google 实际实现时,每个 map/reduce 关系对约占据 1 byte 内存)。
另外,由于每个 reduce 任务的最终产出结果是一个单独的文件所以 R 的数量受用户设置限制。在实践中,会趋向于让每个 map 任务处理 16 MB 到 64 MB 的输入数据来确定 M 的大小,因为 64 MB 正好是 GFS 单个数据块的大小,这样每个 map 任务必然能从本地磁盘读取输入数据而不会涉及网络 IO(如果能将任务分发给存有对应输入数据的节点的话),而 R 的数量会在工作节点的数量上乘上一个较小的常数得到。Google 内部运行 MapReduce 任务时通常设置 M 为200000,使用2000台机器的情况下设置 R 为5000。
在某些场景下,map 任务产生的中间结果数据的键存在大量的重复,同时用户编写的 reduce 函数又符合交换律和结合律(即 a + b = b + a,(a + b) + c = a + (b + c))。一个典型案例就是前文描述的单词计数程序,每个 map 任务都会产生成百上千的形如 <the, 1> 的中间结果数据,其中 the 指某个单词,1表示该单词出现的次数。这些同键的中间结果数据接着会经过网络传输发送给 reduce 任务,然后由 reduce 函数合并相加。为了减少这种雷同数据的网络传输,用户可编写自定义的合并函数,map 任务在生成中间结果数据前先进行同键的合并,从而将原来成百上千的同键网络传输降低为1次。