概览

Lab 1 需要我们实现一个单机多线程、多进程的 MapReduce 程序,通过 test-mr.sh 可以看到该实验会先启动一个主节点进程,然后再启动多个工作节点进程。

主节点

主节点的入口是 mrcoordinator.go,通过 go run mrcoordinator.go pg*.txt 可运行一个主节点程序,其中 pg*.txt 是本次 MapReduce 程序的输入数据。同时,主节点和工作节点间通过 RPC 进行交互,本实验需要我们实现以下 RPC 接口:

  1. 实验中主节点的任务分发采用的是拉模式,工作节点会定期向主节点请求一个任务,这个任务可以是 map 任务也可以是 reduce 任务,由主节点根据当前任务的状态决定应该推送 map 任务还是 reduce 任务。由于 reduce 任务会不间断的拉取中间结果数据,这里为了方便处理,当某个工作节点正在处理 reduce 任务时,在向主节点请求任务时可指明要求分发对应的 reduce 任务
  2. map 任务完成后需要通知主节点生成的中间结果文件地址,主节点收到请求后需要将中间结果文件地址保存到内部数据结构中,供后续发送给对应 reduce 任务
  3. reduce 任务完成后同样需要通知主节点任务完成,否则主节点无法知晓所有 reduce 任务是否已完成,从而无法退出主节点

任务管理

主节点需要维护所有 map 任务和 reduce 任务的状态,由于工作节点有可能失败,所以主节点需要同时记录每个任务已执行了多久,如果超过一定时间还没有收到任务完成的通知,则认为执行这个任务的工作节点已失联,然后需要将该任务重新分配给其他工作节点,在本实验中主节点等待每个任务完成的时间为10秒。

工作节点

工作节点的入口是 mrworker.go,通过 go run mrworker.go wc.so 可运行一个工作节点程序,其中 wc.so 是本次 MapReduce 程序用到的用户自定义 mapreduce 函数。工作节点只有一个 Worker 主方法,需要不断向主节点轮询请求任务,那么工作节点什么时候结束轮询?根据实验建议有两种方法,一种是当本次 MapReduce 任务完成后,主节点进程退出,当工作节点再次请求主节点任务时,RPC 请求必然失败,此时工作节点可认为本次任务已完成主节点已退出,从而结束轮询并退出;另一种是当主节点完成后,在一定时间内,在收到工作节点新的任务请求时,返回一个要求工作节点退出的标识(或者也可抽象为一种任务类型),工作节点收到 RPC 响应后退出,主节点在等待时间到了之后也进行退出。

空任务

当工作节点向主节点申请任务,而此时主节点没有可分发的任务时,主节点可返回一个自定义任务类型的任务,来表示空任务,工作节点收到响应后则直接睡眠等待下次唤醒。

map 任务

map 任务负责调用用户自定义的 map 函数,生成一组中间结果数据,然后将中间结果数据保存为文件,实验建议的文件名是 mr-X-Y,其中 X 表示 map 任务的编号,Y 表示 reduce 任务的编号,map 任务编号的范围可简单使用 [1, 输入文件的数量] 来表示,reduce 任务编号的范围为 [1, nReduce]。每个 map 任务会生成 R 个中间结果文件,实验已提供了分片函数,对中间结果的每个键使用 ihash(key) % nReduce 来决定写入到哪个中间结果文件中。

从数据结构角度来说,所有的中间结果数据是一个 nReduce * nMap 的二维矩阵,每一行对应一个 reduce 任务。

由于 map 节点有可能执行失败,为避免 reduce 节点读取到的是未写入完成的中间结果文件,在写入中间结果文件时可以先写入一个临时文件,在写入完成后再重命名为最终的文件名。在本实验中,可以使用 ioutil.TempFile 来创建临时文件,以及使用 os.Rename 来原子性的重命名文件。

在当前 map 任务的中间结果文件写入完成后,需要通过 RPC 请求通知主节点所有中间结果文件的文件地址。在本实验中,各个工作节点运行在同一台机器上,实验要求保存 map 任务的中间结果文件到当前文件夹,这样 reduce 任务就能通过中间结果文件的文件名读取中间结果数据。

对中间结果数据的写文件可以借助 Goencoding/json 模块,例如:

1
2
3
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)

然后 reduce 任务读取文件:

1
2
3
4
5
6
7
8
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}

reduce 任务

reduce 任务负责将所有中间结果数据按照键排序后应用用户自定义的 reduce 函数,并生成最终输出文件,文件格式为 mr-out-X,其中 X 表示 reduce 任务的编号。文件中的每一行对应一个 reduce 函数调用的结果,需要按照 Go%v %v 形式对键值对进行格式化。

在收到所有的中间结果数据之前,reduce 任务无法开始执行,所以在这个期间当工作节点请求 reduce 任务时,如果主节点暂时没有新的 reduce 任务可分发,则可返回一个空任务,工作节点收到响应后则暂时等待一段时间再重新请求任务。reduce 每次收到中间结果数据后会暂存在内存中,如果暂存的中间结果数据的数量等于 map 任务的数量(这个值可以放在 RPC 响应中),则说明所有中间结果数据已经接收完毕,可以开始执行 reduce 任务。

注意

mrworker.go 中注释描述通过 go run mrworker.go wc.so 来运行工作节点,不过如果构建 wc.so 时开启了竞争检测(-race),则运行 mrworker.go 时也同样需要开启竞争检测,否则会提示 cannot load plugin wc.so,如果打印 err 则会显示 plugin.Open("wc"): plugin was built with a different version of package runtime

同样的,如果使用 GoLand 调试程序,由于在调试模式下 GoLand 编译 Go 文件时会增加 -gcflags="all=-N -l" 参数,所以也需要在打包 wc.so 时增加相同的参数。

活锁

在最后的 crash test 遇到个类似活锁的问题,在前文提到,如果某个工作节点开始了 reduce 任务但是还没有接收全部的中间结果数据,则该节点下次申请任务时会继续申请该 reduce 任务(普通工作节点对申请的任务类型没有要求)。在 crash test 下,工作节点在执行用户自定义的 mapreduce 函数时有一定概率结束进程,假设现在有4个工作节点,其中一个在执行 map 任务,另外三个各自在执行 reduce 任务(轮询获取中间结果数据),不幸的是
这个时候 map 节点挂了,此时 test-mr.sh 会自动再启动一个工作节点,然后更不幸的是挂掉的 map 任务在主节点看来还没有到超时时间,所以主节点此时不会分配 map 任务给新的节点(假设没有其他 map 任务),会再分配一个 reduce 任务给新的节点,至此所有工作节点都在执行 reduce 任务,又都在等待中间结果数据完成,却又不可能完成。

造成这个问题的主要原因是任务分配顺序,上述问题下的任务分配顺序是:

  1. 指定的 reduce 任务
  2. 空闲或超时的 map 任务
  3. 空闲或超时的 reduce 任务

解决方法就是把前两个换下顺序即可,即:

  1. 空闲或超时的 map 任务
  2. 指定的 reduce 任务
  3. 空闲或超时的 reduce 任务

参考

Lab 1 虽然是个单机器多线程、多进程的程序,但主节点和工作节点的交互依然通过 RPC 实现,Go 本身也提供了开箱即用的 RPC 功能,下面将通过一个简单的求和服务来了解在 Go 中如何实现一个 RPC 服务。

定义请求体和响应体

请求体和响应体都非常简单,SumRequest 中包含要求和的两个数字,SumReply 中存放求和的结果:

1
2
3
4
5
6
7
8
9
10
package pb

type SumRequest struct {
A int
B int
}

type SumReply struct {
Result int
}

服务端

首先定义服务类 SumService 和提供的方法:

1
2
3
4
5
6
7
8
type SumService struct {
}

func (sumService *SumService) Sum(sumRequest *pb.SumRequest, sumReplay *pb.SumReply) error {
sumReplay.Result = sumRequest.A + sumRequest.B

return nil
}

SumService 只有一个 Sum 方法,接收 SumRequestSumReply 两个参数,求和后将结果放回到 SumReply 中(Sum 的方法签名必须是这样的形式,即两个入参和一个 error 类型的出参,具体规则见下文描述)。

然后进行服务注册:

1
2
3
sumService := &SumService{}
rpc.Register(sumService)
rpc.HandleHTTP()

