func(a *App) RPC(args interface{}, reply interface{}) { // ... a.mutex.Lock() i := a.raft.Start(args) // update some data structure so that apply knows to poke us later a.mutex.Unlock() // wait for apply to poke us return }
func(r *Raft) Start(cmd interface{}) int { r.mutex.Lock() // do things to start agreement on this new command // store index in the log where cmd was placed r.mutex.Unlock() return index }
func(a *App) apply(index int, cmd interface{}) { a.mutex.Lock() switch cmd := cmd.(type) { case GetArgs: // do the get // see who was listening for this index // poke them all with the result of the operation // ... } a.mutex.Unlock() }
强一致性保证经常被认为和高吞吐、高可用是冲突的。系统设计者一般不会牺牲系统的吞吐或者可用性,而是牺牲强一致性保证。The Google File System(GFS)就体现了这样的思想。实际上,大型存储服务的强一致性保证和高吞吐、高可用并不是冲突的。本文介绍的 chain replication 方式在对 fail-stop 类型异常容错的同时,能同时支持高吞吐,高可用和强一致性。
存储服务接口
客户端会向存储系统发起查询和更新操作的请求。虽然能做到每一个到达存储服务的请求都能保证被执行,不过在论文 End-to-end arguments in system design 中提到这样做意义不大。存储服务可以简单的只处理能到达的请求,并在请求完成时响应客户端,这样就不用区分对待请求丢失和响应丢失这两种情况:客户端可以在一段时间没有收到响应后重新发起请求。
中间节点 S 异常时,系统会从链中移除节点 S。主节点会首先通知 S 的后继节点 S+ 和 S 的前继节点 S−,告诉它们新的前继和后继节点。不过这有可能会违反 Update Propagation Invariant,因为 S 收到的某些请求可能还没有转发给 S+(这些请求必然在任一 S 节点前面的节点 i 的 HistobjIDi 中)。最适合将这些请求发送给 S+ 的就是 S−,不过需要些额外的协作。
记 U 表示一个请求集合,<U 表示该集合中所有请求的顺序。如果下述条件满足,则认为请求序列 Mr 和 (U,<U) 一致:
每个节点 i 维护了一个已转发给后继节点但可能还没有被尾节点处理的请求列表,记 Senti。对 Senti 的增删非常简单:每当节点 i 将某个请求 r 转发给后继节点时,就将 r 加入 Senti;当尾节点处理完某个请求 r 时,它就会给前继节点发送一个 ack(r) 请求,表示请求 r 已处理完毕。当前继节点收到 ack(r) 请求时,就将 r 从 Senti 中移除,并同样给它的前继节点发送一个 ack(r) 请求。
系统会将异常的节点从链中移除。不过链越短则能容忍的节点异常也就越少,最终由于节点数过少从而影响了对象存储的可用性。解决方法就是当链的长度减少到一定程度时,向链中增加新的节点。鉴于节点异常的概率不是很高,以及往链中添加一个节点不需要太长时间,链的长度基本能维持在期望的 t 个节点的水平(因此 t - 1 个节点都发生了异常才会造成可用性问题)。
一种实现协同的方式是为每一个不同的协同需求开发一个服务。例如,Amazon Simple Queue Service 专注于消息队列。同时也存在专门为了选主和配置所开发的服务。针对较强的原语开发的服务能够用于实现较弱一级的原语。例如,Chubby 是一个强同步性保证的锁服务。则可以借助锁来实现选主,群组成员关系等服务。
相较于在服务端实现特定的协同原语,ZooKeeper 的作者选择暴露某些 API 来让应用开发者自行实现需要的原语。这种设计选择需要实现一个协同内核(coordination kernel)使得新原语的开发不需要修改核心服务端代码。这种方式能够适配应用程序对不同协同形式的需求,而不是让开发者受限于某几个固定的原语。
在设计 ZooKeeper 的 API 时,设计者移除了阻塞原语,例如锁。一个协同服务的阻塞原语会导致某些问题,缓慢或出错的客户端会拖慢快速的客户端的性能。如果服务处理请求时需要依赖响应以及负责客户端的异常检测,那么服务的实现会变得更为复杂。因此,ZooKeeper 实现了一套 API 能够操作以类似文件系统的方式组织的无等待(wait-free)对象。实际上,ZooKeeper 的 API 类似于其他任何的文件系统,以及和去除了加锁(lock),打开(open),关闭(close)这些方法的 Chubby 类似。实现了无等待对象的 ZooKeeper 显著有别于其他基于阻塞原语(例如锁)的系统。
虽然无等待这一特性对于性能和容错很重要,但是对于协同来说来不够。ZooKeeper 还需要对各操作提供其他保证。对客户端 FIFO 的操作保证和线性化写入的保证确保了服务的高效实现,同时也能够满足应用程序实现自定义协同原语的需求。实际上,利用 ZooKeeper 的 API 可以实现任意节点数量的共识算法。
不过上述方案还存在一个问题:如果某个工作节点此时看见 ready 节点存在,但是同时主节点删除了 ready 节点然后开始更新配置参数,那么工作节点就会读取到正在更新的配置参数。这个问题通过监听通知的顺序性保证来解决:如果客户端对某个节点 A 开启了监听,此时系统先对节点 A 进行了修改,然后对另一个节点 B 进行了修改,此时客户端发起了对节点 B 的读请求,那么 ZooKeeper 会保证客户端先收到节点 A 修改的异步通知。所以,如果客户端在判断 ready 节点是否存在时开启了监听,那么它就会在读取到修改中的配置参数前先收到 ready 节点修改的通知,从而可以中断配置参数的读取。
如果客户端之间还有除了 ZooKeeper 之外的通信方式也会引发另一个问题。例如,两个客户端 A 和 B 通过 ZooKeeper 共享配置,然后通过其他某种方式通信。如果 A 修改了 ZooKeeper 中的配置然后告诉 B,那么 B 收到通知后读取 ZooKeeper 就期望能获取到修改后的配置。不过如果 B 连接的 ZooKeeper 副本落后于主节点,那么 B 可能无法读取到最新的配置。而采用写入 ready 节点再读取的方式能保证 B 读取到最新的配置。ZooKeeper 提供了 sync 方法来更高效的解决这个问题:如果 sync 请求之后有一个读请求,则 ZooKeeper 会暂缓这个读请求。sync 会同步在这之前进行中的写请求,而无需等待当前所有的待写入操作完成。这个原语类似于 ISIS 中的 flush 原语。
首先定义节点 l 来实现锁。然后,将所有希望获取锁的客户端按照请求顺序排序,之后这些客户端就能按照请求的顺序获取锁。客户端希望获取锁时需要执行下面的操作:
1 2 3 4 5 6 7 8 9 10
Lock 1 n = create(l + "/lock-", EPHEMERAL|SEQUENTIAL) 2 C = getChildren(l, false) 3 if n is lowest znode in C, exit 4 p = znode in C ordered just before n 5 if exists(p, true) wait for watch event 6 goto 2
Unlock 1 delete(n)
第一行 SEQUENTIAL 的标记用来将所有希望获取锁的客户端进行排序。每个客户端首先在节点 l 下创建一个临时顺序的子节点,然后获取 l 的所有子节点。之后在第三行判断自己创建的节点是否在所有子节点中有着最小的序号,如果是,则表示当前客户端获得了锁。如果不是,说明有其他序号更小的子节点存在,当前客户端需要排在这之后获取锁。然后客户端会尝试判断排在当前序号前的子节点是否存在,如果存在则设置监听状态等待前一个节点删除的通知,如果不存在,则继续回到第二行执行。每个客户端只监听排在自己前面的子节点避免了羊群效应,因为任何一个子节点删除的通知只会发给其中的一个客户端。每当客户端收到前面节点删除的通知时,需要再次获取 l 的所有子节点来判断自己是否是最小子节点(因为排在前面的子节点并不一定持有锁,可能是更前面的子节点持有锁。这里能否直接复用第一次请求 getChildren 的信息?实现起来会较麻烦些,因为需要挨个判断排在前面的子节点是否还存在,不如直接拉取一份最新的子节点信息)。
Write Lock 1 n = create(l + "/write-", EPHEMERAL|SEQUENTIAL) 2 C = getChildren(l, false) 3 if n is lowest znode in C, exit 4 p = znode in C ordered just before n 5 if exists(p, true) wait for watch event 6 goto 2
Read Lock 1 n = create(l + "/read-", EPHEMERAL|SEQUENTIAL) 2 C = getChildren(l, false) 3 if no write znodes lower than n in C, exit 4 p = write znode in C ordered just before n 5 if exists(p, true) wait for watch event 6 goto 3
funcPrettyDebug(topic logTopic, format string, a ...interface{}) { if debugVerbosity >= 1 { t := time.Since(debugStart).Microseconds() t /= 100 prefix := fmt.Sprintf("%06d %v ", t, string(topic)) format = prefix + format log.Printf(format, a...) } }
POTENTIAL DEADLOCK: Previous place where the lock was grabbed goroutine 240 lock 0xc820160440 inmem.go:799 bttest.(*table).gc { t.mu.RLock() } <<<<< inmem_test.go:125 bttest.TestConcurrentMutationsReadModifyAndGC.func5 { tbl.gc() }
Have been trying to lock it again for more than 40ms goroutine 68 lock 0xc820160440 inmem.go:785 bttest.(*table).mutableRow { t.mu.Lock() } <<<<< inmem.go:428 bttest.(*server).MutateRow { r := tbl.mutableRow(string(req.RowKey)) } inmem_test.go:111 bttest.TestConcurrentMutationsReadModifyAndGC.func3 { s.MutateRow(ctx, req) }
Here is what goroutine 240 doing now goroutine 240 [select]: github.com/sasha-s/go-deadlock.lock(0xc82028ca10, 0x5189e0, 0xc82013a9b0) /Users/sasha/go/src/github.com/sasha-s/go-deadlock/deadlock.go:163 +0x1640 github.com/sasha-s/go-deadlock.(*Mutex).Lock(0xc82013a9b0) /Users/sasha/go/src/github.com/sasha-s/go-deadlock/deadlock.go:54 +0x86 google.golang.org/cloud/bigtable/bttest.(*table).gc(0xc820160440) /Users/sasha/go/src/google.golang.org/cloud/bigtable/bttest/inmem.go:814 +0x28d google.golang.org/cloud/bigtable/bttest.TestConcurrentMutationsReadModifyAndGC.func5(0xc82015c760, 0xc820160440) /Users/sasha/go/src/google.golang.org/cloud/bigtable/bttest/inmem_test.go:125 +0x48 created by google.golang.org/cloud/bigtable/bttest.TestConcurrentMutationsReadModifyAndGC /Users/sasha/go/src/google.golang.org/cloud/bigtable/bttest/inmem_test.go:126 +0xb6f
在上图中,方块中的数字表示任期。当前已提交的日志的索引位置是1-9,一共有四台机器在1-9位置的日志相同,符合过半数的原则。a 和 b 相比于主节点来说缺少日志,c 和 d 相比于主节点来说有多余的日志,e 和 f 两种情况都有。对于 f 来说,它在任期2中被选为主节点,然后开始接受客户端请求并写入本地日志,但是还没有成功复制到其他从节点上就异常了,恢复后又被选举为任期3的主节点,又重复了类似的操作。
在上图中,集群由原来3台机器扩展为5台,由于每台服务器实际替换配置文件的时机不同,如红色箭头所示,存在某一时刻集群中可能会有两个主节点,假设此时发生选主,由于在 Server 1 看来集群中的节点数量还是3个,所以它只要获取到 Server 2 的选票就可以声明自己为主节点;而在 Server 5 看来,此时集群中有5个节点,所以它在获取了 Server 3、Server 4 的选票后就成为主节点,此时集群中就存在了两个主节点。
上图展示了 Raft 快照的概念。每台服务器会独立的执行快照,并且只包含已提交的日志。执行快照时大部分的工作是状态机将当前的状态写入到快照中。Raft 同时添加了一小部分元数据到快照中:快照对应的最后一个日志的索引(last included index,状态机已应用的最后一条日志),以及最后一个日志对应的任期(last included term)。这两个元数据信息用于执行快照后的第一次 AppendEntries 请求的一致性检查,因为需要比对前一个日志的索引和任期。同时为了启用集群配置变更,快照中也保存了当前最新的集群配置。一旦服务器完成了快照的写入,那么它就可以删除到快照为止的所有日志,以及以前的快照。
第二个性能问题是一次写快照可能会花费较长的时间,所以需要它不能延误正常操作。解决方法是使用写时复制技术(copy-on-write),这样节点也能同时接受新的更新而不会影响快照。例如,由函数式数据结构组成的状态机天然的支持写时复制。或者操作系统的写时复制支持(例如 Linux 的 fork)可以用来在内存中创建一份整个状态机的快照(Raft 的实现采用了这种方式)。
另一种只需要耗费少量带宽的方式是状态机(state machine)同步。该方法将主从同步抽象为确定性状态机(deterministic state machine)同步问题,在确定性状态机模型下,对于两个初始状态一样的状态机来说,按照相同的顺序执行相同的一系列输入指令后,最后的状态也一定是相同的。然而,对于大部分的服务来说,存在某些非确定性的操作,例如生成一个随机数,这时候就需要额外的协调使得主从间依然是同步的,即从节点也要生成一模一样的随机数。不过,处理这种情况所需要维护的额外信息相比于主节点状态的修改(主要是内存的修改)来说不值一提。
通过主虚拟机的延迟输出,保证从虚拟机确认收到了所有日志后,主虚拟机才将输出返回给客户端来实现 Output Requirement。一个先决的条件是主虚拟机在执行输出操作前,从虚拟机必须已经收到所有的日志。这些日志能保证从虚拟机执行到主虚拟机最新的执行点。然而,假设当主虚拟机刚开始执行输出操作时发生了异常,此时发生了主从切换,从虚拟机必须先将未处理完的日志进行重放,然后才能 go live(不再执行重放,接管成为主虚拟机)。如果在这之前从虚拟机 go live,可能会有一些非确定性的事件(例如计时器中断)在从虚拟机执行输出操作前改变了执行的路径。
上图展示了 FT 协议的要求。主虚拟机到从虚拟机的箭头表示日志的发送,从虚拟机到主虚拟机的箭头表示日志的确认。所有异步事件,输入和输出的操作都必须发送给从虚拟机并得到确认。只有当从虚拟机确认了某条输出操作的日志后,主虚拟机才能执行输出操作。所以只要遵循了 Output Rule,从虚拟机在接管执行时就能保持和主虚拟机一致的状态。
不过在异常发生时,VMware FT 并不能保证所有的输出只发送一次。在缺少两阶段提交的帮助下,从虚拟机不能知晓主虚拟机在发送某条输出之前还是之后发生了异常。不过,网络协议(包括常见的 TCP 协议)在设计时就已经考虑了包的丢失和重复包的情况,所以这里无需特殊处理。另外,在主虚拟机异常时发送给主虚拟机的输入也有可能丢失,因此从虚拟机也会丢失这部分的输入。不过,即使在主虚拟机没有异常的情况下,网络包本身就有可能丢失,所以这里同样也不需要特殊处理,不管是网络协议、操作系统还是应用程序,在设计和编写时本身已经考虑到了包丢失的情况。
监测和响应异常
之前提到过,当主虚拟机或者从虚拟机发生异常时,双方都必须能快速响应。当从虚拟机异常时,主虚拟机会进入 go live 模式,即不再记录执行日志,以常规的方式执行。当主虚拟机异常时,从虚拟机也会进入 go live 模式,不过相比于主虚拟机略微复杂些。因为从虚拟机在执行上本身就落后于主虚拟机,在主虚拟机异常时,从虚拟机已经收到和确认了一部分执行日志,但是还没有执行重放,此时从虚拟机的状态和主虚拟机还是不一致的。所以,从虚拟机必须先将暂存的日志进行重放,当所有重放都执行完成后,从虚拟机就会进行 go live 模式,正式接管主虚拟机(此时缺少一个从虚拟机)。因为此时这台从虚拟机不再是从虚拟机,被虚拟化的操作系统所执行的输出操作都会发送给客户端。在这个转换期间,可能也需要某些设备执行一些特定的操作来保证后续输出的正确性。特别是对于网络输出来说,VMware FT 会自动的将新的主虚拟机的 MAC 地址在网络中广播,使得物理交换机知道最新的主虚拟机的地址。另外,后文会提到新的主虚拟机可能会重新发起一些磁盘 IO 操作。
有很多种方式来监测主虚拟机和从虚拟机的异常。VMware FT 通过 UDP 心跳来监测启用了容错的虚拟机是否发生了异常。另外,VMware FT 还会监控 logging channel 中的流量,包括主虚拟机发送给从虚拟机的日志,以及从虚拟机的消息确认回执。因为操作系统本身存在时钟中断,所以理论上来说 logging channel 中的流量应该是连续不断的。因此,如果监测到 logging channel 中没有流量了,那么就可以推断出某台虚拟机发生了异常。如果没有心跳或者 logging channel 中没有流量超过一段指定的时间(近似几秒钟),那么系统就会声明这台虚拟机发生了异常。
然而,这种异常监测机制可能会引发脑裂问题。如果从虚拟机不再接收到来自主虚拟机的心跳,那么有可能说明主虚拟机发生了异常,但也有可能只是双方间的网络断开。如果此时从虚拟机进入 go live 模式,由于此时主虚拟机依然存活,就有可能给客户端造成数据损坏或其他问题。因此,当监测到异常时必须保证只有一台主虚拟机或者从虚拟机进入 go live 模式。为了解决脑裂问题,VMware FT 借助了虚拟机所连接的共享存储。当主虚拟机或者从虚拟机希望进入 go live 模式时,它会向共享存储发起一个原子性的 test-and-set 操作。如果操作成功,那么当前虚拟机可以进入 go live 模式,如果操作失败,说明已经有其他虚拟机先进入了 go live 模式,所以当前虚拟机就将自己挂起。如果虚拟机访问共享存储失败,那么它会一直等待直到访问成功。如果共享存储由于网络问题造成无法访问,那么虚拟机本身也做不了什么因为它的虚拟磁盘就挂载在共享存储上。所以使用共享存储来解决脑裂问题不会带来其他可用性问题。
最后一个设计的点是如果虚拟机发生了异常,使得某台虚拟机进入了 go live 模式,那么 VMware FT 会自动在其他物理机上启动一台新的备份虚拟机。
FT 的实际实现
上节主要描述了 FT 的基础设计和协议。然而,为了构建一个可用,健壮和自动化的系统,还需要设计和实现很多其他的组件。
其中一个原因造成主虚拟机的日志缓冲区写满是因为从虚拟机执行的太慢从而造成消费日志太慢。一般来说,从虚拟机必须以和主虚拟机记录执行日志一样的速度来执行重放。幸运的是,在 VMware FT 的实现下,记录执行日志和重放所需要的时间基本是相同的。不过,如果从虚拟机所在的机器存在和其他虚拟机资源竞争(资源超卖),不管 hypervisor 的虚拟机调度多么高效,从虚拟机都有可能得不到足够的 CPU 和内存资源来保证和主虚拟机一样的速度执行重放。
除了主虚拟机日志缓冲区满造成的不可控暂停外,还有一个原因也要求主从虚拟机间的状态不能差太远。当主虚拟机异常时,从虚拟机必须尽快的将所有的执行日志进行重放,达到和主虚拟机一样的状态,然后接管主虚拟机向客户端提供服务。结束重放的时间基本上等于异常发生时从虚拟机落后主虚拟机的时间,所以从虚拟机进入 go live 模式需要的时间就基本上等于检测出异常的时间加上当前从虚拟机落后的时间。因此,从虚拟机不能落后主虚拟机太多(大于一秒),否则这会大大增加故障切换的时间。
因此,VMware FT 有另一套机制来保证从虚拟机不会落后主虚拟机太多。在主从虚拟机间的通信协议里,还会发送额外的信息来计算两者间的执行时间差。一般来说这个时间差在100毫秒以内。如果从虚拟机开始明显落后主虚拟机(例如大于1秒),那么 VMware FT 会通知调度器降低主虚拟机的 CPU 资源配额(初始减少几个百分点)来延缓主虚拟机的执行。VMware FT 会根据从虚拟机返回的落后时间来不断调整主虚拟机的 CPU 资源配额,如果从虚拟机一直落后,那么 VMware FT 会逐渐减少主虚拟机的 CPU 资源配额。相反的,如果从虚拟机开始赶上了主虚拟机的执行速度,那么 VMware FT 会逐渐增加主虚拟机的 CPU 资源配额,直到两者的执行时间差到达一个合理的值。
不过在实际场景中减慢主虚拟机执行的速度非常少见,一般只会发生在系统承受极大负载的情况下。
FT 虚拟机上的操作
另一个实际中要考虑的问题是处理针对主虚拟机的一系列控制操作。例如,如果主虚拟机主动关机了,从虚拟机也需要同步关机,而不是进入 go live 模式。另外,所有对主虚拟机的资源修改(例如增加 CPU 资源配额)都必须应用到从虚拟机上。针对这些操作,系统会将其转化为特殊的执行日志发送到 logging channel,从虚拟机收到后也会将其正确的重放。
一般来说,大部分对虚拟机的操作都只应该在主虚拟机上发起。然后 VMware FT 会将其转化为日志发送给从虚拟机来进行重放。唯一可以独立的在主虚拟机和从虚拟机上执行的操作是 VMotion。即主虚拟机和从虚拟机都可以独立的被复制到其他机器上。VMware FT 保证了在复制虚拟机时不会将其复制到一台已经运行了其他虚拟机的机器上,因为这无法提供有效的容错保证。
在非共享磁盘的场景下,系统就不能借助共享存储来解决脑裂问题。在这种场景下,系统可借助其他的外部组件,例如某个主从虚拟机都可以连接的第三方服务器。如果主从服务器属于某个多于两个节点的集群,那么就可以使用某个选举算法来选择谁能进入 go live 模式。在这种场景下,如果某台虚拟机获得了大多数节点的投票,那么它就可以进入 go live 模式。
Posted onWord count in article: 23kReading time ≈38 mins.
介绍
在 MapReduce: Simplified Data Processing on Large Clusters 中提到,MapReduce 任务的输入输出构建在 GFS 之上,GFS 是 Google 内部开发的一个分布式文件系统,用于应对大型的数据密集型应用。在 GFS 之前,业界已经存在了一些分布式文件系统的实现,为什么 Google 还要再实现一套?因为基于 Google 内部应用的特点,有别于传统的分布式文件系统,除了考虑性能、可扩展性、可靠性和可用性之外,GFS 在设计时还考虑了以下三个方面:
当主节点崩溃重启后会通过重放操作日志来恢复崩溃前的状态,然而如果每次都从第一条日志开始重放,主节点崩溃重启到可用需要的时间会越来越久,因此当操作日志的大小增长到一定程度的时候,主节点会为当前的元数据创建一个检查点,当主节点崩溃恢复后,可以先加载最新的检查点数据,然后再重放在这个检查点之后生成的操作日志。检查点是一个类似于 B 树的数据结构,可以轻易的映射到内存数据结构中,并且方便根据文件的命名空间检索。这就加快了主节点崩溃恢复的速度和提高了系统的可用性。
追加写在 Google 的分布式应用中使用的非常频繁,多个客户端会并发的对同一个文件追加写入。在传统的随机写操作下,客户端需要额外的同步机制例如实现一个分布式锁来保证写入的线程安全性。在 Google 的应用场景下,文件一般是作为一个多生产者/单消费者的缓冲队列或者包含了多客户端合并后的结果,所以追加写已经能满足需求。