通过 rpc.Register 这个方法可以知道,一个服务类及其提供的方法必须满足以下条件才能注册成功:

  1. 服务类必须是公共的
  2. 服务类提供的方法必须是公共的
  3. 服务类提供的方法入参必须是两个,一个表示请求,一个表示响应(从编码的角度来说方法入参是两个,但是实际代码是判断是否等于3个,因为在这种场景下定义的方法的第一个入参类似于 Java 中的 this
  4. 服务类提供的方法的第一个参数类型必须是公共的或者是 Go 内置的数据类型
  5. 服务类提供的方法的第二个参数类型也必须是公共的或者是 Go 内置的数据类型,且必须是指针类型
  6. 服务类提供的方法的出参个数只能是1个
  7. 服务类提供的方法的出参类型必须是 error

rpc.HandleHTTP() 表示通过 HTTP 作为客户端和服务端间的通信协议,当客户端发起一个 RPC 调用时,本质上是将要调用的方法和参数包装成一个 HTTP 请求,服务端收到 HTTP 请求后,解码出要调用的本地方法名称和入参,然后调用本地方法,在本地方法调用完成后再将结果写入到 HTTP 响应中,客户端收到响应后,再解析出远程调用的结果。

rpc.HandleHTTP() 本质上是个 HTTP 路由注册,实际上是调用 Handle(pattern string, handler Handler) 方法,当请求路由匹配 pattern 时,会调用对应的 handler 执行,对于 Go RPC 来说,固定路由路径是 /_goRPC_

所以,在完成 HTTP 路由注册后,还需要配合开启一个 HTTP 服务,这样才能接受远程服务调用:

1
2
3
4
5
6
7
8
listener, err := net.Listen("tcp", ":1234")

if err != nil {
log.Fatal("listen error:", err)
}

fmt.Println("Listening on port 1234")
http.Serve(listener, nil)

http.Serve 方法中对于每一个客户端的连接,最终会分配一个 goroutine 来调用 HandlerServeHTTP(ResponseWriter, *Request) 方法来处理,对于 GoRPC 包来说,则可以实现该方法来处理 RPC 请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}

客户端

对于客户端来说,发起远程方法调用前需要先和服务端建立连接:

1
2
3
4
5
6
7
client, err := rpc.DialHTTP("tcp", ":1234")

if err != nil {
log.Fatal("dialing:", err)
}

defer client.Close()

该方法同时返回了一个 RPC 客户端类,内部同时负责对 RPC 请求的编码和解码。

然后通过 client.Call 来发起远程调用:

1
2
3
4
5
6
7
8
9
10
11
12
sumRequest := &pb.SumRequest{
A: 1,
B: 2,
}
sumReply := &pb.SumReply{}
err = client.Call("SumService.Sum", sumRequest, sumReply)

if err != nil {
log.Fatal("call error:", err)
}

fmt.Println("Result:", sumReply.Result)

这里的调用一共有三个参数,第一个是被调用的远程方法名,需要是 类名.方法名 的形式,后两个则是远程方法的约定入参。

完整的代码可参考 go-rpc-demo

参考

Lab 1 提供了一个串行化的示例 MapReduce 程序,整体分两部分,第一部分是用户自定义的 mapreduce 函数,第二部分是框架代码。

用户自定义 map 和 reduce 函数

以单词计数应用 wc.go 为例,对于 map 函数来说,它的输入键值对类型为 <string, string>,中间结果数据类型为框架定义的 KeyValue 类型,本质上也是个 <string, string> 类型。map 函数首先将文件内容拆分为单词,然后遍历每个单词,输出对应中间结果数据 <w, "1">

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }

// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)

kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}

reduce 函数的输出类型为 string,其逻辑较为简单,中间结果数组的长度就是单词的个数:

1
2
3
4
5
6
7
8
9
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}

框架代码

我们可以通过 go run -race mrsequential.go wc.so pg*.txt 来运行串行化的 MapReduce 程序,这里的 wc.so 内包含了用户自定义的 mapreduce 函数,pg*.txt 则是本次 MapReduce 程序的原始输入数据。

首先,根据入参提供的插件找到用户自定义的 mapreduce 函数:

1
mapf, reducef := loadPlugin(os.Args[1])

接着,依次读取输入文件的内容,并调用用户自定义的 map 函数,生成一组中间结果数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}

然后,对所有中间结果数据按照键进行排序,而在 MapReduce 论文中,中间结果数据会经过分片函数分发给不同的 reduce 节点,由 reduce 节点自行排序处理:

1
2
3
4
5
6
7
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//

sort.Sort(ByKey(intermediate))

最后,遍历所有中间结果数据,对同键的中间结果数据调用用户自定义的 reduce 函数,并将结果写入到最终输出文件中,同样的,这里也只有一个最终输出文件而不是多个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
oname := "mr-out-0"
ofile, _ := os.Create(oname)

//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

i = j
}

ofile.Close()

至此,一个串行化的 MapReduce 程序就完成了。

参考

Lab 1 中遇到的第一个命令是 go build -race -buildmode=plugin ../mrapps/wc.go,其中 -buildmode=plugin 表示以插件的形式打包源文件,这里的 wc.go 是用户实现的 mapreduce 方法,这体现了面向接口编程的思想,只要用户编写的 mapreduce 方法遵循统一的签名,则可以在不重新编译 MapReduce 框架代码的情况下,实时替换运行不同的用户应用。

假设有个 sum.go 文件,里面只有一个 Sum 方法:

1
2
3
4
5
package main

func Sum(a int, b int) int {
return a + b
}

sum.go 以插件形式编译:

1
go build -buildmode=plugin sum.go

会生成一个 sum.so 文件。

接着,在 main.go 中就可以通过 plugin.Open 读取 sum.so

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"log"
"plugin"
)

func main() {
fileName := "sum.so"
p, err := plugin.Open(fileName)

if err != nil {
log.Fatalf("cannot load plugin %v", fileName)
}

sumSymbol, err := p.Lookup("Sum")

if err != nil {
log.Fatalf("cannot find Map in %v", fileName)
}

sum := sumSymbol.(func(int, int) int)

fmt.Println("1 + 2 is", sum(1, 2))
}

然后通过 Lookup 根据方法名找到 Sum 方法,按照指定方法签名转换后即可进行调用。而如果需要换一个 Sum 的实现,则无需重新编译 main.go

参考:

Lab 1 中遇到的第一个命令是 go build -race -buildmode=plugin ../mrapps/wc.go,其中 -race 表示启用 Go 的竞争检测,在多线程编程时,数据竞争是必须要考虑的问题,而数据竞争的问题往往难以察觉和排查,所以 Go 内置了竞争检测的工具来帮助开发人员提前发现问题。另外,MIT 6.824 是一门分布式系统课程,必然会涉及多线程编程,所以竞争检测也是校验 Lab 程序正确性的一种方式。

介绍

Go 借助 goroutine 来实现并发编程,所以数据竞争发生在多个 goroutine 并发读写同一个共享变量时,并且至少有一个 goroutine 存在写操作。来看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "fmt"

func main() {
done := make(chan bool)
m := make(map[string]string)
m["name"] = "world"
go func() {
m["name"] = "data race"
done <- true
}()
fmt.Println("Hello,", m["name"])
<-done
}

其中 m 是一个共享变量,被两个 goroutine 并发读写,将上述代码保存为文件 racy.go,然后开启竞争检测执行:

1
go run -race racy.go

会输出类似如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Hello, world
==================
WARNING: DATA RACE
Write at 0x00c00007e180 by goroutine 7:
runtime.mapassign_faststr()
/usr/local/go/src/runtime/map_faststr.go:203 +0x0
main.main.func1()
/path/to/racy.go:10 +0x50

Previous read at 0x00c00007e180 by main goroutine:
runtime.mapaccess1_faststr()
/usr/local/go/src/runtime/map_faststr.go:13 +0x0
main.main()
/path/to/racy.go:13 +0x16b

Goroutine 7 (running) created at:
main.main()
/path/to/racy.go:9 +0x14e
==================
==================
WARNING: DATA RACE
Write at 0x00c000114088 by goroutine 7:
main.main.func1()
/path/to/racy.go:10 +0x5c

Previous read at 0x00c000114088 by main goroutine:
main.main()
/path/to/racy.go:13 +0x175

Goroutine 7 (running) created at:
main.main()
/path/to/racy.go:9 +0x14e
==================
Found 2 data race(s)
exit status 66

竞争检测会提示第13行和第10行存在数据竞争,一共涉及两个 goroutine,一个是主 goroutine,另一个是手动创建的 goroutine

典型数据竞争场景

循环计数器竞争

这个例子中每次循环时会创建一个 goroutine,每个 goroutine 会打印循环计数器 i 的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // Not the 'i' you are looking for.
wg.Done()
}()
}
wg.Wait()
}

我们想要的结果是能输出 01234 这5个值(实际顺序并不一定是 01234),但由于主 goroutinei 的更新和被创建的 goroutinei 的读取之间存在数据竞争,最终的输出结果可能是 55555 也可能是其他值。

如果要修复这个问题,每次创建 goroutine 时复制一份 i 再传给 goroutine 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(j int) {
fmt.Println(j) // Good. Read local copy of the loop counter.
wg.Done()
}(i)
}
wg.Wait()
}

意外地共享变量

在日常开发中,我们可能不经意间在多个 goroutine 间共享了某个变量。在下面的例子中,首先 f1, err := os.Create("file1") 会创建一个 err 变量,接着在第一个 goroutine 中对 file1 写入时会对 err 进行更新(_, err = f1.Write(data)),然而在主 goroutine 中创建 file2 时同样会对 err 进行更新(f2, err := os.Create("file2"),这里虽然用的是 :=,不过 err 并不是一个新的变量,在同一个作用域下是不允许重复对某个同名变量使用 := 创建的,因为 f2 是一个新的变量,所以这里可用 :=),这就产生了数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "os"

// ParallelWrite writes data to file1 and file2, returns the errors.
func ParallelWrite(data []byte) chan error {
res := make(chan error, 2)
f1, err := os.Create("file1")
if err != nil {
res <- err
} else {
go func() {
// This err is shared with the main goroutine,
// so the write races with the write below.
_, err = f1.Write(data)
res <- err
f1.Close()
}()
}
f2, err := os.Create("file2") // The second conflicting write to err.
if err != nil {
res <- err
} else {
go func() {
_, err = f2.Write(data)
res <- err
f2.Close()
}()
}
return res
}

func main() {
err := ParallelWrite([]byte("Hello, world!"))
<-err
}

修复方法也很简单,在每个 goroutine 中使用 := 创建 err 变量即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "os"

// ParallelWrite writes data to file1 and file2, returns the errors.
func ParallelWrite(data []byte) chan error {
res := make(chan error, 2)
f1, err := os.Create("file1")
if err != nil {
res <- err
} else {
go func() {
// This err is shared with the main goroutine,
// so the write races with the write below.
_, err := f1.Write(data)
res <- err
f1.Close()
}()
}
f2, err := os.Create("file2") // The second conflicting write to err.
if err != nil {
res <- err
} else {
go func() {
_, err := f2.Write(data)
res <- err
f2.Close()
}()
}
return res
}

func main() {
err := ParallelWrite([]byte("Hello, world!"))
<-err
}

未受保护的全局变量

某个包下对外暴露了 RegisterServiceLookupService 两个方法,而这两个方法会对同一个 map 变量进行读写,客户端调用时有可能多个 goroutine 并发调用,从而存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"net"
)

var service map[string]net.Addr = make(map[string]net.Addr)

func RegisterService(name string, addr net.Addr) {
service[name] = addr
}

func LookupService(name string) net.Addr {
return service[name]
}

func main() {
go func() {
RegisterService("hello", &net.IPAddr{IP: net.ParseIP("127.0.0.1")})
}()
go func() {
fmt.Println(LookupService("hello"))
}()
}

可以通过 sync.Mutex 来保证可见性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"net"
"sync"
)

var (
service map[string]net.Addr = make(map[string]net.Addr)
serviceMu sync.Mutex
)

func RegisterService(name string, addr net.Addr) {
serviceMu.Lock()
defer serviceMu.Unlock()
service[name] = addr
}

func LookupService(name string) net.Addr {
serviceMu.Lock()
defer serviceMu.Unlock()
return service[name]
}

func main() {
go func() {
RegisterService("hello", &net.IPAddr{IP: net.ParseIP("127.0.0.1")})
}()
go func() {
fmt.Println(LookupService("hello"))
}()
}

未受保护的基本类型变量

除了 map 这样的复杂数据类型外,基本类型变量同样会存在数据竞争,如 boolintint64。例如在下面的例子中,主 goroutinew.last 的更新和创建的 goroutine 中对 w.last 的读取间存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"time"
)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
w.last = time.Now().UnixNano() // First conflicting access.
}

func (w *Watchdog) Start() {
go func() {
for {
time.Sleep(time.Second)
// Second conflicting access.
if w.last < time.Now().Add(-10*time.Second).UnixNano() {
fmt.Println("No keepalives for 10 seconds. Dying.")
os.Exit(1)
}
}
}()
}

func main() {
watchdog := &Watchdog{}
watchdog.Start()
watchdog.KeepAlive()
select {}
}

我们依然可以借助 channelsync.Mutex 来修复这个问题,不过类似于 JavaGo 中同样有相应的原子变量来处理基本类型的并发读写,上述例子就可以通过原子包下的 atomic.StoreInt64atomic.LoadInt64 来解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"sync/atomic"
"time"
)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
atomic.StoreInt64(&w.last, time.Now().UnixNano())
}

func (w *Watchdog) Start() {
go func() {
for {
time.Sleep(time.Second)
if atomic.LoadInt64(&w.last) < time.Now().Add(-10*time.Second).UnixNano() {
fmt.Println("No keepalives for 10 seconds. Dying.")
os.Exit(1)
}
}
}()
}

func main() {
watchdog := &Watchdog{}
watchdog.Start()
watchdog.KeepAlive()
select {}
}

未同步的 send 和 close 操作

虽然对一个 channel 的发送和相应的读取完成之间存在 happens-before 的关系,但是对 channel 的发送和关闭间并没有 happens-before 的保证,依然存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
package main

func main() {
c := make(chan struct{}) // or buffered channel

// The race detector cannot derive the happens before relation
// for the following send and close operations. These two operations
// are unsynchronized and happen concurrently.
go func() { c <- struct{}{} }()
close(c)
}

所以在关闭 channel 前,增加对 channel 的读取操作来保证数据发送完成:

1
2
3
4
5
6
7
8
9
package main

func main() {
c := make(chan struct{}) // or buffered channel

go func() { c <- struct{}{} }()
<-c
close(c)
}

竞争检测的开销

在开启竞争检测的情况下,程序的内存使用可能会增加5到10倍,性能可能会增加2到20倍,所以一般建议在压力测试或集成测试阶段开启,或者线上集群选择某台机器单独开启。

参考

看汇编语言时看到,标志寄存器中 CF 标志位表示无符号数运算时是否向最高有效位外的更高位产生进位或借位,而 OF 标志位表示有符号数运算时是否产生溢出。这里存在两个疑问:

  1. 对于 CPU 来说,它并不区分处理的是无符号数还是有符号数,那什么时候设置 CF,什么时候设置 OF
  2. CF 表示进位时也是一种溢出,能否和 OF 共用一个

CF

首先来看 CF 进位的例子,这里我们以8位无符号数为例,其最大值为255,那么计算 255 + 1 则会产生进位。可以通过一段简单的汇编代码进行验证:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $255, %al
add $1, %al
movl $1, %eax
movl $0, %ebx
int $0x80

在上述代码中,al 是一个8位寄存器,是 eax 寄存器的低8位,这里首先将255放到 al 寄存器内,然后对 al 寄存器中的值加1并放回到 al 寄存器中,即实现 255 +1 的运算。

最后的 int $0x80 中的 int 表示 interrupt,即中断,当发生一个中断时会有一个与之对应的中断处理程序来处理,这里的 $0x80 就是声明由哪个中断处理程序处理,在 Linux 中,$0x80 对应的是操作系统内核,用于发起一个系统调用,而具体发起哪个系统调用则由 eax 中的值决定,这就是 movl $1, %eax 的作用,1对应的系统调用是 exit,用于退出程序,而程序退出时会伴有一个状态码,这个状态码的值来自于 ebx,也就是 movl $0, %ebx 的作用,这里使用0来表示程序正常退出。

接下来我们借助 gdb 来观察程序运行时 CF 的值的变化。首先将上述代码保存为 demo.s 后进行编译:

1
as demo.s -o demo.o -gstabs+

这里的 -gstabs+ 表示生成机器码时同时生成调试信息,如果没有这个选项后续 gdb 加载时会提示 (No debugging symbols found in ./demo)

然后进行链接:

1
ld demo.o -o demo

这个时候就可以通过 gdb 加载生成的可执行文件:

1
gdb ./demo

alt

然后输入 break 4 在代码第四行设置一个断点,即 mov $255, %al 处,最后输入 run 开始调试执行:

alt

此时可输入 layout reg 来观察各寄存器内的值,我们需要关注的是 eflags 寄存器,它展示了哪些标志位生效了:

alt

或者通过执行 info registers eflags 来查看 eflags 的值:

1
2
(gdb) info registers eflags
eflags 0x202 [ IF ]

目前只有一个 IF 标志位,它用于表示是否响应中断。

接着,输入 next 来执行当前断点所在处的指令,可以看到,执行后 rax 寄存器内的值变成了255(rax 是64位 CPU 下的一个通用寄存器,32位 CPU 下对应为 eax):

alt

再输入一次 next 来执行加法运算,此时 rax 中的值变为了0(实际的二进制结果应该是100000000,因为 al 寄存器最多只能表示8位,所以最高位的1无法表示,最终结果为0),eflags 中出现了 CF 标志位,说明发生了进位:

alt

rax 中的值为0也说明了加法运算后产生的进位并不会体现在比参与运算的寄存器位数更多的寄存器中,否则 rax 中的值应该是256。

再来看借位,将程序稍加修改执行一个 1 - 2 的运算:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $1, %al
sub $2, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为255(存在高位借位的情况下最后的二进制结果为11111111,解释为无符号数为255),eflags 中同样出现了 CF 标志位。

alt

所以,CF 的标记取决于两个二进制数的运算是否产生进位或借位。

OF

有符号数的溢出分两种情况,一种是运算结果应该是正数却返回负数,另一种是运算结果应该是负数却返回正数。

首先来看两个正数运算得到负数的例子,同样对代码稍加修改实现 127 + 1 的运算:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $127, %al
add $1, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为128(对应二进制表示为10000000,以有符号数的角度来看,其值为-128,即两个正数相加得到一个负数),eflags 中出现了 OF 标志位,说明发生了溢出:

alt

从有符号数的角度来看,参与运算的两个数的符号位都是0,相加后符号位却是1,所以 OF 设置为1。

再来看两个负数运算得到正数的例子,再次对代码稍加修改实现 -128 - 1 的运算,-128的二进制补码表示为10000000,即无符号数角度下的128,-1的二进制补码表示为11111111,即无符号数角度下的255:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $128, %al
add $255, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为127(对应二进制表示为01111111,以有符号数的角度来看,其值为127,即两个负数相加得到一个正数),eflags 中出现了 OF 标志位,说明发生了溢出:

alt

从有符号数的角度来看,参与运算的两个数的符号位都是1,相加后符号位却是0,所以 OF 设置为1。

所以,OF 的标记取决于运算结果的符号位是否发生变化,这里的变化指的是两个相同符号位的数的运算结果是一个不同符号位的数。

比较

注意到前面有符号数 -128 - 1 运算的例子中,最后 CFOF 都被设置为了1,说明 CFOF 并不是互斥的关系,在这个例子中即发生了进位又发生了符号位的变更,也就是说如果满足了设置 CF 的条件,那么 CF 就是1,如果满足了设置 OF 的条件,那么 OF 就是1。因此,回到文章开头的问题,CPU 并不是去判断该设置 CF 还是 OF,而是只要条件满足就会设置对应的标志位,而具体应该关注哪个标志位,则交由编译器去判断,因为对 CPU 而言它处理的只是比特运算,只有编译器知道当前的运算数是无符号数还是有符号数。

另外,CFOF 也不能合二为一,无法相互替代,例如两个无符号数相加 CF 有可能是0,但是 OF 却是1,如 127 + 1;两个有符号数相加 OF 有可能是0,但是 CF 却是1,如 -1 - 1。也有可能 CFOF 都是1,如有符号数运算 -128 - 1

参考

Sonatype OSSRH (OSS Repository Hosting) 提供了 JAR 包发布服务,并支持自动将 JAR 包同步到 Maven 中央仓库,所以我们将 JAR 包发布到 Sonatype OSSRH 即可。

创建 Sonatype 工单

第一步在 Sonatype 上注册一个账号,创建成功后在上面创建一个 IssueProject 选择 Community Support - Open Source Project Repository Hosting (OSSRH)Issue Type 选择 New Project

alt

这里要注意的是 Group Id 的填写,根据 Coordinates 的描述,这里分两种情况:

  1. 你拥有某个域名,如 example.com
  2. 你没有域名,但是你的代码托管在了某个代码托管服务上,如 GitHub

对于第一种情况,你的 Group Id 可以是任何以 com.example 为前缀的字符串,如 com.example.myawesomeproject。不过,Sonatype 会要求你证明确实拥有 example.com 域名,你需要在你的域名注册商那创建一条 TXT 记录,其内容就是你创建的 Issue 的工单号,如 OSSRH-12345,具体步骤可参考 How do I set the TXT record needed to prove ownership of my Web Domain?

对于第二种情况,以 GitHub 为例,你的 Group Id 必须是 io.github.myusernamemyusername 是你的 GitHub 账户名或者是组织名,类似的,为了证明你对 myusername 的所有权,你需要在 myusername 下创建一个公开的仓库,仓库名称为你所创建 Issue 的工单号,如 OSSRH-12345,认证完成之后你就可以删掉这个仓库。Sonatype 所支持的代码托管服务如下:

Service Example groupId
GitHub io.github.myusername
GitLab io.gitlab.myusername
Gitee io.gitee.myusername
Bitbucket io.bitbucket.myusername
SourceForge io.sourceforge.myusername

工单示例可参考 Publish my open source java package

安装 GPG

GPG 用于对所发布的包进行签名,在 GnuPG 根据自己的操作系统下载 GPG 安装包,安装完成后执行 gpg --full-gen-key 生成秘钥对,选择默认选项即可,生成秘钥对时会要求输入姓名、邮箱、注释和密码,其中密码在发布阶段会用到,秘钥生成信息类似如下:

1
2
3
4
pub   rsa3072 2022-02-26 [SC]
E892F685E5EA9005E0A2DE31F0F732425A15D81D
uid examplename <examplename@example.com>
sub rsa3072 2022-02-26 [E]

其中 E892F685E5EA9005E0A2DE31F0F732425A15D81D 是秘钥的 ID,然后我们需要将公钥分发到公共的秘钥服务器上,这样 Sonatype 就可以通过这个公钥来验证我们所发布包的签名是否正确:

1
gpg --keyserver keyserver.ubuntu.com --send-keys E892F685E5EA9005E0A2DE31F0F732425A15D81D

这里选择的公共秘钥服务器是 keyserver.ubuntu.com,也可以选择其他服务器,如 keys.openpgp.org 或者 pgp.mit.edu

配置 settings.xml

为了将包发到 Sonatype OSSRH,需要在 Mavensettings.xml 中配置用户信息,即在 servers 下添加如下信息,这里的 your-jira-idyour-jira-pwd 对应第一步创建的账号和密码:

1
2
3
4
5
<server>
<id>ossrh</id>
<username>your-jira-id</username>
<password>your-jira-pwd</password>
</server>

另外,为了在打包时对文件进行签名还需要在 profiles 下添加如下信息,这里的 the_pass_phrase 为生成 GPG 秘钥时设置的密码:

1
2
3
4
5
6
7
8
9
10
<profile>
<id>ossrh</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<gpg.executable>gpg</gpg.executable>
<gpg.passphrase>the_pass_phrase</gpg.passphrase>
</properties>
</profile>

配置 pom.xml

最后是配置 pom.xml,首先我们需要告诉 Maven 将包部署到 Sonatype OSSRH,需要增加一个 nexus-staging-maven-plugin 插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://s01.oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
</plugins>
</build>

然后是配置 Javadoc 和源码插件,如果最后的 JAR 包没有包含 Javadoc 和源码,Sonatype 会不允许通过:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

不过上述配置不适合 Kotlin 项目,会提示 Missing: no javadoc jar found in folder '/com/example/username/awesomeproject',需要将 maven-javadoc-plugin 替换为 dokka-maven-plugin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<build>
<plugins>
<plugin>
<groupId>org.jetbrains.dokka</groupId>
<artifactId>dokka-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<id>attach-javadocs-dokka</id>
<goals>
<goal>javadocJar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

最后,剩下补充一些元数据,这个也是必填项,包括:

  • 项目名称,描述和地址
  • 许可证信息
  • 开发者信息
  • 源码地址

完整的示例可参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.simpligility.training</groupId>
<artifactId>ossrh-demo</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>ossrh-demo</name>
<description>A demo for deployment to the Central Repository via OSSRH</description>
<url>http://github.com/simpligility/ossrh-demo</url>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<developers>
<developer>
<name>Manfred Moser</name>
<email>manfred@sonatype.com</email>
<organization>Sonatype</organization>
<organizationUrl>http://www.sonatype.com</organizationUrl>
</developer>
</developers>

<scm>
<connection>scm:git:git://github.com/simpligility/ossrh-demo.git</connection>
<developerConnection>scm:git:ssh://github.com:simpligility/ossrh-demo.git</developerConnection>
<url>http://github.com/simpligility/ossrh-demo/tree/master</url>
</scm>

...

</project>

发包

执行 mvn clean deploy 即可发包,如果执行成功,在提交的工单中会自动增加一条回复:

Central sync is activated for com.example.awesomeproject. After you successfully release, your component will be available to the public on Central https://repo1.maven.org/maven2/, typically within 30 minutes, though updates to https://search.maven.org can take up to four hours.

也就是30分钟内即可从 Maven 中央仓库下载 JAR 包,不过要想能在 search.maven.org 搜索到你的 JAR 包,需要等待至多4个小时。

另外,因为配置 nexus-staging-maven-plugin 时指定了 autoReleaseAfterClosetrue,所以发包后不需要去 https://oss.sonatype.org/#stagingRepositories 手动执行 closerelease 操作。

参考

二叉搜索树的删除可以分为三种情况。第一,被删除的节点是叶子节点:

alt

第二,被删除的节点只有一个孩子节点:

alt

第三,被删除的节点有两个孩子节点:

alt

对于第一种情况,我们只需断开被删除的节点和其父节点的关联即可,即将节点3的左孩子节点指针置为空;对于第二种情况,我们可以用被删除的节点的孩子节点来替代被删除的节点,即将节点5的右孩子指针改为指向节点7;第三种情况是最为复杂的情况,相当于删除一个子树的根节点,为了保持二叉搜索树的性质,我们可以使用左子树中的最大值或右子树的最小值来替代被删除的根节点。

不过在实现时,考虑到实现的简便,对于第三种情况会通过直接修改当前节点的值来替代修改节点的指针指向,以上述例子来说,如果使用指针修改的方式,则需要修改节点5的左孩子指针,修改节点2的左孩子指针和右孩子指针(这里假设使用节点2来替代被删除的节点3),总共三处修改较为繁琐;而如果使用修改节点值的方式,只需要先将节点3的值改为2(这里假设使用节点2来替代被删除的节点3),然后就可以将问题转化为在余下的左子树中删除节点2。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val
self.left = left
self.right = right


class Solution:
def deleteNode(self, root: TreeNode, key: int) -> TreeNode:
if not root:
return root

if root.val < key:
root.right = self.deleteNode(root.right, key)
elif root.val > key:
root.left = self.deleteNode(root.left, key)
else:
if root.left and root.right:
root.val = self._find_min(root.right)
root.right = self.deleteNode(root.right, root.val)
else:
return root.left or root.right

return root

def _find_min(self, root):
while root.left:
root = root.left

return root.val

介绍

MapReduce: Simplified Data Processing on Large Clusters6.824: Distributed Systems 中所介绍的第一篇论文。它提出了一种针对大数据处理的编程模型和实现,使得编程人员无需并行和分布式系统经验就可以轻松构建大数据处理应用。该模型将大数据处理问题拆解为两步,即 mapreducemap 阶段将一组输入的键值对转化为中间结果键值对,reduce 阶段对中间结果键值对按照相同的键进行值的合并,从而得到最终的结果。

背景

对于 Google 来说,每天运行的系统会产生大量的原始数据,同时又要对这些原始数据进行加工产生各种衍生数据,虽然大部分数据加工的逻辑都较为简单,然而由于数据量过于庞大,为了在合理的时间内完成数据处理,通常需要将待处理的数据分发到几百或几千台机器上并行计算,这就存在几个问题:

  1. 如何使计算可并行
  2. 如何分发数据
  3. 如何处理异常

如果每一个数据加工任务都需要独立去解决上述的问题,一方面会使得原本简单的代码逻辑变得庞大、复杂和难以维护,另一方面也是在重复工作。受 Lisp 等其他函数式编程语言中的 mapreduce 函数的启发,Google 的工程师们发现大部分的数据处理遵循如下的模式:

  1. 对输入的每一条数据应用一个 map 函数产生一组中间结果键值对
  2. 对中间结果键值对按照相同的键聚合后,应用 reduce 函数生成最终的衍生数据

因此,Google 的工程师们抽象出了 MapReduce 框架,使得应用开发人员可以专注于计算逻辑实现而无需关心底层运行细节,统一由框架层处理并行、容错、数据分发和负载均衡等系统问题。现在再来看前面提到的问题是如何解决的:

  1. 如何使计算可并行:在 map 阶段,对数据分发后,各任务间无依赖,可并行执行;在 reduce 阶段,不同 key 的数据处理间无依赖,可并行执行
  2. 如何分发数据:在 map 阶段,可按执行 map 任务的节点数量平均分发(这只是一种可能的策略,具体分发策略见后文描述);在 reduce 阶段,可按 key 相同的数据聚合后分发
  3. 如何处理异常:重新执行某个节点上失败的 mapreduce 任务作为首要的容错手段

编程模型

假设需要统计一组文档中每个单词出现的次数,在 MapReduce 框架下用户需要编写 mapreduce 函数,近似的伪代码表示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

假设有两个文档 hello.txtworld.txt,其内容分别为:

1
2
3
4
5
hello.txt:
It was the best of times

world.txt:
it was the worst of times

对上述 mapreduce 函数来说,map 函数每次处理一个文档,key 为文档的名称,value 为文档的内容,即:

1
2
map("hello.txt", "It was the best of times")
map("world.txt", "it was the worst of times")

map 函数执行时会遍历文档的内容,对每个单词输出中间结果键值对(作为示例,这里省去了将文档内容拆分为单词的过程,同时也忽略了标点符号、大小写等与示例无关的内容),键为单词,值为 "1",所有 map 函数执行完成后生成的中间结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
hello.txt:
it 1
was 1
the 1
best 1
of 1
times 1

world.txt:
it 1
was 1
the 1
worst 1
of 1
times 1

然后,MapReduce 框架对所有中间结果按照相同的键进行聚合,即:

1
2
3
4
5
6
7
it ["1", "1"]
was ["1", "1"]
the ["1", "1"]
best ["1"]
worst ["1"]
of ["1", "1"]
times ["1", "1"]

最后,MapReduce 框架将上述聚合后的数据分发给 reduce 函数执行,即:

1
2
3
4
5
6
7
reduce("it", ["1", "1"])
reduce("was", ["1", "1"])
reduce("the", ["1", "1"])
reduce("best", ["1"])
reduce("worst", ["1"])
reduce("of", ["1", "1"])
reduce("times", ["1", "1"])

reduce 函数执行时会遍历 values,将每个字符串转换为整型后累加,然后作为 reduce 的结果返回,最终得到所有单词出现的次数:

1
2
3
4
5
6
7
it 2
was 2
the 2
best 1
worst 1
of 2
times 2

实际执行 reduce 函数时,并不会将 values 一次性传给某个 reduce 函数,因为有可能数据量太大无法完全载入内存,所以 values 在实现时是个迭代器,reduce 函数能以流式的形式获取值。

另外,虽然在上述的例子中 mapreduce 处理的都是字符串类型的数据,但是也可以支持其他类型的数据,mapreduce 处理的数据类型遵循如下的模式:

1
2
map (k1, v1) -> list(k2, v2)
reduce (k2, list(v2)) -> list(v2)

可以看到,map 产生的中间结果的数据类型和最终结果的数据类型是一致的。对整个框架来说,最初的输入和最终的输出都是某种形式的字节流或字符串,因此在 GoogleC++ 实现中,提供了专门的数据转换接口,用户可实现该接口用于字符串和 mapreduce 需要的数据类型之间转换。

实现

MapReduce 的具体实现视硬件环境的不同而不同,论文中描述的实现是针对 Google 内部广泛使用的硬件环境,即通过交换以太网相连的大量廉价 PC 组成的集群:

  1. 每台机器的配置一般为双核 x86 处理器,2-4 GB 内存,运行 Linux 系统
  2. 使用廉价网络硬件,带宽一般为 100 Mbit/s1 Gbit/s,不过平均来说会小于 bisection bandwidthbisection bandwidth 指当某个网络被分成两部分时,这两部分间的带宽)
  3. 一个集群一般由几百上千台机器组成,所以机器异常是家常便饭
  4. 存储使用的是廉价的 IDE 硬盘,并直接装载到了机器上。不过 Google 内部实现了一套分布式文件存储系统来管理这些硬盘上的数据,并通过数据冗余作为在不可靠的硬件上实现可用性和可靠性的手段。
  5. 用户向调度系统提交一组任务,每个任务包含多个子任务,调度系统会为每个任务分配一批集群内的机器执行。

执行概览

map 执行阶段,框架会自动将输入数据分为 M 片,从而将 map 任务分发到多台机器上并行执行,每台机器只处理某一片的数据。同样的,在 reduce 阶段,框架首先将中间结果数据根据分片函数(例如 hash(key) mod R)拆分为 R 片,然后分发给 reduce 任务执行,用户可自行指定 R 的值和实现具体的分片函数。

下图展示了 Google 所实现的 MapReduce 框架的整体执行流程:

alt

当用户提交 MapReduce 任务后,框架会执行以下一系列流程(下文中的序号和上图中的序号对应):

  1. 首先 MapReduce 框架将输入数据分为 M 片,每片数据大小一般为 16 MB64 MB(具体大小可由用户入参控制),然后将 MapReduce 程序复制到集群中的一批机器上运行。
  2. 在所有的程序拷贝中,某台机器上的程序会成为主节点(master),其余称为工作节点(worker),由主节点向工作节点分派任务,一共有 Mmap 任务和 Rreduce 任务需要分派。主节点会选择空闲的工作节点分派 mapreduce 任务。
  3. 如果某个工作节点被分派了 map 任务则会读取当前的数据分片,然后将输入数据解析为一组键值对后传递给用户自定义的 map 函数执行。map 函数产生的中间结果键值对会暂存在内存中。
  4. 暂存在内存中的中间结果键值对会周期性的写入到本地磁盘中,并根据某个分片函数将这些数据写入到本地磁盘下的 R 个区,这样相同键的中间结果数据在不同的 map 节点下属于同一个区号,就可以在后续将同一个键的中间结果数据全部发给同一个 reduce 节点。同时,这些数据写入后的地址会回传给 master 节点,master 节点会将这些数据的地址发送给相应的 reduce 节点。
  5. reduce 节点接收到 master 节点发送的中间结果数据地址通知后,将通过 RPC 请求根据数据地址读取 map 节点生成的数据。在所有中间结果数据都读取完成后,reduce 节点会先将所有中间结果数据按照键进行排序,这样所有键相同的数据就聚合在了一起。之所以要排序是因为一个 reduce 节点会分发处理多个键下的中间结果数据。如果中间结果数据量太大不足以完全载入内存,则需要使用外部排序。
  6. reduce 节点执行时会先遍历排序后的中间结果数据,每遇到一个新的键就会将该键及其对应的所有中间结果数据传递给用户自定义的 reduce 函数执行。reduce 函数执行的结果数据会追加到当前 reduce 节点的最终输出文件里。
  7. 当所有 map 任务和 reduce 任务都执行完成后,master 节点会唤醒用户程序,并将控制权交还给用户代码。

当成功结束 MapReduce 任务后,其执行结果就保存在了 R 个文件中(每个文件对应一个 reduce 节点的产出,文件的名字由用户所指定)。一般来说,用户不必将这 R 个输出文件合并成一个,它们通常会作为另一个 MapReduce 任务的输入,或交由其他分布式应用处理。

基于上述流程,再来看在 编程模型 这节中的例子。假设有6个文档,分别是 1.txt6.txt,每个文档中的内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1.txt:
It was the best of times

2.txt:
it was the worst of times

3.txt:
it was the age of wisdom

4.txt:
it was the age of foolishness

5.txt:
it was the epoch of belief

6.txt:
it was the epoch of incredulity

对应 MapReduce 执行流程为:

  1. 我们假设每两个文档的数据大小为 16 MB,则6个文档对应3片数据
  2. 由1所知一共有3个 map 任务,不妨将 reduce 任务也设为3个,并将6个文档按顺序每两个一组依次分发给每个 map 节点
  3. 每个 map 节点处理的数据分片为两个文档,所产生的中间结果数据分别为:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    map worker 1:
    it 1
    was 1
    the 1
    best 1
    of 1
    times 1
    it 1
    was 1
    the 1
    worst 1
    of 1
    times 1

    map worker 2:
    it 1
    was 1
    the 1
    age 1
    of 1
    wisdom 1
    it 1
    was 1
    the 1
    age 1
    of 1
    foolishness 1

    map worker 3:
    it 1
    was 1
    the 1
    epoch 1
    of 1
    belief 1
    it 1
    was 1
    the 1
    epoch 1
    of 1
    incredulity 1
  4. 在每个 map 节点上将中间结果数据按照某个哈希函数分发到3个区,不妨为以下结果:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    map worker 1:
    region 1:
    it 1
    best 1
    it 1

    region 2:
    was 1
    of 1
    was 1
    worst 1
    of 1

    region 3:
    the 1
    times 1
    the 1
    times 1

    map worker 2:
    region 1:
    it 1
    age 1
    it 1
    age 1
    foolishness 1

    region 2:
    was 1
    of 1
    of 1
    wisdom 1
    was 1

    region 3:
    the 1
    the 1

    map worker 3:
    region 1:
    it 1
    epoch 1
    belief 1
    it 1
    epoch 1

    region 2:
    was 1
    of 1
    was 1
    of 1

    region 3:
    the 1
    the 1
    incredulity 1
  5. reduce 节点按照数据分区接收到所有中间结果数据后将其按照键排序:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    reduce worker 1:
    age 1
    age 1
    belief 1
    best 1
    epoch 1
    epoch 1
    foolishness 1
    it 1
    it 1
    it 1
    it 1
    it 1
    it 1

    reduce worker 2:
    of 1
    of 1
    of 1
    of 1
    of 1
    of 1
    was 1
    was 1
    was 1
    was 1
    was 1
    was 1
    wisdom 1
    worst 1

    reduce worker 3:
    incredulity 1
    the 1
    the 1
    the 1
    the 1
    the 1
    the 1
    times 1
    times 1
  6. reduce 节点调用用户自定义 reduce 函数计算单词出现次数,最终每个 reduce 节点的输出文件为:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    reduce worker 1 output:
    age 2
    belief 1
    best 1
    epoch 2
    foolishness 1
    it 6

    reduce worker 2 output:
    of 6
    was 6
    wisdom 1
    worst 1

    reduce worker 3 output:
    incredulity 1
    the 6
    times 2
  7. 将代码控制权交还给用户代码

Master 节点数据结构

master 节点需要维护当前所有的 mapreduce 任务,每个任务需区分不同的状态(空闲、进行中、完成),同时还需要知道每个任务对应的工作节点。作为 map 节点和 reduce 节点间中间结果数据的传输媒介,master 节点需保存 R 个中间结果分区,每当一个 map 节点执行成功时,会将生成的 R 个中间结果文件地址发送给 master 节点,当 master 节点收到通知后,会将其转发给对应进行中的 reduce 节点。

对应数据结构简单示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 任务状态
enum TaskState {
// 空闲
Idle,

// 进行中
InProgress,

// 完成
Completed
}

// 一个 map 或 reduce 任务
class Task {
// 任务状态
TaskState state;

// 对应的工作节点 id
int workerId;
}

// 工作节点
class Worker {
// 节点 id
int id;
}

// Map 任务产生的中间结果文件,一个 map 任务一般会产生多个中间结果文件
class IntermediateFile {
// 文件地址
string location;

// 文件大小
long size;
}

// 中间结果文件集,所有 map 任务产生的中间结果文件会根据分片函数划分到本地磁盘下的 R 个区
class IntermediateFileRegion {
// 中间结果文件
IntermediateFile[] intermediateFiles;
}

// Map 节点
class MapWorker : Worker {
// 中间结果文件集,一共有 R 个
IntermediateFileRegion[] intermediateFileRegions;
}

// Reduce 节点
class ReduceWorker : Worker {
// 中间结果文件,master 节点会不断发送中间结果文件给 reduce 节点,当所有中间结果文件都收到后,reduce 节点开始工作
IntermediateFile[] intermediateFiles;
}

// 主节点
class Master {
// Map 任务,一共有 M 个
Task[] mapTasks;

// Reduce 任务,一共有 R 个
Task[] reduceTasks;

// 工作节点,最多有 M + R 个,一个工作节点并不是只负责 map 或者 reduce 任务,master 节点会选择空闲节点分派 map 或者 reduce 任务
Worker[] workers;

// 中间结果文件集,一共有 R 个,由 map 节点下的中间结果文件集聚合而来,某个 map 节点执行成功后会将生成的 R 个中间结果文件地址发送给 master 节点,由 master 节点将某个区下的中间结果文件地址转发给对应 reduce 节点
IntermediateFileRegion[] intermediateFileRegions;
}

容错

因为 MapReduce 框架借助几百或几千台机器来处理海量数据,所以必须优雅的应对机器异常。

工作节点异常

master 节点会周期性的对工作节点进行探活。如果某个工作节点在一段时间内无响应,则 master 节点会将该工作节点标记为异常。该工作节点完成的所有 map 任务的状态都会被重置为空闲,可重新被 master 节点调度到其他工作节点上执行。类似的,该工作节点所有进行中的 mapreduce 任务也都会被重置为空闲,并重新接受调度。

之所以这里已完成的 map 任务也需要重新执行是因为所产生的中间结果文件是保存在 map 节点的本地磁盘上,当该节点无响应时便认为无法与之连通从而认为无法通过 RPC 请求获取这些数据。而如果 reduce 节点异常,它所完成的 reduce 任务不需要重新执行是因为 reduce 节点执行成功后产生的输出文件是保存在全局的文件系统上。

如果某个 map 任务一开始由工作节点 A 执行,之后由工作节点 B 执行(因为节点 A 发生了异常),则所有执行 reduce 任务的节点都会被通知,其中所有要从节点 A 读取数据但还未读取的 reduce 节点会转而从节点 B 读取数据。

MapReduce 框架能从容应对大量的节点异常。例如,在某次 MapReduce 任务中,由于对运行中的集群进行网络维护一次性造成了80台机器在几分钟内无法连通。MapReduce 框架可直接重新分发和执行这些不连通的节点正在处理的任务,然后继续后续流程,并最终完成当次任务。

主节点异常

类似于游戏的自动存档,我们可以定期为主节点内部的数据结构保存检查点。如果主节点发生异常,则可以重新启动一个主节点程序并加载最新的检查点数据。然而对于单个主节点来说,主节点异常发生的概率较小,所以在 Google 的实现中,如果主节点发生异常,则会直接中断当次 MapReduce 任务。客户端可捕获到这种情况,并根据自身需要决定是否进行重试。

执行语义

如果用户编写的 mapreduce 函数是确定性的函数(即对于相同的输入始终返回相同的输出),则对于同一份输入,分布式的 MapReduce 框架的执行结果和一个串行执行且没有任何异常的 MapReduce 框架的执行结果相同。

不论是 map 还是 task 任务,都需要将执行结果写入到文件系统上,通过原子性的写入提交,可实现上述的语义保证。每个进行中的任务会先将输出结果写入到私有临时文件中,对 reduce 任务来说,最终只产生一个文件,而对于 map 任务则会产生 R 个文件(每个文件对应一个 reduce 任务)。当一个 map 任务执行完成时,map 节点会发送一条消息给 master 节点,这条消息中包含了 map 任务所生成的 R 个临时文件的名字。如果 master 节点收到了一条已经完成的 map 任务的消息,则会忽略该消息,否则将 R 个临时文件的名字保存在内部的数据结构中。

reduce 任务执行完成时,reduce 节点能原子性的将其生成的临时文件重命名为最终的输出文件。如果同一个 reduce 任务有多个工作节点执行(因为网络连通问题导致 master 重新分发 reduce 任务),则对同一个最终输出文件会有多个文件重命名的请求。通过底层文件系统的原子性重命名保证,最终的输出文件只会对应一个 reduce 任务的结果。

Google 内部大部分的 mapreduce 函数都是确定性的,在这种情况下分布式程序执行的结果和串行程序执行的结果相同的语义性保证使得开发人员能很容易的审视所编写的程序的行为(即如果程序的执行结果不符合预期,那么可以基本肯定的是开发人员编写的 map 或者 reduce 函数存在问题,而不是 MapReduce 框架存在问题)。当 map 或者 reduce 函数不具有确定性时,框架能提供稍弱一级但仍是合理的语义性保证。在非确定性的函数下,某个 reduce 任务 R1 由分布式执行的结果等价于一个串行执行的程序 A 执行 R1 后的结果。但是,另一个 reduce 任务 R2 的执行结果也可能等同于由另一个不同的串行执行的程序 B 执行后的结果。

假设有一个 map 任务 M,以及总共有两个 reduce 任务 R1R2,记 e(Ri) 表示 Ri 执行并提交成功的结果。以前面的单词统计为例,假设发送给 map 任务的只有两个文档:

1
2
3
4
5
1.txt:
It was the best of times

2.txt:
it was the worst of times

map 函数是非确定性的情况下,不妨这样实现 map 函数:

1
2
3
4
5
6
7
8
9
10
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
r = Random(0, 1)

if r > 0.5:
EmitIntermediate(w, "1");
else:
EmitIntermediate(w, "0");

即对于某个单词,map 函数有一半的概率计数为1,一半的概率计数为0。

类似的,以同样的手段来实现 reduce 函数,对于某个单词的所有出现次数,reduce 函数有一半的概率会计数,一半的概率会忽略:

1
2
3
4
5
6
7
8
9
10
11
12
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
r = Random(0, 1)

if r > 0.5:
result += ParseInt(v);
else:
result += 0;
Emit(AsString(result));

R1 为统计单词 it 的个数,经过 map 任务后,生成的中间结果键值对可能为以下四种情况:

  1. [0, 1]
  2. [1, 0]
  3. [1, 1]
  4. [0, 0]

最后由 reduce 任务执行后的结果可能为0、1、2三种情况,而相同的输入由一个串行执行的程序来执行也是同样的结果,即不管是分布式的程序还是串行的程序最终结果都是相同的集合,所以认为两者是等价的,也是合理的。

在确定性的函数下,相同的输入必然返回相同的输出,而在不确定性的函数下,不同的输入可能返回相同的输出或者相同的输入可能返回不同的输出。这就类似于知道 x 的定义域是 {1, 2, 3}y 值域是 {4, 5, 6},求 f(x),显然 f(x) 存在不止唯一的解。

记上述的 mapreduce 函数组成的串行程序为 A,假设有另一个串行程序 B,其中 map 函数不变,reduce 函数变为:

1
2
3
4
5
6
7
8
9
10
11
12
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
r = Random(0, 1)

if r > 0.5:
result += 0;
else:
result += ParseInt(v);
Emit(AsString(result));

R2 为统计单词 was 的个数,由 AB 执行的最终结果都等于集合 {0, 1, 2},相比于确定性的函数,开发人员因此无法有效的审视所编写函数的行为。

局部性

Google 内部的运行环境中网络带宽属于稀缺资源,不管是 map 还是 reduce 任务都依托于文件的读取,不可避免的会产生大量网络 IO。而在前面提到,Google 内部实现了一套分布式文件存储系统(GFS)来管理存储在集群内机器本地磁盘上的文件,对于每一个文件 GFS 会将其切分为若干个 64MB 的数据块,每个数据块存有多份冗余(一般是3份)保存在不同的机器上。对于 MapReduce 框架来说,原始的数据输入是保存在本地磁盘上的,依据这个特性,框架在分发 map 任务时,根据输入数据在 GFS 内的位置会优先选择本地磁盘上存有对应输入数据的工作节点。如果找不到这样的工作节点,则会选择一个距离输入数据最近的工作节点(例如工作节点和存有输入数据的节点由同一个交换机相连)。当运行大量的 MapReduce 任务时,大部分的输入数据都是从本地读取从而不消耗任何网络带宽。

任务粒度

如前文所述 MapReduce 框架会根据输入数据拆分为 Mmap 任务和 Rreduce 任务。理想情况下,MR 的值应该远大于工作节点的数量。为什么需要远大于?首先,MR 的数量比工作节点的数量少是不适合的,会造成资源空闲;其次,如果 MR 的数量和工作节点相等,由于每台机器的处理能力有差异或者输入数据不同,每个工作节点完成某个任务需要的时间也不同,会存在部分机器先完成任务的情况,同样会造成资源空闲,另一方面 MapReduce 面向的是大数据处理,输入数据的数据量远大于工作节点的数量,MR 数量较少的情况下单个工作节点需要处理的数据量则较大,单次处理时间可能较慢,而如果发生了异常,重新执行的成本也较高。所以 MR 的数量设置较大能更好的应对负载均衡,性能好的机器在完成任务后可以继续处理其他任务,同时当某个任务执行异常时也能更快的恢复:该异常的工作节点已完成的 map 任务可分发给余下的工作节点执行。

当然 MR 的数量也是存在上限的,对于 master 节点来说,它需要维护 Mmap 任务和 Rreduce 任务,时间复杂度是 O(M + R),另一方面每个 map 任务会产出 R 份中间结果数据,对应 Rreduce 任务,所以 master 节点也需要 O(M * R) 的空间复杂度来维护这样的对应关系(Google 实际实现时,每个 map/reduce 关系对约占据 1 byte 内存)。

另外,由于每个 reduce 任务的最终产出结果是一个单独的文件所以 R 的数量受用户设置限制。在实践中,会趋向于让每个 map 任务处理 16 MB64 MB 的输入数据来确定 M 的大小,因为 64 MB 正好是 GFS 单个数据块的大小,这样每个 map 任务必然能从本地磁盘读取输入数据而不会涉及网络 IO(如果能将任务分发给存有对应输入数据的节点的话),而 R 的数量会在工作节点的数量上乘上一个较小的常数得到。Google 内部运行 MapReduce 任务时通常设置 M 为200000,使用2000台机器的情况下设置 R 为5000。

后备任务

类似于木桶原理,一次 MapReduce 任务完成的时间取决于最慢的机器完成 mapreduce 任务的时间,这也是造成 MapReduce 任务耗时长的常见原因之一。某台机器执行慢可能有好几个原因造成,例如某台机器的磁盘存在异常,可能频繁遭遇可校正的异常,从而使得磁盘的读速度从 30 MB/s 降低到 1 MB/s。而调度系统同时有可能分配了其他的任务给这台机器,会进一步引发 CPU、内存、本地磁盘、网络带宽的竞争,从而造成执行 MapReduce 任务的耗时更长。Google 内部曾经遇到一个问题,由于机器初始化代码中的一个 bug 造成处理器的缓存被禁用,在这些受影响的机器上运行的任务耗时增长了超过100倍。

针对这个问题,Google 提出了一个通用的缓解机制。当一次 MapReduce 任务快执行结束时,框架会将剩余还在进行中的任务分配给其他机器执行。不管是原先分配的机器执行完成,还是新分配的机器执行完成,对应的任务都将标记为完成。让一个任务由两台机器同时执行势必存在资源浪费,Google 通过调优使得耗费的计算资源控制在了增加几个百分比以内。这个机制在处理一个数据量巨大的 MapReduce 任务时能大幅降低整体耗时。在某个约需处理 1T 数据的排序任务中,不启用这个机制的情况下整体耗时会增加44%。

改进

大多数情况下用户仅需编写 mapreduce 函数就能满足需求,本节主要描述一些 MapReduce 的扩展,可能在某些场合下会比较有用。

分片函数

用户可指定 MapReduce 任务最终输出文件的数量 R,也即 reduce 任务的数量。那么由 map 任务产生的中间结果数据应该发给哪个 reduce 节点执行呢?这个就交由分片函数决定,默认的分片函数是哈希函数(例如 hash(key) mod R),这种分片结果一般比较均匀。不过,有时候自定义分片函数会更有用,例如,当最终结果文件的键是 URL 时,我们希望同属于一个 host 下的 URL 对应的数据最终都在同一个文件里,用户可自定义分片函数来实现,例如 hash(Hostname(urlkey)) mod R,即先通过 urlkey 提取 host,然后对 host 计算哈希最后取模 R

顺序保证

MapReduce 框架保证在同一个中间结果分区内,即同一个 reduce 任务内,中间结果数据是按照键的升序处理的,因为 reduce 任务处理前会先将中间结果数据按照键进行排序。这样在 reduce 任务处理完成后,最终结果文件内的数据也是按照键的顺序排序的,这就有利于对最终结果文件按键进行高效的随机查找,或方便其他需要排好序的数据的场景。

合并函数

在某些场景下,map 任务产生的中间结果数据的键存在大量的重复,同时用户编写的 reduce 函数又符合交换律和结合律(即 a + b = b + a(a + b) + c = a + (b + c))。一个典型案例就是前文描述的单词计数程序,每个 map 任务都会产生成百上千的形如 <the, 1> 的中间结果数据,其中 the 指某个单词,1表示该单词出现的次数。这些同键的中间结果数据接着会经过网络传输发送给 reduce 任务,然后由 reduce 函数合并相加。为了减少这种雷同数据的网络传输,用户可编写自定义的合并函数,map 任务在生成中间结果数据前先进行同键的合并,从而将原来成百上千的同键网络传输降低为1次。

一般来说,合并函数和 reduce 函数的用户代码实现是相同的。不同在于 MapReduce 框架如何处理这两个函数产出的结果,reduce 函数的产出结果会写到最终的结果文件里,而合并函数的产出结果会写到中间结果文件里,然后发送给 reduce 任务。

在特定情况下,由于省去了大量的网络 IO,合并函数能显著的降低一次 MapReduce 任务执行的耗时。

输入和输出类型

MapReduce 框架支持从多个数据格式读取输入数据。例如,text 模式下将输入数据的每一行作为键值对,其中键通过在文本中的偏移量来确定,而值就是当前行的内容。另一种通用支持的格式是本身保存了已排好序的键值对。不管是哪种输入格式,MapReduce 都能从原始输入中准确切分出键值对供 map 任务使用(例如 text 模式保证以每一行的结束进行切分)。用户也可实现自定义的 reader 接口来支持读取新的输入格式,不过大部分情况下内置的输入格式已经能满足需求。

虽然前文描述过 MapReduce 的原始输入数据来源于文本文件,不过用户自定义的 reader 接口并不一定要从文本文件读取,例如还可以从数据库或内存中读取。

类似的,MapReduce 框架也支持不同的最终输出数据的格式,用户也同样可实现支持自定义的输出格式。

副作用

在某些情况下,用户可能希望在 mapreduce 阶段生成额外的辅助文件,这就要求开发人员自己保证输出文件的原子性和幂等性,特别是用户程序先将数据写入到临时文件内,最后在所有数据写入完成后能原子性的将临时文件重命名。

不过,MapReduce 框架本身并不支持两阶段协议来保证 mapreduce 任务输出多个文件时的一致性,同样的,这也需要开发人员自己来保证。因此多文件一致性对应的任务应当是确定性的,否则如何确定产出的文件是符合一致性的?而在实践中要求任务是确定性的并不是个问题。

忽略异常数据

有时候由于用户编写的 mapreduce 函数存在 bug,导致处理某些数据时 mapreduce 函数必然发生异常,这就造成 MapReduce 任务无法正常完成。正常来说应当修复 bug,但有时候不可行,例如造成 bug 的代码可能是第三方库引入的。另一方面,有时候忽略这些造成异常的数据也是可以接受的,例如在对一个数据量非常庞大的数据集做统计分析时。因此,MapReduce 框架提供了一种可选的执行模式,当其检测到某些输入数据必然造成异常时,则会跳过这些数据从而使得执行流程能继续走下去。

为了实现这个功能,首先每个工作节点上都安装了一个 signal handler 程序用于捕获段异常和总线异常。在执行 mapreduce 任务之前,MapReduce 框架首先将当前任务需要的输入数据所对应的序号保存在工作节点内的一个全局变量中,在执行 mapreduce 任务时,如果用户代码发生异常,此时 signal handler 能捕获到相应的异常信号,然后 signal handler 会发送一个 UDP 数据包给主节点,该数据包中包含了执行当次任务的输入数据序号。如果主节点发现某个数据对应的任务执行失败了多次,则会忽略该数据而不是重新执行 mapreduce 任务。按照这样的描述,被忽略的数据是数据片维度,而不是键值对维度,因为每片的数据块大小相比于总数据量的大小来说微乎其微,所以整体影响不大。

本地执行

调试分布式程序并不是件简单的事,对于 MapReduce 任务来说,一次任务会被分发到几千台机器上执行,每台机器实际执行的任务也无法预测。为了方便调试、性能分析和小规模测试,Google 实现的 MapReduce 框架也提供了一个串行执行的版本,能在单台机器上串行执行所有任务。同时,用户也可通过参数控制一次 MapReduce 任务只执行些特定的 map 任务。通过在启动程序时指定调试参数,用户就可轻松的使用调试或测试工具(如 gdb)对编写的程序进行调试和测试。

状态信息

主节点内部同时运行了一个 HTTP 服务,用于提供给用户查看一系列状态信息。状态信息页面展示了当前任务的进度,例如有多少个任务已经完成,有多少个任务正在进行中,输入数据的大小,中间结果数据的大小,最终结果数据的大小,任务处理百分比等。同时,状态页面也提供了每个任务执行产生的标准错误输出和标准输出文件。用户可根据这些信息来预测任务需要多久才能完成,以及是否需要添加更多的计算资源。状态页面也可用于判断当前任务执行耗时是否比预期的长。

此外,状态页面也显示了失败的工作节点,以及这些失败的工作节点对应的 mapreduce 任务。这有助于用户排查编写的代码中是否有 bug

计数器

MapReduce 框架内部提供了一个计数器用于统计各个事件发生的次数。例如,用户可能希望统计一次任务中一共处理了多少个单词,或者有多少个德语文档建立了索引。

如果要开启这个功能,用户需要编写一个命名计数器,然后在 mapreduce 函数中在需要的时候对计数器自增,例如:

1
2
3
4
5
6
7
8
Counter* uppercase;
uppercase = GetCounter("uppercase);

map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppsercase->Increment();
EmitIntermediate(w, "1");

每个工作节点上的计数器的值会周期性的发送给主节点(如前文所述,主节点会周期性的对工作节点进行心跳探测,工作节点会在响应结果中带上计数器的值)。主节点会对执行成功的 mapreduce 任务返回的计数器聚合,当整个 MapReduce 任务完成将控制权交还给用户代码时,用户代码可获取到创建的计数器的值。当前的计数器的值也同样会展示在状态页面,用户也可根据此信息来观测整个任务的进展。在对计数器聚合时,和主节点会忽略已完成的某个任务的重复通知一样,主节点同样会忽略某个来自已完成任务的计数器更新,从而避免重复计数(任务的重复执行主要有两种情况,一种是由于网络不连通,导致主节点重新分配某个 mapreduce 任务到新的工作节点上;另一种是触发了后备任务,主节点主动分发同一个 mapreduce 任务给多个工作节点执行)。

MapRecue 框架本身也维护了一些计数器,例如已处理的输入数据键值对的数量,以及已生成的最终数据键值对的数量。

用户能很方便的通过计数器来检查 MapReduce 任务的行为。例如,在任务执行时用户可通过计数器来确保输出的键值对数量是否等于输入的键值对数量,或者已处理的德语文档的数量在全部已处理的文档数量中的占比是否符合预期。

性能

这一节主要描述 MapReduceGoogle 内部环境下运行的性能情况,这里不再赘述。简单举例来说,在1800台机器上执行一个 10T 数据量的分布式 grep 搜索耗时约150秒。

总结

最后,来自 Google 的总结:

  1. 限制性的编程模型使得计算并行化变得容易,以及有着较好的容错性,这也体现了计算机领域的一个重要思想:抽象
  2. 对于大型系统来说,网络 IO 容易成为瓶颈
  3. 冗余执行可以作为有效降低成为性能短板的机器带来的影响的手段,另外冗余也是应对机器异常、数据丢失的方式

参考

往一个二叉搜索树中插入一个节点后的结果并不唯一,例如对于下面的二叉搜索树:

alt

如果要插入节点2,可以将2作为3的左子节点:

alt

或者将2作为1的右子节点:

alt

对于第一种方法,类似于往单链表的中间插入节点,既要更新前继节点的 next 指针,又要将新的节点的 next 指针指向下一个节点;而对于第二种方法,只需要将新节点挂载到目标节点的左子节点或右子节点即可,实现上较为简洁,可分为非递归和递归两种解法。

非递归

整体算法分为两步:

  1. 找到要挂载的叶子节点
  2. 将新节点挂载到该叶子节点的左子节点或右子节点上

第一步等同于二叉搜索树的查找,从根节点开始,将目标值和当前节点的值进行比较,如果当前节点的值比目标值小,说明要找的节点在右子树中,移动到右子节点中查找;如果当前节点的值比目标值大,说明要找的节点在左子树中,移动到左子节点中查找。

找到目标叶子节点后,比较该叶子节点的值和目标值的大小,来决定新节点是作为左子节点还是右子节点插入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val
self.left = left
self.right = right


class Solution:
def insertIntoBST(self, root: TreeNode, val: int) -> TreeNode:
if not root:
return TreeNode(val)

prev, current = None, root

while current:
prev = current

if current.val < val:
current = current.right
else:
current = current.left

if prev.val > val:
prev.left = TreeNode(val)
else:
prev.right = TreeNode(val)

return root

递归

一般的二叉树问题的递归解法遵循如下的模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def dfs(root):
if not root:
# 处理终止条件

# 一种情况是左右子树只处理一边
if some condition:
dfs(root.left)
else:
dfs(root.right)

# 另一种情况是左右子树都处理
dfs(root.left)
dfs(root.right)

return something

在当前的问题下,终止条件发生的条件为找到了目标叶子节点,此时需要新建一个节点;而对于递归的处理,这里适用于第一种情况,即左右子树只处理一边,判断条件为比较当前节点的值和目标值的大小,所以可以粗略的构造出程序的框架:

1
2
3
4
5
6
7
8
9
10
def dfs(root, val):
if not root:
create new node with val

if root.val > val:
dfs(root.left)
else:
dfs(root.right)

return something

下一个问题是,这个递归函数的返回值是什么?从终止条件的处理可以看到递归函数返回的是某个节点,联想到往一个二叉搜索树中插入一个节点后需要返回一个新的树,所以这里递归函数的返回值应该是根节点。

然而还缺少一步,就是新节点的挂载,目前新节点返回后并没有任何节点引用它,需要在终止条件的上层调用中处理,即每次递归调用时都重新赋值左子树或右子树的根节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val
self.left = left
self.right = right


class Solution:
def insertIntoBST(self, root: TreeNode, val: int) -> TreeNode:
if not root:
return TreeNode(val)

if root.val < val:
root.right = self.insertIntoBST(root.right, val)
else:
root.left = self.insertIntoBST(root.left, val)

return root
0%