Students’ Guide to RaftMIT 6.824: Distributed Systems 之前的助教写给学生看的实验生存指南。

实现 Raft

论文中的 Figure 2Figure 13 描述了实现 Raft 的主要接口,以及各节点需要维护的状态和响应接口时的行为逻辑,所以在做 Lab 2 之前需要先充分理解这部分的内容。文中的建议是必须严格落实图中的每一句话,而不是仅仅实现了功能,否则就有可能遇到奇怪的问题或不正确的系统行为。

文中举了三个例子。第一,每次收到 AppendEntriesRequestVote 请求就重置 election timeout,因为收到这两个请求说明存在主节点或者正在选主。不过,图中还有描述:

If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate.

重置 election timeout 有两个关键条件:AppendEntries 要求发送请求的主节点是当前任期的主节点;RequestVote 要求成功投出选票。

先看第一个条件,假设当前任期的主节点异常,此时还有一个之前任期的主节点存活在不断发送 AppendEntries 请求,假设此时某个从节点马上要进入选主状态,但由于收到了 AppendEntries 请求又没有校验任期就重置了 election timeout,造成无主状态时间延长。这个场景下如果过期的主节点不实现下面的逻辑杀伤力倍增:

If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower

如果不实现上面的逻辑,那么过期的主节点会一直是主节点,然后一直重置其他本来可以成为候选节点的从节点。

再来看第二个条件,假设当前任期的主节点异常,各从节点准备进入选主状态,此时一个不可能成为主节点的从节点先变成了候选节点(该节点的日志相对于其他任何一个从节点来说都不够新,即最后一条日志的任期不够大,或者任期相同但是日志条数不够多),RequestVote 永远不会成功,如果各从节点一收到 RequestVote 就重置 election timeout,就会造成始终没有主节点的情况。

第二个例子,从节点收到心跳 AppendEntries 后就直接返回 success = true 和重置 election timeout。重置 election timeout 的危害上面已经说过,充当心跳的 AppendEntries 是不带日志的请求,文中提到学生在实现时容易将心跳 AppendEntries 特殊对待,而不进行 prevLogIndexprevLogTerm 的校验。从节点返回 true 时主节点就会认为从节点的日志和自己匹配,从而错误的认为某些日志已经复制到了从节点上,从而可以提交日志。不过看到这里可能会有疑问,为什么主节点会在心跳响应中提交日志?通过心跳更新 nextIndexmatchIndex 是合理的,如果过半数节点已复制的日志是之前任期的,论文中有描述这是不允许提交的;如果过半数节点已复制的日志是当前任期的,那么可能是之前带日志的 AppendEntries 请求中从节点实际已完成了日志的复制,但是主节点没有收到响应,所以在最新的心跳中提交日志。

引申开来,假设系统中此时 x = 1,有个客户端发送了 write x = 2 的请求,主节点成功将日志复制到了过半数的节点上,但是还没有响应客户端就异常了。系统重新选主后,此时有个客户端发送请求 read xx 应该返回多少?应该是2,因为在 Raft 层面,一条日志是否已提交只取决于是否在当前任期内复制到了过半数的节点上,而不是取决于客户端是否收到响应,而只要日志提交了就可以被应用到状态机。类似的场景可以想象一下平时填了某个表单,提交时提示系统异常,但是刷新页面后表单信息已更新。

第三个例子,学生在实现 AppendEntries 时,可能将 prevLogIndex 之后的日志都替换为请求中的日志。同样的,图中也有描述:

If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it.

因为当前的 AppendEntries 可能是个过期的请求,假设请求中的日志只是当前从节点已提交的日志的某一段子集,那么如果从节点将 prevLogIndex 之后的日志都替换为请求中的日志就会丢失已提交的日志。

调试 Raft

调试是实验中最花时间的一个部分,文中列举了四个常见的问题:活锁,不正确或不完整的 RPC 实现,没有遵循论文中的 Rules for Servers 以及任期混淆。

活锁

出现活锁时,等同于各节点都在做无用功。选主是最容易出现活锁的场景,例如始终无法选出一个主节点,或者某个候选节点获得了足够的选票后,马上有另外一个节点发起选主,使得刚成为主节点的节点重新退回到从节点(某个节点成为主节点后,还没来得及发送心跳,刚投了票的某个节点 election timeout 到期,发起新的选主,由于它的任期加了1比当前的主节点的任期大,主节点收到 RequestVote 后发现自己的任期小,从而转为从节点)。

常见的原因有以下几种:

第一,没有严格按照 Figure 2 的要求重置 election timeout。重置 election timeout 仅限于几种情况:

  1. 收到当前任期的主节点的 AppendEntries 请求(如果 AppendEntries 中的任期比自身的任期小,则忽略该请求)。
  2. 开启一轮新的选主,这个属于 election timeout 到期,选主的同时需要重置一个新的随机值。
  3. 在某轮选举中,将票投给了某个候选节点。

第二,没有按照 Fiture 2 的要求开启选主。特别的,如果当前节点是候选节点,但是 election timeout 到期了,那么需要开启新的一轮选主。

第三,没有遵循 Rules for Servers 的第二条:

If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)

例如,当前节点已投票,按照 Raft 的要求,一个从节点在一个任期内只能投票一次,此时又收到了一个 RequestVote 请求,请求中的任期大于自身的任期,那么就需要再次投票并更新自身的任期,避免新的候选节点拿不到足够的选票。

不正确的 RPC 实现

这里总结了学生们过往在实现 RPC 接口时容易犯错的点:

  • 如果某一步要求 reply false,那么这说明需要立即回复 false 并返回,不要再执行后续的步骤。
  • 如果收到了一条 AppendEntries 请求,但是请求中的 prevLogIndex 比当前日志的最后一条的索引还要大,也要返回 false,这也属于日志不匹配。
  • 如前面描述,即使 AppendEntries 没有包含任何日志,接收方也需要校验 prevLogIndexprevLogTerm
  • AppendEntries 中的第五步的 min 是有必要的,而且是和新日志中的最后一条日志的索引相比较,而不是当前日志的最后一条索引。
    • 首先来看 leaderCommit < index of last new entry 的情况,假设 S1 为主节点,日志为 [1, 2, 3, 4, 5]leaderCommit 为3(索引按从1开始算),S2 为从节点,日志为 [1, 2],那么 S1 需要将 [3, 4, 5] 发给 S2S2 收到后 index of last new entry 是5,但是 commitIndex 只能更新到 3,也就是 leaderCommit,因为 [4, 5] 还没有提交。
    • 然后来看 leaderCommit > index of last new entry 的情况,乍看之下好像不可能,因为对于主节点来说,在添加一条新日志的那个时刻,leaderCommit 肯定小于新的日志的索引,复制日志到从节点时这个关系也不会变。这里只能想到的是,leaderCommit 被并发修改了,因为主节点不会删除日志,所以 index of last new entry 不会变,当主节点和从节点的日志不匹配时,主从节点需要来回的就 prevLogIndexprevLogTerm 达成一致(或者单纯就是因为执行的慢),在这个期间,可能过半数的节点已经达成了完成日志复制,主节点就可以提交这条日志,并更新 leaderCommit,所以在原先的 AppendEntries 中,如果 leaderCommit 引用的是全局变量,而 entries[] 是固定的,就会造成 leaderCommit 的值比 entries[] 的最后一条日志的索引还要大,这个时候从节点的 commitIndex 就需要取 index of last new entry。文中举了个例子说明为什么这时候不能取 leaderCommit,假设 S1 为主节点,日志为 [1, 2, 3, 6]leaderCommit 为3,S2 为从节点,日志为 [1, 2, 3, 4, 5],那么 S1 需要将 [6] 发给 S2,注意这里 prevLogIndexprevLogTerm 校验通过,而论文中只提到校验不通过时才删除从节点的日志,所以这里就不会删除 S2[4, 5],又假设此时 S1 通过其他节点的交互将日志成功提交到了 [1, 2, 3, 6, 7]leaderCommit 变为了第5个位置,S2 收到的 AppendEntriesleaderCommit = 5entries = [6],如果按照最新的 leaderCommit 更新就会造成把 S2 日志中的5归为已提交。
  • 严格按照论文5.4节的描述来比较两个节点的日志哪个较新,不要只比较日志的长度。

未遵循 Rules for Servers

  • 如果 commitIndex > lastApplied,说明可以应用新的日志到状态机中。应用到状态机的顺序必须严格按照日志的顺序,可以由一个单独的线程顺序执行,也可以由多个线程并行执行,但各线程间必须确保不互相干扰,避免执行覆盖,对于同一个变量的不同写请求,多线程间必须按照日志的顺序写入。
  • 确保周期性的检查 commitIndex > lastApplied 或者在 commitIndex 更新后检查(在 matchIndex 更新后)。这里并没有太明白这个问题,作者举了个例子,假设现在主节点发出了 AppendEntries 请求并同时检查 commitIndex,如果这条日志被过半数的节点复制了,那么需要等到主节点再添加一条新日志后才能应用上一条日志到状态机。不过根据这个例子也没有看出两者的关联,因为如果采用单线程顺序应用日志到状态机的方式,如果 commitIndex > lastApplied 不满足,线程可以先睡眠然后再尝试,除此以外没有依赖其他条件。
  • 假设主节点发出 AppendEntries 请求然后被拒绝了,如果不是因为日志不一致被拒绝,那么这个主节点就必须马上转为从节点,且不能更新 nextIndex,因为这说明当前主节点的任期小于从节点。如果错误的更新了 nextIndex,而这个主节点在转为从节点后又当选了主节点,就会造成 nextIndex 和从节点永远不会匹配的情况(相当于错位了一格)。
  • 如果主节点发现某条过去任期的日志被大多数的节点复制了,但是自身的 commitIndex 却小于这条日志的索引,此时主节点也不能够更新 commitIndex,这个就是论文中 Figure 8 描述的问题,提交一条之前任期的日志存在被新一轮的主节点覆盖的风险。所以提交日之前需要判断 log[N].term == currentTerm

有一个常见的困惑是 nextIndexmatchIndex 的区别。你可能会观察到 matchIndex = nextIndex - 1,然后用 nextIndex 来替代 matchIndex,然而这是有风险的。举一个很明显的反例,假设某个节点被选为主节点,此时收到一个客户端请求并添加了一条日志,那么它发给从节点的 nextIndex 就是最后一条日志的下一条,而 matchIndex 显然不是 nextIndex - 1(新添加的日志的索引)。

任期混淆

任期混淆指的是节点收到了来自之前任期的 RPC 请求。Figure 2 清晰的描述了收到过期任期的请求应该怎么做,但是并没有说明收到过期任期的响应应该怎么做。一个简单的做法就是比较响应中的任期和当前的任期,如果相等则执行相应的逻辑,如果当前任期小则转为从节点(只有主节点、候选节点才能发请求,才有收到响应的可能),如果当前任期大则不处理。

还有一个相关但不同的问题,假设主节点收到 AppendEntries 响应后设置 matchIndex = nextIndex - 1matchIndex = len(log),这都是不安全的,因为 nextIndexlen(log) 在这期间都有可能改变,正确的做法应该是使用原始请求中不变的参数,即 matchIndex = prevLogIndex + len(entries[])

关于性能

实验中会要求实现两个关于性能方面的优化,一个是日志压缩,另一个是快速确认正确的 prevLogIndex

关于日志压缩需要注意几点:

  • 当执行快照时,需要确保快照中的状态和指定日志中的索引是严格对应的,即快照不能落后于日志,也不能超前。
  • 论文中并没有描述当节点异常然后恢复后应该如何处理快照。论文中要求创建完快照后,需要删除快照所对应的日志,而系统可能在删除日志前异常,那么系统恢复后会根据快照和日志进行重放。文中说系统可能会重放已经在快照中的日志,这里有点疑问,因为快照中包含了 lastIncludedIndexlastIncludedTerm,和当前日志进行对比就能知道哪些日志已经在快照中了,从而可以跳过。

论文中并没有详细描述如何快速确认正确的 prevLogIndex,这部分的逻辑可以参考 6.824 2022 Lecture 7: Raft (2) 或者文中的建议:

  • 如果从节点在 prevLogIndex 处没有日志,说明从节点的日志较短,则返回 conflictIndex = len(log) 以及 conflictTerm = None
  • 如果从节点在 prevLogIndex 处有日志,但是日志对应的任期不匹配,那么返回 conflictTerm = log[prevLogIndex].Term,然后找到属于 conflictTerm 的第一条日志的索引。
  • 主节点收到 conflictIndex/conflictTerm 后,首先在日志中搜索 conflictTerm,如果找到有日志属于 conflictTerm,那么 nextIndex 需要更新为主节点中属于 conflictTerm 的最后一条日志的索引加1。
  • 如果主节点没有找到属于 conflictTerm 的日志,那么更新 nextIndexconflictIndex

其中一个不完善的实现是只考虑 conflictIndex 而忽略了 conflictTerm,这样会简化实现,不过主从节点交互的次数会变多,因为如果考虑了 conflictTerm,那么一次性能跳过的日志会更多。

最后一部分的 Applications on top of Raft 属于 Lab 3 的内容,即基于 Raft 构建应用,在 Lab 2 中暂不描述。

参考

介绍

共识算法使得一批机器作为一个整体对外提供服务,同时当部分机器异常时系统仍能保证可用性。因此,共识算法在构建可靠的大型软件系统中扮演着至关重要的角色。而在所有的共识算法中,Paxos 占据了主导的地位:大部分共识算法的实现都是基于 Paxos 或者受其影响,并且它也成为了教授学生们共识算法的第一选择。

然而,尽管人们为了让 Paxos 易于理解做了大量的尝试,Paxos 依然非常难以理解。另外,如果要将 Paxos 应用到实际的系统中需要涉及复杂的修改。因此,系统开发人员和学生都受困于 Paxos

在受困于 Paxos 之后,Raft 的作者开始尝试设计一种新的共识算法,从而能够为系统构建和教学提供一个更好的基础。和常见的共识算法的设计目标不同,这个新的共识算法的首要设计目标是可理解性:能否为实际的系统设计一个远比 Paxos 易于理解的共识算法?另外,对于系统构建者来说这个算法需要能易于实现。该算法仅仅是正确的还不够,重要的是能显而易见的被人们理解为什么是正确的。

这个新的共识算法就是 Raft。在设计 Raft 时为了提高可理解性作者借助了某些特定的手段,包括解耦(Raft 分离了选主,日志复制和安全性的处理)和减少了状态机的状态(相比于 PaxosRaft 减少了非确定性的状态以及各服务器间不一致的情况)。根据两所大学共43名学生的调查反馈,Raft 明显比 Paxos 易于理解:在学习了两种共识算法后,有33名学生在回答关于 Raft 的问题时比回答关于 Paxos 的问题表现的更好。

Raft 和现今已有的共识算法有很多相之处(特别是 OkiLiskovViewstamped Replication),不过也有几个方面的创新:

  • 强主节点(Strong leader):相比于其他共识算法,Raft 使用了更严格的主节点要求。例如,日志只会从主节点发往其他服务器。这简化了日志复制的管理,同时也使得 Raft 更易于理解。
  • 选主(Leader election):Raft 使用随机计时器来进行选主(后面会提到主节点和各从节点间会维持一个心跳,如果在一段时间内从节点没有收到心跳,那么就可以认为主节点异常,从而重新发起选主,对于每个从节点来说,这个等待的时间不是固定的,而是随机的)。因为心跳本身在共识算法中是一个必不可少的技术,使用随机计时器仅仅在这之上增加了一些额外的机制,却能简单快速的解决选主冲突问题(例如有两个节点瓜分了存活着的节点的全部选票,却没有任何一个节点获得了超过半数的选票,需要重新选主)。
  • 节点变更(Membership changes):当集群中的节点需要变更时,Raft 使用 joint consensus 机制来保证在变更时新旧两套集群下过半数的节点同时属于两套集群。这就保证了整个集群在节点变更时依然正常对外提供服务。

Raft 的作者认为不管是出于教学还是实现目的,Raft 都优于 Paxos 和其他共识算法。它相比于其他共识算法更简单和易于理解;也完全覆盖了实现一个实际系统的需求;它也有了一些开源的实现并且已经被一些公司所使用;它的安全性也已被正式定义和证明;其性能也不输给其他共识算法。

复制状态机

谈论共识算法时一般离不开复制状态机(replicated state machines)。在这个模型下,集群中的每台机器上的状态机能产生有着相同状态的副本,并且在某些机器异常时整个系统依然能对外提供服务。复制状态机被用于解决分布式系统中的一系列容错问题。例如,对于 GFSHDFSRAMCloud 这样的单主节点的大型系统来说,一般会用一个独立的复制状态机来管理选主,以及存储某些配置信息,并且主节点发生异常时这些信息也不会丢失。复制状态机的应用包括 ChubbyZooKeeper

alt

复制状态机一般通过复制日志来实现。在上图中,每台机器保存的日志中包含了一系列命令,这些命令会被状态机按顺序执行。每份日志中以相同的顺序保存着相同的命令,所以每台状态机能以相同的顺序执行这些命令。因为状态机是确定性的,所以最终所有状态机的状态和输出的结果都是相同的。

共识算法的任务就是要保证这些日志数据的一致性。服务器上的共识模块收到客户端的命令后会将其添加到本地日志中。然后它会和其他服务器上的共识模块通信来保证即使在某些服务器异常的情况下,各服务器也会以相同的顺序记录下所有的请求日志。当客户端命令被正确复制后,每台服务器上的状态机会以日志中的顺序执行这些命令,然后将结果返回给客户端。从整体上来说,所有的服务器对外组成了一个独立,高可用的状态机。

针对实际系统的共识算法来说一般有以下几个特性:

  • 保证在所有非拜占庭情况下的正确性(永远不会返回一个错误的结果给客户端),这里的场景包括网络延迟,网络分区,网络包丢失、重复和重排序等等。
  • 只要系统中过半数的服务器依然存活并且相互间以及和客户端间可以通信,整个系统对外来说依然是可用的。因此,一个由五台服务器组成的集群可以容忍任意两台服务器的异常。如果某台服务器停止响应则会被认为是异常,它可能之后会自动恢复并加载持久化存储上的状态然后重新加入到集群中。
  • 不依赖时间来保证日志的一致性:错误的时钟和极大的消息延迟在最坏的情况下会造成可用性问题。
  • 在一般情况下,当集群中过半数的服务器在一轮 RPC 请求中成功响应时,这次的客户端请求就被视为完成,剩下少数响应缓慢的服务器不会影响整个系统的性能。

Paxos 的问题

Leslie LamportPaxos 协议几乎成为了共识算法的代名词:它是在课堂上被教授的最多的协议,以及大部分的共识算法的实现都以此为出发点。Paxos 首先定义了一个协议能够对单一决策达成共识,例如复制某一条日志。这个被称之为 single-decree Paxos。然后,Paxos 能组合多个单一决策以达成对一系列决策的共识(multi-Paxos),例如一整个日志文件。Paxos 保证了安全性和存活性,同时支持集群中的节点变更。它的正确性已经被证明而且在常规使用中已足够高效。

不幸的是,Paxos 有两个重大的缺点。第一个缺点是 Paxos 非常难以理解。它的完整的解释是众所周知的晦涩难懂,只有少数人拼尽全力后才能正确理解。因此,人们尝试用通俗易懂的方式来解释 Paxos。不过这些尝试主要针对的是 single-decree Paxos,虽然也足够具有挑战性。根据 NSDI 2012 与会者的一项非正式调查显示,即使在经验老到的研究者中,也只有少数人能掌握 PaxosRaft 的作者自身也受困于 Paxos,直到它们读了某些简化的 Paxos 的解释和实现了 Raft 之后才理解了 Paxos 的完整的协议,而这花了几乎一年的时间。

Raft 的作者认为 Paxos 的晦涩难懂来源于 Paxos 选择 single-decree 作为其协议的基础。single-decree Paxos 难以理解:它被分为两阶段但是又缺少简单直白的解释来单独理解每个阶段。鉴于此,人们也很难理解为什么整个 single-decree 协议是正确的。而由 single-decree Paxos 组合而来的 multi-Paxos 则更添加了额外的复杂性。Raft 的作者相信多决策共识的问题能够以更直白的方式拆解。

Paxos 的第二个问题是没有为构建实际的系统提供坚实的基础。其中一个原因是还没有一个被广泛认可的 multi-Paxos 算法。Lamport 的论文中主要描述的是 single-decree Paxos;他只是概括性的描述了 multi-Paxos,但是缺少很多细节。虽然人们有很多尝试来补充和优化 Paxos,但是它们相互之间以及和 Lamport 的描述都各有不同。虽然有些系统如 Chubby 实现了类似 Paxos 的算法,但是实现的算法细节并没有公开。

另外,Paxos 的架构对于实际系统的构建来说不够友好,这也是一个将 single-decree 分为两阶段后造成的后果。例如,没有必要独立的选择一些日志然后再将其合并为有序的日志,这只会增加复杂性。相比而言,设计一个以日志为中心的系统并且只允许按照指定的顺序来追加写日志会更简单和高效。另一方面,Paxos 的核心实现采用了对等点对点的方式(虽然它最终建议一种弱主节点的方式来作为一种性能优化的手段)。这种设计适合于单一决策共识的场景,不过很少有实际的系统采用这种方式。当需要对一系列决策达成共识时,首先选择一个领导者然后由领导者协调做决策会更简单和高效。

因此,实际的系统很少采用 Paxos 的方式。每一种实现都基于 Paxos,然后在实现时遇到了困难,接着就演变出了大不相同的架构。这既费时又容易出错,Paxos 的难以理解又加剧了这个问题。Paxos 的描述可能非常适合证明其正确性,不过实际系统的实现却大相径庭,Paxos 的证明也没有什么太大的帮助。来自 Chubby 的实现者的评论一针见血的指出了这个问题:

Paxos 的描述和现实世界的系统的实现间存在巨大的鸿沟…最终的系统将会构建在一个没有被证明的协议上。

鉴于以上的问题,Paxos 即没有为系统构建也没有为教学提供一个坚实的基础。考虑到共识算法在构建大型软件系统中的重要性,Raft 的作者决定尝试能否设计成比 Paxos 更优秀的共识算法,Raft 因此应运而生。

为可理解性设计

作者在设计 Raft 时有几个目标:必须为系统构建提供完整坚实的基础,从而能大大减少开发人员的设计工作;必须在任何场景下保证安全性以及在特定操作场景下保证可用性;大多数的操作必须高效。不过最重要也是最困难的是可理解性。它必须能让大部分的受众易于理解。另外,这个算法必须能让人形成直观的认识,系统构建者就可以在实现时进行必要的扩展。

在设计 Raft 时有很多方面需要在多种方案中做选择。在选择哪种方案时基于的是可理解性:解释每种方案难度有多大(例如,其内部状态有多复杂),读者完全理解这个方案需要付出多大的努力?

虽然做这样的分析有很大的主观性,作者使用了两方面的手段来解决这个问题。第一个手段是众所周知的问题分解:只要有可能,作者都会先将一个问题分解为一系列独立可解决,可解释,可相对的单独理解的子问题。例如,在 Raft 中选主,日志复制,安全性,集群节点变更被分解为独立的模块。

第二个手段是通过减少系统状态的数量来简化系统状态,这样就使得系统更具一致性并尽可能的消除非确定性。特别的,系统中的日志不允许有空洞,Raft 也限制了各节点间日志不一致的场景。虽然在大多数情况下会尽可能的消除非确定性,不过在某些场景下非确定性却更有助于理解。特别是随机化会带来非确定性,不过它能以相似的手段来处理所有可能的场景来降低系统状态的复杂性。Raft 使用随机化来简化了选主算法。

Raft 共识算法

Raft 是一种管理第二节中所描述的复制日志的算法,下面描述了该算法的关键特性:

  • Election Safety:在任一任期内最多只有一个节点被选为主节点。
  • Leader Append-Only:主节点永远不会覆盖或者删除某些日志项,它只会追加写新的日志项。
  • Log Matching:如果两份日志在同一索引处的日志项对应相同的任期,那么双方在这个索引之前的日志项都相同。
  • Leader Completeness:如果某个日志项在某个任期内被提交了,那么这条日志会出现在后续所有新任期下的主节点的日志中。
  • State Machine Safety:如果某台服务器将某个索引位置的日志应用到了自身的状态机中,那么不会有任何一台服务器在相同索引位置应用了一条不同的日志到状态机中。

Raft 在实现共识时会先进行选主,然后完全交由主节点来管理日志的复制。主节点接收来自客户端的日志请求,然后将日志复制到其他从节点上,最后在合适的时机告诉各从节点将日志中的内容应用到自身的状态机中。使用主节点的方式简化了复制日志的管理。例如,主节点可自行决定在哪里插入新的日志而不用和其他服务器交互,其他数据流也是类似的只会从主节点流向从节点。当主节点异常或无法和其他从节点连通时,系统会选举一个新的主节点。

通过主节点的方式,Raft 将共识问题分解成了三个相对独立的子问题:

  • 选主:当主节点异常时,系统必须选举一个新的主节点。
  • 日志复制:主节点必须从客户端接收日志请求,然后将其复制到其他从节点上,并强制要求其他从节点的日志以主节点的为准。
  • 安全性:如果任意一台服务器已经将某条日志应用到了自身的状态机中,那么其他任何服务器都不能在这条日志对应的索引上应用不同的命令到状态机中。

Raft 基础

一个 Raft 集群包含若干台服务器,5台是一个常见的配置,这允许系统最多能容忍两台服务器的异常。在任一时刻,每台服务器只会处于其中一个状态:主节点(leader),从节点(follower),或者候选节点(candidate)。在正常操作下,系统中只有一个主节点,剩下的都是从节点。从节点是被动的:它们不会主动发起任何请求,只是简单的响应来自主节点和候选节点的请求。主节点会处理所有来自客户端的请求(如果客户端将请求发给了一个从节点,从节点会将其转发给主节点)。候选节点这个状态会在选举新的主节点时用到。下图展示了各节点状态间的转换:

alt

如下图所示,Raft 将时间切分为任意长度的任期(term)。任期会以连续的整数来标记。每个任期以选举作为开始,一个或多个候选节点会尝试成为主节点。如果某个候选节点赢得了选举,那么在这个任期剩下的时间里它将作为主节点。在某些情况下,多个候选节点可能会分票,造成没有一个候选节点赢得半数的选票。在这种情况下,当前任期会以没有主节点的状态结束,接着系统会马上对新的任期发起新的一轮选举。Raft 保证在任一任期内至多只有一个主节点。

alt

不同的服务器可能会观测到不同次数的任期转变,在某些情况下一台服务器可能观测不到某次选举的发生或者感知不到整个任期。任期在 Raft 中扮演了逻辑时钟的角色,它能允许服务器侦测过期的信息例如过期的主节点。每台服务器都会保存一个当前任期(current term)的数字,这个数字会随时间递增。在服务器间通信时双方会附加上当前任期,如果某台服务器的当前任期小于另外一台服务器,那么这个服务器会将当前任期更新为另一台服务器的值。如果某个候选节点或者主节点发现自己的当前任期过期了,那么它会马上转为从节点状态。如果某台服务器收到的请求中的任期过期了,那么它会拒绝这个请求。

Raft 服务器间通过 RPC 进行通信,基础的共识算法只需要两种 RPCRequestVote 用于候选节点在选主期间获取选票,AppendEntries 用于主节点向各从节点复制日志以及充当心跳的作用。如果某个 RPC 请求在一段时间内没有响应,那么服务器会重新发起请求,同时服务器也会并行发送 RPC 请求来达到最佳性能。

选主

Raft 通过心跳机制来触发选主。当服务器启动时,它们的初始状态是从节点。只要服务器能收到来自候选节点或者主节点的有效请求,就会一直出于从节点状态。主节点会定期的向所有从节点发送心跳(不带任何日志的 AppendEntries 请求)来维持自己主节点的地位。如果在某段时间内某台从节点没有收到心跳,那么它就会认为此时没有存活的主节点,则会发起选主来选择一个新的主节点,这个等待时间就叫做 election timeout

开始新的一轮选主时,从节点会先将当前任期递增然后转换为候选节点。接着,它会给自己投一票然后并行发起 RequestVote 请求给其他从节点来获取选票。一个候选节点会保持当前的状态直到下面三种情况之一发生:

  1. 当前候选节点赢得了选举
  2. 有另外一个候选节点赢得了选举
  3. 一段时间之后没有一个候选节点赢得选举

当候选节点获得了集群中针对某个任期的过半数节点的选票时就赢得了选举。在某个任期内,每台服务器只会最多给一个候选节点投票,以先来后到为准。过半数的选票保证了在某个任期内最多只会有一个候选节点被选举为主节点(Election Safety Property)。当某个候选节点赢得选举时,它就成为了主节点。然后它就开始向其他服务器发送心跳来维持主节点的状态并阻止其他节点继续发起选主。

候选节点在等待选票时有可能收到其他自认为是主节点的 AppendEntries 的请求。如果请求中的任期号不小于当前候选节点记录的任期号,则该候选节点会将此主节点作为主节点并转为从节点状态。如果请求中的任期号小于当前候选节点记录的任期号,则该候选节点会拒绝此请求并继续处于候选节点状态。

第三种情况是没有一个候选节点赢得了选举:当很多从节点在同一时间转变为候选节点时,会分散选票,最终造成没有一个候选节点赢得过半数的选票。当发生这种情况时,候选节点会将此次选主作超时处理,然后再次将当前任期自增,重新发起新的任期的选主,并向其他从节点继续发起一轮 RequestVote 请求。不过,如果缺少额外机制,分票可能会一直持续下去。

Raft 通过随机的 election timeout 来确保分票极少会发生并且在发生时能快速解决。为了避免分票,首先 election timeout 的值会在一个固定区间内随机选择(例如150-300ms)。这就分散了各从节点的选主启动时机,使得在大多数的情况下只有一个从节点会进入选主状态;在其他从节点进入选主状态之前,这个节点就已经赢得了选举并向其他从节点发送了心跳,这就大大扼杀了分票的可能性。同样的机制也被用来解决当分票确实发生的场景,每个候选节点在启动新的选主时会重新设置一个随机的 election timeout,然后在这段期间内等待其他从节点的选票,或者新的主节点的心跳,假设第一轮选主发生了分票,那么由于随机 election timeout 的存在,不同的候选节点进入第二轮选主的时机也不会相同,这就降低了第二轮选主继续发生分票的可能性。

选主是一个很好的例子展示了可理解性这个设计目标如何来指导在不同设计方案中做出选择。在最初的方案中作者打算使用一个排序系统:每一个候选节点被分配了一个唯一的权重,用来在多个候选节点中选择最终的主节点。如果某个候选节点发现其他候选节点的权重比自己高,那么这个候选节点就会退回到从节点状态,这就使得有着更高权重的候选节点能更容易的赢得下一轮的选举。不过这种实现可能会造成难以察觉的可用性问题(在不采用随机 election timeout 的情况下,当某个高权重的候选节点异常时,由于低权重的候选节点已经退回到了从节点状态,它需要再等待一个 election timeout 周期才能再次转变为候选节点,而在正常的情况下本身各节点间的信息交换速度较快,其他候选节点可能都已经退回到了从节点状态,此时高权重的候选节点异常就会造成系统没有候选节点,而距离各从节点进入选主状态又还有较长时间,从而造成系统在这段期间的不可用)。虽然作者对该算法进行了多次调整,但是每次调整后都会出现新的边界问题。最终作者认为随机化的 election timeout 更胜一筹和易于理解。

日志复制

当某个候选节点被选为主节点后,它就开始处理客户端请求。每个客户端请求中包含了复制状态机需要执行的命令。主节点将这个命令以日志的形式追加到自己的日志文件中,然后并行的给所有从节点发送 AppendEntries 请求来复制日志。当日志在各从节点上被安全的复制后,主节点就将日志对应的命令应用到自身的状态机中,并将结果返回给客户端。如果某个从节点异常或者运行缓慢,或者网络包丢失,主节点会一直重发 AppendEntries 请求(即使主节点已经将结果返回给了客户端),直到所有从节点保存了所有的日志。

日志内容的组织如下图所示,每一条日志包含了状态机需要执行的命令以及主节点收到该请求时对应的任期。日志中的任期信息用来检测日志间的不一致性以及保证前面所提到的 Raft 的几个关键特性。每条日志同时有一个索引信息来标记这条日志在日志文件中的位置。

alt

在上图中,方块中形如 x <- 3 表示客户端的命令,命令上方的数字表示任期。

主节点会决定什么时候能安全的将某条日志对应的命令应用到状态机中,该条日志就被称为已提交(commited)。Raft 保证已提交的日志是持久化了的,并且最终会被所有可用的状态机执行。当主节点将某条日志复制到集群中过半数的机器上时(例如上图中索引位置为7的日志),这条日志就会被标记为已提交。同时,该条日志之前的日志也都会被提交,包括从其他主节点复制而来的日志。主节点维护了已提交日志中的最大的索引值,并将其附加到后续的 AppendEntries 请求中(包括心跳),所以最终所有的机器都能知道当前的最远日志提交位置。当某个从节点发现某条日志已经提交了,它就会将该条日志对应的命令应用到自身的状态机中(以日志中出现的顺序执行命令)。

Raft 设计的日志机制能保证在不同机器间高层次下的一致性。这不仅简化了系统的行为和使得系统更有可预测性,同时也是保证安全性的重要组成部分。Raft 保证了下面两个性质(同时也组成了 Log Matching Property):

  • 如果两个日志文件中相同索引位置的日志保存着相同的任期,则它们保存着相同的客户端命令
  • 如果两个日志文件中相同索引位置的日志保存着相同的任期,则在这个索引位置之前的日志都相同

第一个性质保证是因为主节点在某个任期内最多只会在某个索引位置创建一条日志,而日志的位置创建后就不会改变。第二个性质保证则是通过 AppendEntries 请求中的一致性检查来实现。当主节点发送 AppendEntries 请求时,主节点会附带上本次需要复制的日志的前一条日志的索引和对应任期。如果从节点没有在自己的日志中找到这个指定索引和任期的日志,那么它会拒绝新日志的写入请求。日志的一致性检查类似于数学归纳法:初始情况日志为空,所以满足 Log Matching Property,假设从索引位置1到 i 的日志都满足 Log Matching Property,那么当主节点在成功提交了新的日志即索引位置 i + 1 的日志后,则在 [i + 1, i + 1] 这个范围满足了 Log Matching Property,从而得出从索引位置1到 i + 1 的日志都满足了 Log Matching Property。因此,只要 AppendEntries 的请求成功返回,那么主节点就知道从节点的日志和自己的相同。

在正常的操作下,主节点的日志和从节点的日志始终能保持一致,所以 AppendEntries 的一致性检查从来不会失败。不过,当主节点异常时则会造成日志不一致(主节点还没有来得及复制所有的日志)。同样的,从节点的异常也会造成日志不一致(从节点没有收到所有的日志)。下图展示了从节点的日志可能和新的主节点不同的情况。一个从节点有可能缺少某些主节点拥有的日志,或者拥有一些主节点没有的日志,或者两者都有。这种日志的缺少或多余的情况可能会横跨几个任期。

alt

在上图中,方块中的数字表示任期。当前已提交的日志的索引位置是1-9,一共有四台机器在1-9位置的日志相同,符合过半数的原则。ab 相比于主节点来说缺少日志,cd 相比于主节点来说有多余的日志,ef 两种情况都有。对于 f 来说,它在任期2中被选为主节点,然后开始接受客户端请求并写入本地日志,但是还没有成功复制到其他从节点上就异常了,恢复后又被选举为任期3的主节点,又重复了类似的操作。

Raft 中,主节点通过强制从节点复制自己的日志来保证日志的一致性。也就是对于从节点来说,和主节点日志不一致的地方会被主节点的日志覆盖。

为了让从节点的日志和主节点保持一致,主节点必须知道到哪个索引位置为止主从节点间的日志是一致的,然后删除从节点在这个索引位置之后的日志,并替换为主节点中的日志。主节点为每个从节点维护了一个 nextIndex 变量,用来表示下一条由主节点发送给从节点的日志索引位置。当某个节点成为主节点时,它先将 nextIndex 初始化为自身日志文件中最后一条日志的索引位置加1。如果从节点的日志和主节点不一致,那么在下一次的 AppendEntries 请求中会返回失败。当主节点收到失败响应后,会将 nextIndex 减1并重新发送 AppendEntries 请求(前面提到,当主节点发送 AppendEntries 请求时,主节点会附带上本次需要复制的日志的前一条日志的索引和对应任期,从节点会根据这两个值来决定是否接受写入,当 nextIndex 减1时,在新的 AppendEntries 请求中这两个值也需要相应的往前移),最终 nextIndex 会等于某个值使得主从节点在 nextIndex 之前的日志都相同。此时 AppendEntries 会返回成功,从节点会将 nextIndex 及其之后的日志都替换为主节点的日志,至此,主从节点的日志就恢复了一致,并一直保持到当前任期结束。

如果需要的话,可以对上述的交互协议进行优化来减少 AppendEntries 的次数,尤其是从节点的日志落后主节点太多的时候。例如,当从节点需要拒绝 AppendEntries 的请求时,可以在响应结果中带上不一致的日志的任期,以及该任期下的第一条日志的索引位置。根据这个信息,主节点就可以大幅减少 nextIndex 的值从而直接跳过该任期下不一致的日志,而不是一次 RPC 请求识别一条日志。不过作者怀疑这个优化在实际中是否有必要,因为异常并不会频繁发生所以不太可能会有那么多的不一致。

针对上面的优化手段作者并没有描述过多的细节,在 6.824 2022 Lecture 7: Raft (2)Robert Morris 提出了自己的猜想。在下图中,S1 是主节点,S2S7 是从节点,其中 S2S4 节点的日志和主节点不一致,日志索引1到3已提交,现在主节点要在索引位置4追加一条新日志,分别来看 S2S4 如何处理。

alt

Robert Morris 认为当从节点拒绝 AppendEntries 请求时,需要返回三个信息给主节点:

  • XTerm:冲突的日志的任期(如果有的话)
  • XIndex:冲突的日志的任期下的第一条日志的索引位置(如果有的话)
  • XLen:整个日志的长度

prevLogIndex 表示主节点发送给从节点的上一条日志的索引位置,prevLogTerm 表示上一条日志所属的任期。则在主节点的第一轮 AppendEntriesprevLogIndex = 3prevLogTerm = 6,对于 S2S4 初始化的 nextIndex 为:

1
2
3
4
5
{
"S2": 4,
"S3": 4,
"S4": 4
}

对于 S2,首先检查自己日志中索引位置为 prevLogIndex = 3 的日志,发现其任期为5,和 prevLogTerm = 6 不匹配,从而拒绝请求,并返回 XTerm = 5XIndex = 2XLen = 3。主节点收到结果后,发现 S2prevLogIndex 指向的任期和自己不同,这里比主节点小所以主节点往前遍历日志,发现没有任期5的日志,说明 S2 中整个任期5的日志都可以跳过,因此主节点将 S2nextIndex 修改为 XIndex,即 nextIndex = 2。在第二轮的 AppendEntries 请求中,prevLogIndex = 1prevLogTerm = 4,同时主节点也会将索引位置2到3的日志连同最新的日志一起随请求发送。S2 再次收到请求后,发现这次 prevLogIndex/prevLogTerm 标记的日志和自己匹配,因此返回成功,并将主节点的日志覆盖到索引位置2到4中。

对于 S3,首先检查自己日志中索引位置为 prevLogIndex = 3 的日志,发现其任期为4,和 prevLogTerm = 6 不匹配,从而拒绝请求,并返回 XTerm = 4XIndex = 1XLen = 3。主节点收到结果后,发现 S3prevLogIndex 指向的任期和自己不同,这里比主节点小所以主节点往前遍历日志,发现了任期4的日志,说明 S3 中任期4的日志比主节点多,因此主节点将 S3nextIndex 修改为2,即主节点中任期4的最后一个日志的索引位置加1(这里 Robert Morris 的原话是 leader's last entry for XTerm,不过为了能统一处理下一次请求中的 prevLogIndexprevLogTerm,这里加了1)。在第二轮的 AppendEntries 请求中,prevLogIndex = 1prevLogTerm = 4,同时主节点也会将索引位置2到3的日志连同最新的日志一起随请求发送。S3 再次收到请求后,发现这次 prevLogIndex/prevLogTerm 标记的日志和自己匹配,因此返回成功,并将主节点的日志覆盖到索引位置2到4中。

对于 S4,首先检查自己日志中索引位置为 prevLogIndex = 3 的日志,发现不存在,所以拒绝请求,并返回 XTerm = nullXIndex = nullXLen = 1。主节点收到结果后,发现 XTermXIndex 都为空,说明 S4 的日志比自己短,因此主节点将 S4nextIndex 修改为2,即 S4 的日志长度加1(这里 Robert Morris 的原话是 XLen,不过为了能统一处理下一次请求中的 prevLogIndexprevLogTerm,这里加了1)。在第二轮的 AppendEntries 请求中,prevLogIndex = 1prevLogTerm = 4,同时主节点也会将索引位置2到3的日志连同最新的日志一起随请求发送。S4 再次收到请求后,发现这次 prevLogIndex/prevLogTerm 标记的日志和自己匹配,因此返回成功,并将主节点的日志覆盖到索引位置2到4中,如果不匹配,则又会流转到上面两种情况中。

在这种机制下,主节点不需要采用其他特殊的操作就能保持从节点日志的一致性。通过正常的 AppendEntries 请求,结合其一致性检查的功能,从节点的日志就可以自行恢复一致。而对于主节点来说,它永远不会覆盖或者删除自己的日志(Leader Append-Only Property)。

这种日志复制机制展现了第二节中描述的共识算法所需要的特性:只要集群中过半数的机器存活,Raft 就能接收,复制和应用新的日志;在正常情况下,只需要一轮过半数机器的 RPC 请求响应成功就能复制一条新日志;一台运行缓慢的机器并不会影响整体的性能。

安全性

前面几节描述了 Raft 如何选主以及复制日志。不过,这些机制还不足以保证每个节点能以相同的顺序执行相同的命令。例如,某个从节点可能在某段期间内异常了,在这段期间内某个主节点已经提交了部分日志,此时假设主节点异常,之前异常的节点恢复并被选为主节点,根据前面所描述的 AppendEntries 的工作流程,它就有可能将其他从节点中之前提交的日志覆盖掉。因此,不同的节点可能会执行不同的命令序列。

本节通过描述了什么样的节点才允许被选为主节点来补充完善 Raft 算法。这个选主的限制保证了如果某个节点被选为了主节点,那么它就一定包含之前所有任期内由其他主节点提交的日志(Leader Completeness Property)。根据这个选主限制,我们可以使日志提交的规则更加清晰。

选主限制

在所有基于主节点的共识算法中,最终主节点必须保存所有已提交的日志。在某些共识算法中,例如 Viewstamped Replication,如果某个节点没有包含所有已提交的日志也能被选为主节点。这些共识算法有额外的机制来识别出缺失的日志,并在选主期间或之后将缺失的日志发送给主节点。然而,这就增加了额外的机制和复杂性。Raft 采用了一种更简便的方案来保证每一个被新选举的主节点一定包含了之前所有主节点提交的日志,而无需额外传输缺失的日志给主节点。这就表明日志始终是单向流动,即从主节点流向从节点,并且主节点永远不会覆盖已经存在的日志。

Raft 在选主阶段会避免某个没有完整提交日志的候选节点成为主节点。一个候选节点必须和集群中过半数的节点通信来获取选票,这说明每一条提交的日志都至少存在于其中一台节点上(假设有 S1S5 五个节点,之前 S1 是主节点并将某条日志成功提交到 S1S2S3,此时 S1 异常假设 S4 成为新的主节点,并获得了 S3S4S5 的选票,由于过半数原则,新的主节点的选票必然和之前的主节点的选票存在重合,也就说明必然至少有一台机器上保存了之前主节点提交的日志)。如果候选节点的日志至少和过半数的节点的日志一样新(一样新的定义见后文描述),那么它将会拥有所有已提交的日志。RequestVote 接口实现了这个限制:发送请求时会带上候选节点的日志信息,如果其他节点的日志比这个候选节点还要新,则会拒绝这个候选节点的选票。

Raft 通过比较两个日志文件中的最后一条日志的索引位置和任期来决定哪个日志较新。如果两个日志的任期不同,则更高任期的日志较新;如果两个日志的任期相同,则索引位置大的日志较新。

提交之前任期的日志

对于主节点来说,如果当前任期下的某条日志被过半数的节点成功复制,那么这条日志就可以被提交。如果日志变为已提交前主节点异常了(已复制的副本未过半数或者还没有来得及执行提交),后续的主节点会尝试继续复制该日志。然而,主节点并不能马上下结论说某条之前任期产生的日志在过半数的节点上保存后就算被安全提交了。

alt

上图描述了某条日志被过半数的节点复制后,有可能会被某个新的主节点的日志所覆盖的情况。在 a 中,S1 是主节点,并且部分复制了索引位置2的日志到其他从节点上;在 b 中,S1 发生异常,S5 被选为主节点,在索引位置2的地方追加了一条任期3的日志;在 c 中,S5 发生异常,S1 恢复并再次被选为了主节点,此时任期是4,S1 会继续通过 AppendEntries 的一致性检查将索引位置2的日志继续复制给过半数的节点,但是还未提交,同时又在索引位置3的地方追加了一条任期4的日志;在 d 中,S1 发生异常,S5 成为主节点,则 S5 会将自己的索引位置2的日志复制给其他的从节点;而在 e 中的另一种情况下,如果 S1 在异常前将当前任期下的索引位置3的日志复制到了过半数的节点上,那么索引位置3的日志就可以被提交,因为 S5 节点不可能会成为主节点(它的日志相比于其他过半数的节点来说不够新)也就不会覆盖,而在此时,索引位置3之前的日志也可以被认为是已提交的。

为了避免上面 d 所描述的情况,Raft 永远不会通过计算某条之前任期的日志副本的数量来判断这条日志是否能提交(也就是 c 中的情况,此时 S1 是主节点,任期是4,通过和其他从节点的通信将任期2的日志复制给了过半数的从节点,但是 S1 不会提交任期2的日志,因为一旦提交就有可能出现 d 中的情况)。只有当前主节点当前任期下生成的日志才会根据已复制的副本的数量来判断是否能提交;如果当前任期下的某条日志以这种方式提交了,那么根据 Log Matching Property 这条日志之前的日志也会间接的被提交。虽然在某些场合下,主节点可以知道某条之前任期的日志是被提交的(例如,如果这条日志在每个节点上都有保存),但是 Raft 出于简洁性的考虑采用了更保守的策略。回到上面的例子,在 c 中即使 S1 将索引位置2的日志提交了才发生异常,也依然有可能发生 d 的情况,因为 d 中的 S5 并不知道其他过半数的从节点提交了索引位置2任期2的日志(除非有额外的机制,所以这里说 Raft 采用了保守的策略没有引入其他机制),也就是说提交一个之前任期的日志并不能保证它不会被之后的主节点覆盖,所以这部分之前任期的日志就不能当做已提交或者做了提交也没有用。一直等到 e 中的情况,在当前任期4下,只要 S1 将索引位置3的日志复制到了过半数的节点上,即使此时 S1 异常了,这个日志在未来也能被其他主节点间接提交,因为下一轮的候选节点要么有索引位置3的日志,要么没有索引位置3的日志,而根据日志新旧原则,没有索引位置3的日志的候选节点不可能成为主节点,所以只有那些复制了索引位置3的日志的候选节点才有可能成为新的主节点,因为主节点不会增删日志,所以索引位置2、3的日志必然存在于新的主节点中,一旦新的主节点提交了一个更高任期的日志,索引位置2、3的日志也就被间接提交了。这就保证了在这种方式下索引位置3及其之前的日志不会被覆盖。

因为主节点将之前任期的日志复制到其他从节点时依然保留原始任期信息,所以 Raft 选择在提交日志时增加额外的机制(上述描述的日志提交规则,同时引入了一定的复杂性)来保证安全性。在其他共识算法中,如果一个新的主节点需要复制之前任期的日志,那么就必须以当前任期的名义。Raft 的做法可以很方便的识别每条日志属于哪个任期,因为每条日志的任期不会随时间而改变。另外,相比于其他共识算法,在 Raft 中新的主节点会发送更少的来自之前任期的日志(其他共识算法在提交日志前需要发送冗余的日志来重新编号)。

安全性证明

以完整的 Raft 算法为基础,现在可以更精确的验证 Leader Completeness Property 的正确性。这里使用反证法来证明,假设 Leader Completeness Property 不正确,继而推导出一个矛盾,从而证明假设不成立。假设在任期 T 内某个主节点 leader_T 提交了一条日志,然后这条日志不会出现在新的任期的主节点中。不妨令满足假设的最小的任期为 UU > T),对应任期 U 的主节点为 leader_U,则 leader_U 没有 leader_T 提交的日志。

alt

  1. leader_T 提交的日志必然在选主时就不存在于 leader_U 中,因为主节点不会删除和覆盖日志。
  2. leader_T 将日志复制给了集群中过半数的节点,而 leader_U 从集群中过半数的节点获得了选票,所以两者必然有重合,因此必然存在一个节点即复制了 leader_T 提交的日志,又投票选举了 leader_U 作为新的主节点,在上图中这个节点就是 S3。这个节点是推导出矛盾的关键。
  3. S3 必然是先收到 leader_T 的复制日志请求然后才投票给 leader_U,否则的话如果 S3 先进入新的任期那么它会拒绝来自 leader_TAppendEntries 的请求,因为 S3 的当前任期更大。
  4. S3 在给 leader_U 投票前会完成日志的复制,因为我们这里假定的 U 是最小的一个不包含 leader_T 提交的日志的任期,所以根据这个假定在 [T + 1, U - 1] 之间的主节点都是必然包含 leader_T 提交的日志的,而主节点不会删除或覆盖日志,并且只在不一致的时候删除从节点的日志,所以在任期 U 内,S3 依然是保留 leader_T 提交的日志的。
  5. S3 投票给了 leader_U,所以根据选主规则 leader_U 的日志至少是和 S3 一样新。这就导致了两个矛盾。
  6. 首先,如果 S3leader_U 的最后一条日志的任期相同,那么 leader_U 的日志索引就必然大于等于 S3,则 leader_U 必然就包含 S3 的每一条日志。假设双方最后一条日志的任期是 X,则 T <= X <= U - 1,在这个范围内的主节点都是有 leader_T 所提交的日志的,根据 AppendEntries 请求的日志一致性校验,只要 leader_U 在这期间收到了主节点的请求,那么主节点就会补齐 leader_U 的日志(如果不一致的话),所以 leader_U 必然就包含 S3 的每一条日志。因此这就产生了一个矛盾,因为根据开头的假设 leader_U 是不应该有 leader_T 提交的日志的。
  7. 所以,要想让 leader_U 的日志比 S3 新,那就只能是 leader_U 的最后一条日志的任期大于 S3 的最后一条日志的任期。而这个任期也必然比 T 大,因为 S3 的最后一条日志的任期至少是 T。记这个任期的主节点为 leader_P,根据 Log Matching Propertyleader_U 中到最后一条日志前的日志应该和 leader_P 相同,又因为任期 U 之前的主节点都包含 leader_T 所提交的日志,所以 leader_U 也应该包含 leader_T 所提交的日志,所以又产生一个矛盾。
  8. 至此完成了所有矛盾的证明。因此,所有任期比 T 大的主节点都必然包含 leader_T 在任期 T 内所提交的日志。
  9. Log Matching Property 也保证了后续的主节点也包含了间接提交的日志。

根据 Leader Completeness Property,我们可以证明 State Machine Safety Property,即如果某台服务器将某个索引位置的日志应用到了自身的状态机中,那么不会有任何一台服务器在相同索引位置应用了一条不同的日志到状态机中。如果某台服务器要将某条日志应用到状态机中,那么这台服务器在这条日志之前的日志都是和主节点相同的,而且是已提交的。现在来考虑在某个最小的任期内,某一台服务器要应用某一条日志,根据 Log Completeness Property 的保证,后续更高任期的主节点都会存储这条日志,所以后续节点在后续任期中应用相同索引位置的日志时也必然是应用了相同的日志内容。因此 State Machine Safety Property 是正确的。

最后,Raft 要求各节点以日志的索引顺序来应用日志。结合 State Machine Safety Property,可以得出所有的节点会以相同的顺序应用相同的日志到自身的状态机中。

主节点和候选节点异常

截止目前我们讨论的都是主节点异常。从节点和候选节点异常相比于主节点异常来说更容易处理,而且能以相同的方式处理。当从节点或候选节点异常时,则其他节点发送的 RequestVoteAppendEntries 请求都会失败。Raft 通过无限重试来处理这个问题;如果异常的机器重启了,那么这些 RPC 请求就会成功。如果某台机器完成了某个请求但在响应前异常了,那么它会在重启后收到相同的请求。RaftRPC 请求都是幂等的,所以这种情况不会造成影响。例如,某个从节点收到了 AppendEntries 请求然后发现请求中的日志已经在本地日志中了,那么这个节点就会忽略这个请求。

时间和可用性

Raft 的其中一个要求是安全性的保证不依赖于时间:系统不能因为某些事件发生的快了些或慢了些而产生不正确的结果。然而,可用性(系统可以及时的响应客户端)不可避免的要依赖时间。例如,如果服务器间的信息交换需要的时间大于服务器异常的间隔时间(例如信息交换需要5秒,而每隔2秒服务器就异常了),则候选节点没有足够的时间来赢得选举;而缺少主节点,系统也无法继续运行。

选主是 Raft 中对时间要求最关键的方面。只要遵循如下的时间要求那么 Raft 就能维持选主的正常运行:

1
broadcastTime << electionTimeout << MTBF

其中 broadcastTime 是服务器并行的给集群中的其他服务器发送 RPC 请求并收到响应的平均时间;electionTimeout 是前面描述的选主等待时间,如果在这个时间段内某个节点没有收到来自主节点的心跳,那么它就会启动选主流程;MTBF 则是单台服务器各次异常间的平均间隔时间。broadcastTime 应该比 electionTimeout 小一个量级,这样主节点才能来得及给从节点发送心跳来维持主节点的地位;再结合随机化的 electionTimeout,这种不固定的时间也避免了选主的分票。electionTimeout 应该比 MTBF 小几个量级从而使得系统能平稳运行。当主节点异常时,系统大概会在 electionTimeout 的时间段内不可用,如果 MTBF 足够大就可以避免这个现象频繁发生。

broadcastTimeMTBF 由底层系统决定,而 electionTimeout 则是需要由设计者决定。RaftRPC 请求一般会要求接受者将数据持久化到可靠存储上,根据存储技术的不同 broadcastTime 的范围会在0.5毫秒到20毫秒之间。因此,electionTimeout 的设定范围一般是10毫秒到500毫秒之间。服务器的 MTBF 时间一般是几个月或更久,已经足够满足 Raft 对时间的要求。

集群节点变更

目前为止我们所讨论的都是基于集群配置(参与共识算法的服务器)不变的基础上。而实际上,集群的配置不是一成不变的,有时候就需要修改集群的配置,例如替换掉异常的服务器或者调整复制的级别。虽然操作时可以先下线集群中的全部机器,然后更新配置文件,最后再重启集群,不过在操作期间整个系统都是不可用的。另外,如果其中涉及了人工操作,那么就会有人为错误的风险。为了避免这些问题,Raft 的作者决定将集群配置设计为自动化并整合到 Raft 共识算法中。

为了使集群配置变更是安全的,需要保证在变更期间不会发生在某个任期内有两个主节点的情况。不幸的是,不管怎么样让服务器直接从旧的配置替换为新的配置都是不安全的。因为不可能原子性的将所有服务器一次性的完成配置替换,所以如下图所示,在转换期间整个集群有可能分裂成两个独立的群体。

alt

在上图中,集群由原来3台机器扩展为5台,由于每台服务器实际替换配置文件的时机不同,如红色箭头所示,存在某一时刻集群中可能会有两个主节点,假设此时发生选主,由于在 Server 1 看来集群中的节点数量还是3个,所以它只要获取到 Server 2 的选票就可以声明自己为主节点;而在 Server 5 看来,此时集群中有5个节点,所以它在获取了 Server 3Server 4 的选票后就成为主节点,此时集群中就存在了两个主节点。

为了保证安全性,机器配置变更必须使用两阶段提交。有很多种方式来实现两阶段提交,例如,某些系统在第一阶段会先禁用掉旧的配置从而使系统不再处理客户端请求;然后在第二阶段启用新的配置。在 Raft 中,集群会先切换为一个被称之为 joint consensus 的过渡配置;一旦这个过渡配置被提交,集群就会切换为新配置。joint consensus 包含了新老两套配置:

  • 日志会复制到两套配置下的所有节点中。
  • 新老配置下的节点都可以被选为主节点。
  • 选主和日志提交需要同时获得新老配置下大多数节点的选票。

joint consensus 使得每台服务器能在不同的时间点切换配置而不会破坏 Raft 的安全性。另外,joint consensus 也保障了集群在切换期间能正常响应客户端请求。

alt

集群配置通过日志文件中特殊的日志项来存储和通信,上图描述了配置变更的过程。当主节点收到需要将配置从 C_old 变为 C_new 的请求时,它首先将 joint consensus 的配置 C_old_new 写入到本地日志中,然后将其复制到过半数的从节点中。一旦某个节点将 C_old_new 写入到自己的日志中后,它后续的决定都会基于 C_old_new 的规则(Raft 中的节点始终使用最新的配置来做决策,不管这条日志是否已提交)。所以主节点会根据 C_old_new 需要的规则来决定 C_old_new 状态下的日志是否已提交。如果此时主节点异常,新的主节点可能来自 C_old,也可能来自 C_old_new,这取决于这个候选节点是否收到来自 C_old_new 的复制请求。不过不管哪种情况,在这期间 C_new 下的节点都不可能单方面做决策。

一旦 C_old_new 提交成功,说明集群中过半数的机器都有了 C_old_new 配置,此时不管是 C_old 还是 C_new 的机器都不可能单方面做决策,另外 Leader Completeness Property 也保证了后续没有 C_old_new 日志的节点不可能被选为主节点。所以此时主节点可以开始将 C_new 写入到日志中,然后再复制给其他从节点。同样的,各节点一旦收到 C_new 的配置就会以 C_new 的配置为准做决策。当 C_new 被提交后,旧配置就无关紧要了,那些不在新配置下的机器就可以被下线。在整个变更期间,没有任何一个时刻 C_oldC_new 的节点可以同时做决策,这就保证了安全性。

在配置变更时还有其他一些问题需要指出。第一个问题是新加入的节点一开始可能没有保存任何日志。如果它们以这个状态加入集群,那么它们可能需要很长一段时间来复制日志,可能会造成在这期间主节点无法提交新的日志。为了避免造成可用性问题,Raft 在配置变更前引入了额外的一个阶段,新加入的节点不会参与投票(主节点会将日志发给这些节点,但是在基于过半数节点原则做决策时会忽略这些节点)。当这些新加入的节点的日志追赶上其他节点后,Raft 就开始上面描述的配置变更流程。

第二个问题是在配置变更后当前的主节点可能不再属于新集群(例如归到其他集群或下线)。在这种情况下,一旦 C_new 的配置被提交,那么它就会退回到从节点状态。这说明存在某段时间(提交 C_new 的期间),当前主节点在管理集群时不会把自己计算在内;它在复制日志时依然会进行过半数的节点确认,但这个过半数的节点不包括自己。主节点状态的转变发生在 C_new 提交后,因为只有这个时间点 C_new 才能独立做决策(选出的主节点一定包含 C_new 的配置)。而在这个时间点之前,可能只有 C_old 的节点能被选为主节点。

第三个问题是被删除的节点可能会干扰集群(不在 C_new 中的节点)。因为这些节点不再收到心跳,所以过了 election timeout 后它们会发起选主流程。它们会向其他节点发送新任期的 RequestVote 请求,当前的主节点收到请求后因为新任期比自己的任期大(假设这些被剔除的节点在剔除时所属的任期和主节点相同;不过即使任期比主节点小也没关系,因为每次选主都会增加任期,这些被删除的节点不断的选主然后失败,任期会逐渐增加,最终超过主节点),主节点会认为自己的任期过期了,所以会转为从节点状态。最终系统会选择出一个新的主节点,不过这些被删除的节点又会再次超时然后再次发起选主,从而造成系统可用性问题。

为了避免这个问题,各节点在确认主节点存在的情况下会丢弃 RequestVote 请求。如果某个节点在最小 election timeout 内收到了 RequestVote 请求(前面提到,election timeout 的值是某个区间内的随机值,某个节点在收到主节点的心跳后,在还没有达到最小 election timeout 的时候就又收到 RequestVote 请求,例如 election timeout 的区间为 [150, 300],这里最小 election timeout 指的就是100),则该节点不会更新自己的任期或者投票。这样做并不会影响正常的选主,因为正常的选择至少会等待一个最小 election timeout 时间。不过这样做却能避免那些被删除的节点对集群的干扰,只要主节点能给其他从节点发送心跳,那么它就不会受到被删除的节点所发送的更大的任期的影响。

日志压缩

在和客户端的日常通信中,Raft 节点的日志会逐渐增长,但是在实际的系统中,日志不可能无限增长。当日志越来越多,它占据的空间也越来越大,需要根据日志来重放的时间也越来越多。如果没有一个机制来丢弃过期的日志,那么这最终会导致可用率问题。

快照是压缩日志的最简便的方法。执行快照时,整个系统的状态被写入到保存在可靠存储上的快照里,那么一直到快照执行时的日志都可以被丢弃。快照技术也在 ChubbyZooKeeper 中使用,在本节剩余的内容中将会介绍 Raft 中的快照。

类似 log cleaninglog-structured merge trees 这样的增量手段也能处理压缩日志。它们每次只操作一部分数据,所以就将压缩带来的负载影响随着时间摊平。它们首先会选择一片已经积累了大量被删除或被覆盖的对象的数据区域,然后以更紧凑的方式重写这片区域内存活的对象,最后释放这片区域。这种手段相对于快照来说需要引入额外的机制同时复杂性也更高,而快照通过始终操作整个数据集来简化了问题。log cleaning 技术需要对 Raft 进行修改,状态机可以使用和快照相同的接口来实现 log-structured merge trees

alt

上图展示了 Raft 快照的概念。每台服务器会独立的执行快照,并且只包含已提交的日志。执行快照时大部分的工作是状态机将当前的状态写入到快照中。Raft 同时添加了一小部分元数据到快照中:快照对应的最后一个日志的索引(last included index,状态机已应用的最后一条日志),以及最后一个日志对应的任期(last included term)。这两个元数据信息用于执行快照后的第一次 AppendEntries 请求的一致性检查,因为需要比对前一个日志的索引和任期。同时为了启用集群配置变更,快照中也保存了当前最新的集群配置。一旦服务器完成了快照的写入,那么它就可以删除到快照为止的所有日志,以及以前的快照。

虽然各节点能独立的创建快照,主节点有时必须将快照发给那些落后的从节点。这个发生在主节点已经丢弃了需要发送给从节点的日志的情况下。幸运的是,这种情况在正常情况下不大可能发生:一个时刻和主节点保持同步的从节点已经包含了需要的日志。然而,某个运行异常缓慢的从节点或者新加入集群的从节点可能会缺少很多日志。所以主节点通过直接发送快照的方式来同步从节点的状态。

主节点通过一个新的 RPC 请求 InstallSnapshot 来将快照发送给那些落后太多的从节点。当从节点收到这个请求后,它必须决定如何处理目前已有的日志。一般来说,快照会包含当前从节点还没有的日志。在这种情况下,因为快照捕捉的是整个状态机的状态,说明快照对应的日志范围大于当前从节点的日志范围,所以从节点直接丢弃自己的日志即可。当然从节点的日志可能会包含某些未提交的日志和快照冲突,不过因为是未提交所以也没有问题。不过,如果从节点收到的快照只覆盖了自己日志的前一部分(可能是因为重传或者错传。假设现在 S1 是主节点需要向从节点 S2 发送快照,不过此时主节点 S1 发生异常,快照请求也没有到达 S2,另一个从节点 S3 成为新的主节点,而 S3 的日志还未压缩,所以 S2 通过心跳交互从 S3 获取到了最新的日志,并在之后提交了几条日志;此时 S1 再次成为主节点,因为 Raft 通过 nextIndex 来计算需要发给从节点的日志范围,在 S1 的认知中,并不知道 S2 已经有了最新的日志,S1 会继续判断 rf.lastIncludedIndexnextIndex 的大小,因为 nextIndex 还未更新,所以在 S1 看来,根据 nextIndex 计算出的 prevLogIndex 依然小于 rf.lastIncludedIndex,所以 S1 会认为 S2 需要的日志自己没有,从而继续发送快照信息,而此时的快照只覆盖了 S2 日志的一部分。),那么和快照重合的日志就可以被删除,不过在这之后的日志还依然有效而且也必须保留。

快照违背了 Raft 的强主节点规则(strong leader),因为从节点可以在不知晓主节点的情况下创建快照。不过,作者认为这个做法是合理的。虽然主节点避免了做决策时发生冲突,不过因为快照保存的是已经应用到状态机的日志,本身就没有冲突,所以也就不需要主节点的存在。数据流依然只从主节点流向从节点,只不过有了快照后从节点可以重新组织自己的数据。

作者曾经考虑过另一种基于主节点的快照实现方案:快照只能由主节点创建,然后由主节点将快照发给每一个从节点。不过,这种方案有两个缺点。第一,由主节点发送快照会浪费带宽并且减慢了快照的完成速度。每个从节点本身就有了执行快照所需要的全部信息,从自身状态创建快照相比于从网络接收快照来说成本更低。第二,主节点的实现会更加复杂。例如,主节点需要并行的发送快照和新的日志给从节点,这样才不会阻塞新的客户端请求。

还有两个问题会影响快照的执行速度。第一,服务器必须决定在什么时候进行快照。如果执行快照太频繁,那么就会浪费磁盘 IO 和资源。如果执行快照频率太低,那么它就有耗尽存储空间的风险,以及延长了节点异常重启后重放日志的时间。一个简单的策略是当日志的大小达到一个固定的大小后就执行快照。如果这个大小远大于快照的大小,那么快照所带来的的磁盘 IO 影响就足够小。

第二个性能问题是一次写快照可能会花费较长的时间,所以需要它不能延误正常操作。解决方法是使用写时复制技术(copy-on-write),这样节点也能同时接受新的更新而不会影响快照。例如,由函数式数据结构组成的状态机天然的支持写时复制。或者操作系统的写时复制支持(例如 Linuxfork)可以用来在内存中创建一份整个状态机的快照(Raft 的实现采用了这种方式)。

Raft (2) FAQ 中提到写时复制是如何来实现的:

当某个节点需要写快照时,它会调用 Linuxfork 方法,从而创建了一个子进程,子进程会复制父进程的内存空间,从而复制了状态机的状态。当然,如果单纯的复制内存显然是不现实的,Linux 不会整个复制内存,在这里就采用了写时复制技术,操作系统会将涉及的内存页标记为 copy-on-write,对于父进程和子进程来说都是只读的,当父进程或子进程想要写入某个内存页时会触发缺页中断(page fault),这个时候操作系统才会实际复制这个内存页。

客户端交互

本节描述了客户端如何与 Raft 交互,包括客户端如何找到集群中的主节点以及 Raft 如何支持线性化语义。这几个问题同时适用于其他基于共识的系统,Raft 的解决方案也和其他系统类似。

Raft 的客户端会将所有请求发送给主节点。当客户端首次启动时,它会随机与一台服务器建立连接。如果客户端选中的服务器不是主节点,那么这台服务器就会拒绝客户端的请求并告知客户端它所知道的最近一段时间的主节点(AppendEntries 请求中包含了主节点的网络地址)。如果主节点异常了,那么客户端的请求会超时,然后它会继续随机挑选一台服务器重复上述流程。

Raft 的实现目标是线性化语义(每个操作看起来都是立即执行,在请求和响应之间只执行了一次)。不过,通过对 Raft 的描述可以知道一个客户端的请求有可能被执行多次:例如,主节点在日志提交完成后但是在响应客户端前发生异常,那么客户端就会选择一个新的主节点发起重试,造成请求被执行两次。这个解决方案是让客户端在每次请求时生成一个唯一的编号。这样,状态机就可以跟踪每个请求的最新编号和对应的响应。如果某个节点收到的请求编号已经被处理过了,那么它就立即返回响应的结果而不会重新执行。

只读请求可以不记录日志而直接处理。然而,如果没有其他机制的保证,这有可能会返回过期的数据,因为和客户端通信的主节点已经有可能被其他的主节点所取代。在线性化语义下,Raft 不能够返回过期数据。Raft 需要额外的两个措施在不借助日志的情况下避免返回过期的数据。第一,主节点必须知道最新提交的日志的信息。虽然 Leader Completeness Property 保证了主节点有所有已提交的日志,但是在主节点任期开始的时候,它可能并不知道哪些日志是提交的。为了找到已经提交的日志,主节点需要在任期内提交一条日志。Raft 会要求主节点在任期开始时提交一条 no-op 的日志。Raft (2) FAQ 中解释了为什么这么做:

提交 no-op 日志本质和提交普通的日志没有什么不同,也需要过半数的节点响应,一旦提交完成,那就说明 no-op 之前的日志也必然是被提交的,所以主节点就可以根据这个 no-op 日志为界来知道哪些日志被提交了。

第二,主节点在处理只读请求前必须检查自己还是否是有效的主节点(如果有新的主节点,那么当前主节点持有的信息有可能过期了)。主节点会先和集群中过半数的节点交换心跳来确保自己还是主节点,然后再响应只读请求。另外,主节点也可以在心跳机制的基础上引入租约来确保自己是主节点,不过这就依赖时间来保证正确性(假设时间误差是有界的)。同样的,在 Raft (2) FAQRobert Morris 也提出了自己的设想:

例如主节点在发给其他从节点的心跳中要求在接下来的100毫秒内其他节点不允许成为主节点,那么在接下来的100毫秒内主节点就可以直接处理只读请求而不用担心会有新的主节点。

RPC

这部分的内容来自于论文中的 Figure 2Figure 13,主要描述了 Raft 涉及的 RPC 接口的实现要求以及其他一些系统约束。

状态

本节描述了各个节点内部需要维护的状态。

所有节点都需要持久化的状态(先持久化再响应 RPC 请求):

  • currentTerm:当前节点所处在的最新任期(第一次启动时初始化为0,单调递增),如果这个不持久化,那么节点重启后由于不知道当前的准确任期从而无法发起选主流程,日志中虽然记录了任期但不一定是最新的,例如某个节点的日志可以一直为空但当前任期却一直在增长;同时也可以避免将选票投给任期更低的节点,以及识别出过期的主节点。
  • votedFor:在当前任期内投票的候选节点 id,可以为空,因为 Raft 规定了在某个任期内每个节点最多只能投票一次,为了避免节点投票后发生异常然后重启并继续投票,所以需要持久化。
  • log[]:日志是 Raft 的核心,是状态机重放的保证,必然要持久化。每条日志包含了发给状态机的命令和主节点收到请求时的任期,日志的索引位置从1开始。

所有节点无需持久化的状态:

  • commitIndex:已知目前最远已提交的日志索引(初始化为0,单调递增)。对于主节点来说,只要在重启后提交了一条新的日志(或者自己主动提交一条 no-op 日志),那么这条日志之前的日志都会被间接提交,当前日志的索引就是最新的 commitIndex;对于从节点来说,后面 AppendEntries 中会提到它会发送主节点的最远提交索引,所以没有必要持久化。
  • lastApplied:已知目前最远已应用到状态机的日志索引(初始化为0,单调递增)。对于不同的服务来说,它的状态机不一定是持久化的,例如内存数据库,这样的服务在重启后需要重放所有的日志来恢复状态,所以 lastApplied 从0开始,一直重放到 commitIndex。不过对于持久化的状态机来说,虽然从0开始重放不影响,但是效率太低,持久化 lastApplied 还是有必要的。

主节点无需持久化的状态(选主后重新初始化):

  • nextIndex[]:每个从节点插入下一条日志的索引位置(初始化为主节点最后一条日志的索引位置加1)。这个在前面提到过,根据主节点初始化的值,nextIndex 的有效值可以在多次 RPC 请求之间计算出来,所以没有必要持久化,而且持久化了有可能存在过期的风险。
  • matchIndex[]:每个从节点已复制的日志的最大索引位置(初始化为0,单调递增)。这个值可以结合 AppendEntries 请求计算出来,只要 AppendEntries 成功了,那么 matchIndex 就等于主节点当前日志的索引位置。同样的,持久化了也有可能存在过期的风险。

另外,某个主节点重启后不一定还能当选主节点,持久化 nextIndexmatchIndex 也意义不大。

AppendEntries RPC

AppendEntries 由主节点发送给从节点,用于复制日志和作为心跳探测。

参数:

  • term:主节点的任期
  • leaderId:主节点 id,当客户端连接上一个从节点时,从节点可以将主节点的 id 发给客户端,客户端就能和主节点建立连接
  • prevLogIndex:当前要写入的日志的前一个日志的索引
  • prevLogTerm:当前要写入的日志的前一个日志的任期
  • entries:需要从节点复制的日志(对于心跳来说这个字段为空),从效率考虑可能会包含多条日志,因为从节点的日志可能落后于主节点或者和主节点的不一致,这时就需要补齐从节点的日志
  • leaderCommit:主节点已提交的日志索引

返回:

  • term:当前从节点的任期,主节点收到以后如果发现自己的任期小于从节点的任期,就说明自己是个过期的主节点,从而会转为从节点状态,并更新当前的任期
  • success:如果 prevLogIndexprevLogTerm 和从节点当前的日志状态匹配,则返回 true,否则返回 false;主节点收到 false 就会更新 nextIndex 并继续发送新的 prevLogIndexprevLogTerm 给从节点

接受者实现:

  1. 如果主节点的任期小于从节点的任期,则返回 false
  2. 如果 prevLogIndexprevLogTerm 和从节点当前的日志状态不匹配,则返回 false
  3. 如果新添加的日志和当前某个位置的日志冲突(相同的日志索引,不同的任期),则删除该位置及其之后的日志
  4. 追加所有不在从节点中的日志
  5. 如果 leaderCommit 大于自身的 commitIndex,则更新 commitIndexmin(leaderCommit, 最后一条新的日志的索引)

RequestVote RPC

RequestVote 在选主时由候选节点发起,用于向其他节点获取选票。

参数:

  • term:候选节点的任期
  • candidateId:候选节点的 id,在前面提到每个节点需要持久化 votedFor,从而避免二次投票
  • lastLogIndex:候选节点的最后一条日志的索引
  • lastLogTerm:候选节点的最后一条日志的任期

返回:

  • term:其他节点的任期,如果候选节点发现自己的任期比其他节点的小,那么根据规则它就不能成为主节点,从而退回到从节点状态,并更新当前的任期
  • voteGrantedtrue 表示候选节点获得了一张选票,false 表示没有获得选票

接受者实现:

  1. 如果候选节点的 term 小于自身的任期,则返回 voteGrantedfalse
  2. 如果 votedFor 为空或者等于 candidateId,并且候选节点的日志相比自身的日志一样新或者较新,则返回 voteGrantedtrue

InstallSnapshot RPC

InstallSnapshot 由主节点发起,用于向其他从节点发送快照块(chunk)。主节点始终会按顺序发送 chunk

参数:

  • term:主节点的任期
  • leaderId:主节点 id,当客户端连接上一个从节点时,从节点可以将主节点的 id 发给客户端,客户端就能和主节点建立连接
  • lastIncludedIndex:快照所对应的的最后一条日志的索引,快照执行成功后,这个索引以及之前的日志就都可以被删除
  • lastIncludedTerm:快照所对应的的最后一条日志的任期
  • offset:当前 chunk 在整个快照数据中的偏移位置
  • data[]:当前 chunk 的原始数据,起始于 offset
  • done:如果当前 chunk 是最后一个则为 true,否则为 false

返回:

  • term:当前从节点的任期,主节点收到以后如果发现自己的任期小于从节点的任期,就说明自己是个过期的主节点,从而会转为从节点状态,并更新当前的任期

接受者实现:

  1. 如果主节点的任期小于从节点的任期,则直接返回
  2. 如果当前 chunk 是第一个(offset 为0),则新建一个快照文件
  3. offset 为起点,将 data[] 写入文件
  4. 如果 donefalse,则继续等待下一个 chunk
  5. 当所有 chunk 接受完毕后,保存快照文件,丢弃所有旧的快照文件
  6. 如果 lastIncludedIndex/lastIncludedTerm 对应的日志在自身中存在,则保留 lastIncludedIndex 之后的日志并返回
  7. 删除全部的日志(符合第6条情况的日志除外)
  8. 将快照的内容应用到状态机中,并加载快照中的集群配置

各节点需要遵循的规则

所有节点:

  • 如果 commitIndex 大于 lastApplied,则自增 lastApplied,并应用 log[lastApplied] 到状态机
  • 如果某个 RPC 的请求或响应中的 term T 大于 currentTerm,则设置 currentTerm = T,并转为从节点状态

从节点:

  • 响应来自候选节点和主节点的 RPC 请求
  • 如果在 election timeout 期间内没有收到来自当前主节点的 AppendEntries 的请求以及候选节点的 RequestVote 的请求,则转为候选节点

候选节点:

  • 由从节点转换为候选节点后,开始选主:
    • currentTerm 自增
    • 给自己投票,设置 votedFor 为自己的 id
    • 重置 election timeout
    • 并行给其他节点发送 RequestVote 请求
  • 如果候选节点收到了过半数的选票,则转换为主节点
  • 如果候选节点收到了某个主节点的 AppendEntries 的请求(任期校验有效),则转换为从节点
  • 如果在一个 election timeout 期间内没有完成选主,则回到第一步重新开始选主

主节点:

  • 一旦成为主节点,就开始向其他节点发送空的 AppendEntries 请求(心跳),并在空闲时周而复始的发送,避免其他节点进入选主过程
  • 如果收到来自客户端的请求,则追加一条新的日志,并在将日志应用到状态机后返回结果给客户端
  • 如果最后一条日志的索引大于等于某个从节点的 nextIndex,说明从节点的日志和主节点的日志不一致,则主节点会将 [nextIndex, 当前主节点的最后一条日志的索引] 范围内的日志通过 AppendEntries 请求发给从节点:
    • 如果执行成功,则更新该从节点对应的 nextIndexmatchIndex
    • 如果执行失败,说明当前 nextIndex 还不是最优解,将 nextIndex 减1后继续重试
  • 如果存在某个数字 N 满足 N > commitIndex,且过半数的从节点的 matchIndex >= N,以及 log[N].term 等于 currentTerm,则将 commitIndex 设置为 N。这说明 N 是实质上已提交的日志索引,但是主节点还不知道

参考

介绍

和一般描述的应用级别的主从备份不同,本文描述的是虚拟机的主从备份。主从备份作为一种常见的容错实现手段,当主节点异常时,从节点能取代主节点从而保证系统依然可用。作为从节点,它的状态必须尽可能的与主节点随时保持一致,这样当主节点异常时从节点能马上取代主节点,而客户端也不会感知到异常,同时也没有数据丢失。其中一种同步主从节点状态的方式是持续将主节点的所有修改发送给从节点,这里的修改包括 CPU、内存以及 IO 设备。然而,采用这种同步方式需要大量的网络带宽,尤其是发送内存的修改。

另一种只需要耗费少量带宽的方式是状态机(state machine)同步。该方法将主从同步抽象为确定性状态机(deterministic state machine)同步问题,在确定性状态机模型下,对于两个初始状态一样的状态机来说,按照相同的顺序执行相同的一系列输入指令后,最后的状态也一定是相同的。然而,对于大部分的服务来说,存在某些非确定性的操作,例如生成一个随机数,这时候就需要额外的协调使得主从间依然是同步的,即从节点也要生成一模一样的随机数。不过,处理这种情况所需要维护的额外信息相比于主节点状态的修改(主要是内存的修改)来说不值一提。

对于物理机来说,随着主频的增加,同步主从间的确定性操作也愈发困难。然而对于运行在 hypervisor 上的虚拟机来说却非常适合实现状态机同步。一个虚拟机本身就可以看做一个明确的状态机,它的所有操作就是被虚拟化的机器的操作(包括所有的设备)。和物理机一样,虚拟机也存在一些非确定性的操作(例如读取当前时间或者发送一个中断),所以也需要发送额外的信息给从节点来保证主从同步。因为 hypervisor 掌管着虚拟机的执行,包括发送所有的输入给被虚拟化的机器,所以它能捕获到执行非确定性操作的所有需要的信息,从而能正确的在从节点上执行重放操作。

因此,基于状态机同步的主从同步方式可以在不需要修改硬件的情况下在廉价的硬件上实现,使得容错技术适用于最新的微处理器。另外,对带宽较低的要求使得长距离的虚拟机主从同步成为了可能。例如,可以在跨校园间不同的物理机上做主从同步,相比于同大厦内的主从同步更为可靠。

目前在 VMware vSphere 4.0 平台上已经实现了这种容错技术,该平台能高效完整的虚拟化 x86 架构的机器。因为 VMware vSphere 实现了一个完全的 x86 虚拟机,所以可以自动的对任何 x86 的操作系统和应用提供容错支持。通过确定性重放(deterministic replay),系统可以记录下主节点的执行并且确保能在从节点执行相同的操作。VMware vSphere Fault Tolerance (FT) 在此基础之上增加了额外的功能和协议来支持构建一个可完全容错的系统。除了对硬件的容错外,当主节点异常时,系统能自动的在本地集群中启动一台可用的从节点来接管主节点。在该篇论文发表的时候,确定性重放技术和 VMware FT 仅支持单核的虚拟机。受限于严重的性能问题,多核虚拟机的重放支持仍在进行中,因为在多核场景下,几乎每一个对共享内存的访问都是一个非确定性的操作。

BressoudSchneider 针对惠普的 PA-RISC 平台的虚拟机容错做了个原型实现。VMware 的实现与其类似,不过出于性能的考虑做了些根本的修改以及调研了一些其他实现方案。另外,为了能构建一个高效、可用的容错系统来支持用户的企业级应用,VMware 还设计和实现了许多其他组件以及解决一些实际的问题。和大多数实际的系统要解决的问题一样,这里的容错针对的是 fail-stop 的异常,即在造成外部可见的不正确的行为前可被监测到的异常,例如磁盘空间不足、网络无法连通等等,而诸如应用程序的 bug 或者人为失误等则不属于 fail-stop 异常,系统也无法进行容错。

基础设计

alt

上图展示了支持容错的虚拟机的基本配置。对于每一台需要支持容错的虚拟机(primary VM),系统会在其他物理机上同时运行一台备份虚拟机(backup VM),备份虚拟机和主虚拟机会保持同步,并执行和主虚拟机相同的指令,不过会存在一定的延迟。这两台虚拟机被称为处于 virtual lockstep 状态。同时,虚拟机连接着相同的共享存储,输入和输出都可以被主从虚拟机访问。不过,只有主虚拟机才会暴露在网络中,所以所有的网络输入都只会发送给主虚拟机。同样的,其他所有的输入(例如键盘和鼠标输入)也都只会发送给主虚拟机。

主虚拟机收到的所有输入都会通过 logging channel 发送给从虚拟机。对系统来说,主要的输入负载就是网络和磁盘。为了保证从虚拟机能和主虚拟机执行相同的非确定性操作,还需要发送一些额外的信息给从虚拟机。从结果上来说,从虚拟机会始终执行和主虚拟机相同的操作。不过,所有从虚拟机的输出都会被 hypervisor 丢弃,只有主虚拟机的输出才会返回给客户端。后面会提到,主从虚拟机间的通信会遵循一个特定的协议,包括从虚拟机对消息的确认,来保证当主虚拟机异常时不会发生数据丢失。

为了监测主虚拟机或者从虚拟机是否发生异常,系统会通过和主从虚拟机间的心跳以及 logging channel 的流量来判断。另外,系统必须保证在任一时间只有一台主虚拟机或者从虚拟机作为对外执行的入口,即使主虚拟机和从虚拟机间失联发生脑裂的场景。

确定性重放的实现

在前面提到过,主从虚拟机的同步可以抽象为确定性状态机同步问题。如果两个确定性状态机以相同的初始状态启动,并且按照相同的顺序执行相同的输入,那么这两个状态机会经历相同的状态流转并输出相同的结果。一台虚拟机会有一系列的输入,包括网络包,磁盘读取,以及键盘和鼠标输入。而非确定性的事件(例如虚拟中断(virtual interrupts))和非确定性的操作(例如读取当前处理器的时钟周期数)也会影响虚拟机的内部状态。这就给重放执行一台运行着任意操作系统和任意服务的虚拟机带来了3个挑战:

  1. 需要正确的捕捉到主虚拟机的所有输入和非确定性的操作
  2. 需要正确的将输入和非确定性操作在从虚拟机上重放
  3. 不能影响系统性能

另外,x86 处理器中很多复杂的操作往往伴有副作用,因此也是非确定性的操作,如何捕捉到这些非确定性的操作并正确的在从虚拟机上重放也是一个挑战。

VMware vSphere 平台为 x86 虚拟机提供了上述的重放功能。确定性重放技术会将主虚拟机的所有输入和所有可能的非确定性操作写入到日志中。从虚拟机就可以读取日志并执行和主虚拟机一样的操作。对于非确定性的操作来说,系统会写入一些额外的信息来保证重放时生成相同的虚拟机状态和输出。对于非确定性的事件例如计时器或者 IO 完成中断,在事件发生时所执行的指令也会记录在日志中。在重放时,事件会和指令一同出现在指令流中。借助联合 AMDIntel 开发的 hardware performance counter(一组特殊寄存器用来记录硬件相关事件发生的次数)和其他技术,VMware 确定性重放技术能高效的记录和重放非确定性的事件。

BressoundSchneider 在其实现中提到将虚拟机的执行以 epoch 为单位进行切分,其中所有的非确定性操作例如中断都放在 epoch 的最后。这个想法是出于批量处理的考虑,因为单独将每一个中断和中断发生时对应的指令重放执行代价较大。不过 VMware 的实现足够高效使得不需要借助 epoch 来实现确定性重放,每一个中断都能被准确的记录并伴随着发生中断时的指令一起执行。

FT 协议

VMware FT 使用确定性重放技术将主虚拟机的执行流记录到日志中,不过主虚拟机并不是将日志写入到磁盘上,而是通过 logging channel 将日志发送给从虚拟机。从虚拟机能实时的读取日志并将其重放,从而执行和主虚拟机一样的操作。然而,双方在 logging channel 的通信必须遵循 FT 协议来保证容错。其中一条基本的要求是:

Output Requirement:当主虚拟机异常,从虚拟机接管执行时,从虚拟机的输出必须和先前主虚拟机已经发送给客户端的输出一致。

当异常发生时(主虚拟机异常,从虚拟机接管执行),从虚拟机的执行可能会和在没有异常发生时主虚拟机的执行不同,因为在执行时会有很多非确定性的操作。然而,只要从虚拟机的输出满足 Output Requirement,则在主从切换时就不会有外部可见的状态或者数据丢失,而客户端也不会感知到中断或者服务的不一致。

通过主虚拟机的延迟输出,保证从虚拟机确认收到了所有日志后,主虚拟机才将输出返回给客户端来实现 Output Requirement。一个先决的条件是主虚拟机在执行输出操作前,从虚拟机必须已经收到所有的日志。这些日志能保证从虚拟机执行到主虚拟机最新的执行点。然而,假设当主虚拟机刚开始执行输出操作时发生了异常,此时发生了主从切换,从虚拟机必须先将未处理完的日志进行重放,然后才能 go live(不再执行重放,接管成为主虚拟机)。如果在这之前从虚拟机 go live,可能会有一些非确定性的事件(例如计时器中断)在从虚拟机执行输出操作前改变了执行的路径。

针对上述的要求,最简单的实现 Output Requirement 的方式是为每一条输出操作创建一条特殊的日志,从而可以通过以下规则来保证 Output Requirement

Output Rule:在从虚拟机确认收到输出操作对应的日志前,主虚拟机不能执行输出操作。

如果从虚拟机收到了所有的日志,包括输出操作对应的日志,那么从虚拟机就能重放出和主虚拟机在执行输出操作时一模一样的状态,当主虚拟机异常时,从虚拟机就能恢复到主虚拟机执行输出操作前一致的状态。相反的,如果从虚拟机在没有收到日志前就接管了主虚拟机的操作,那么它的状态就和主虚拟机不一致,从而导致最终的输出也不一致。

注意 Output Rule 并没有要求在从虚拟机确认收到输出日志前停止主虚拟机的执行。这里只是延迟了主虚拟机的输出,它依然可以执行其他指令。因为操作系统以异步中断的方式来通知非阻塞网络和磁盘输出的完成,主虚拟机可以在这期间轻易的执行其他指令,而不用阻塞等待。相反的,在其他的一些实现中,在从虚拟机确认收到输出日志前,主虚拟机必须完全停止等待。

alt

上图展示了 FT 协议的要求。主虚拟机到从虚拟机的箭头表示日志的发送,从虚拟机到主虚拟机的箭头表示日志的确认。所有异步事件,输入和输出的操作都必须发送给从虚拟机并得到确认。只有当从虚拟机确认了某条输出操作的日志后,主虚拟机才能执行输出操作。所以只要遵循了 Output Rule,从虚拟机在接管执行时就能保持和主虚拟机一致的状态。

不过在异常发生时,VMware FT 并不能保证所有的输出只发送一次。在缺少两阶段提交的帮助下,从虚拟机不能知晓主虚拟机在发送某条输出之前还是之后发生了异常。不过,网络协议(包括常见的 TCP 协议)在设计时就已经考虑了包的丢失和重复包的情况,所以这里无需特殊处理。另外,在主虚拟机异常时发送给主虚拟机的输入也有可能丢失,因此从虚拟机也会丢失这部分的输入。不过,即使在主虚拟机没有异常的情况下,网络包本身就有可能丢失,所以这里同样也不需要特殊处理,不管是网络协议、操作系统还是应用程序,在设计和编写时本身已经考虑到了包丢失的情况。

监测和响应异常

之前提到过,当主虚拟机或者从虚拟机发生异常时,双方都必须能快速响应。当从虚拟机异常时,主虚拟机会进入 go live 模式,即不再记录执行日志,以常规的方式执行。当主虚拟机异常时,从虚拟机也会进入 go live 模式,不过相比于主虚拟机略微复杂些。因为从虚拟机在执行上本身就落后于主虚拟机,在主虚拟机异常时,从虚拟机已经收到和确认了一部分执行日志,但是还没有执行重放,此时从虚拟机的状态和主虚拟机还是不一致的。所以,从虚拟机必须先将暂存的日志进行重放,当所有重放都执行完成后,从虚拟机就会进行 go live 模式,正式接管主虚拟机(此时缺少一个从虚拟机)。因为此时这台从虚拟机不再是从虚拟机,被虚拟化的操作系统所执行的输出操作都会发送给客户端。在这个转换期间,可能也需要某些设备执行一些特定的操作来保证后续输出的正确性。特别是对于网络输出来说,VMware FT 会自动的将新的主虚拟机的 MAC 地址在网络中广播,使得物理交换机知道最新的主虚拟机的地址。另外,后文会提到新的主虚拟机可能会重新发起一些磁盘 IO 操作。

有很多种方式来监测主虚拟机和从虚拟机的异常。VMware FT 通过 UDP 心跳来监测启用了容错的虚拟机是否发生了异常。另外,VMware FT 还会监控 logging channel 中的流量,包括主虚拟机发送给从虚拟机的日志,以及从虚拟机的消息确认回执。因为操作系统本身存在时钟中断,所以理论上来说 logging channel 中的流量应该是连续不断的。因此,如果监测到 logging channel 中没有流量了,那么就可以推断出某台虚拟机发生了异常。如果没有心跳或者 logging channel 中没有流量超过一段指定的时间(近似几秒钟),那么系统就会声明这台虚拟机发生了异常。

然而,这种异常监测机制可能会引发脑裂问题。如果从虚拟机不再接收到来自主虚拟机的心跳,那么有可能说明主虚拟机发生了异常,但也有可能只是双方间的网络断开。如果此时从虚拟机进入 go live 模式,由于此时主虚拟机依然存活,就有可能给客户端造成数据损坏或其他问题。因此,当监测到异常时必须保证只有一台主虚拟机或者从虚拟机进入 go live 模式。为了解决脑裂问题,VMware FT 借助了虚拟机所连接的共享存储。当主虚拟机或者从虚拟机希望进入 go live 模式时,它会向共享存储发起一个原子性的 test-and-set 操作。如果操作成功,那么当前虚拟机可以进入 go live 模式,如果操作失败,说明已经有其他虚拟机先进入了 go live 模式,所以当前虚拟机就将自己挂起。如果虚拟机访问共享存储失败,那么它会一直等待直到访问成功。如果共享存储由于网络问题造成无法访问,那么虚拟机本身也做不了什么因为它的虚拟磁盘就挂载在共享存储上。所以使用共享存储来解决脑裂问题不会带来其他可用性问题。

最后一个设计的点是如果虚拟机发生了异常,使得某台虚拟机进入了 go live 模式,那么 VMware FT 会自动在其他物理机上启动一台新的备份虚拟机。

FT 的实际实现

上节主要描述了 FT 的基础设计和协议。然而,为了构建一个可用,健壮和自动化的系统,还需要设计和实现很多其他的组件。

启动和重启 FT 虚拟机

其中一个至关重要的组件是如何启动一台有着和主虚拟机一模一样状态的备份虚拟机。这个同时也会在当某台虚拟机异常需要重新启动一台备份虚拟机时用到。因此,这个组件必须能够在主虚拟机处于任意状态的时候复制一台一模一样的备份虚拟机(而不仅仅是初始状态)。另外,这个启动操作还不能影响到主虚拟机的执行,因为这有可能影响到当前所有连接着的客户端。

对于 VMware FT 来说,它借用了 VMware vSphere 平台已有的 VMotion 的功能。VMware VMotion 能以极小的代价将一台运行中的虚拟机迁移到另一台机器上——虚拟机的暂停时间一般在一秒内。VMware FTVMotion 做了些改动使得在不销毁当前虚拟机的情况下,在远程服务器上复制一台和当前虚拟机一模一样的虚拟机。即修改版的 FT VMotion 做的是虚拟机的复制而不是迁移。FT VMotion 也会同时建立一条 logging channel,源虚拟机作为主虚拟机就会将执行日志写入到 logging channel,而复制后的虚拟机作为从虚拟机就会读取日志开始重放执行。和常规的 VMotion 一样,FT VMotion 也能将对主虚拟机的暂停控制在一秒以内。因此,对某台运行中的虚拟机开启 FT 功能非常简单,且没有破坏性。

另一个启动一台备份虚拟机要考虑的点是选择在哪台物理机上启动。支持容错的虚拟机运行在某个可访问共享存储的集群中,所以本质上虚拟机可以运行在任意一台机器上。这个灵活性使得 VMware vSphere 可以轻易的为一台或多台异常的虚拟机启动新的备份虚拟机。VMware vSphere 实现了一套集群服务来管理集群中的资源。当虚拟机发生异常主虚拟机需要一台新的虚拟机来维持容错时,主虚拟机会通知集群服务需要一台新的备份虚拟机。此时集群服务会根据资源利用率和其他条件来决定在哪台机器上重启新的备份虚拟机,然后会由 FT VMotion 创建一台新的备份虚拟机。一般来说,VMware FT 可以在发生异常的几分钟内恢复某台虚拟机的冗余功能,而不会造成任何中断。

管理 Logging Channel

在管理 logging channel 的流量上有很多有趣的实现细节。在 VMware 的实现里,hypervisor 为主虚拟机和从虚拟机维护了一大块日志缓冲区。在主虚拟机执行时,它将执行日志发送到日志缓冲区中,类似的,从虚拟机从日志缓冲区中消费日志。每当主虚拟机的日志缓冲区中有数据,系统就会将其发送到 logging channel 中,相应的在另一边则会将其放入到从虚拟机的日志缓冲区中。当从虚拟机从 logging channel 中读取到日志并将其放入日志缓冲区后,它就会向主虚拟机发送一个已读消息的回执。VMare FT 就可以根据这个已读回执决定哪些输出操作可以执行了。下图展示了这个过程:

alt

当从虚拟机从日志缓冲区读取不到任何日志时(日志缓冲区为空),它就会暂停执行直到下一条日志到达。因为从虚拟机不和外界交互,这个暂停对客户端没有任何影响。类似的,如果主虚拟机往日志缓冲区中写日志发现日志缓冲区满时,它必须停止执行直到日志缓冲区被消费。这个暂停保证了主虚拟机以一个可控的速度生产执行日志。不过,这个暂停会影响客户端的响应,直到主虚拟机可以继续写日志并恢复执行。所以,在实现时必须考虑如何尽量避免主虚拟机的日志缓冲区写满。

其中一个原因造成主虚拟机的日志缓冲区写满是因为从虚拟机执行的太慢从而造成消费日志太慢。一般来说,从虚拟机必须以和主虚拟机记录执行日志一样的速度来执行重放。幸运的是,在 VMware FT 的实现下,记录执行日志和重放所需要的时间基本是相同的。不过,如果从虚拟机所在的机器存在和其他虚拟机资源竞争(资源超卖),不管 hypervisor 的虚拟机调度多么高效,从虚拟机都有可能得不到足够的 CPU 和内存资源来保证和主虚拟机一样的速度执行重放。

除了主虚拟机日志缓冲区满造成的不可控暂停外,还有一个原因也要求主从虚拟机间的状态不能差太远。当主虚拟机异常时,从虚拟机必须尽快的将所有的执行日志进行重放,达到和主虚拟机一样的状态,然后接管主虚拟机向客户端提供服务。结束重放的时间基本上等于异常发生时从虚拟机落后主虚拟机的时间,所以从虚拟机进入 go live 模式需要的时间就基本上等于检测出异常的时间加上当前从虚拟机落后的时间。因此,从虚拟机不能落后主虚拟机太多(大于一秒),否则这会大大增加故障切换的时间。

因此,VMware FT 有另一套机制来保证从虚拟机不会落后主虚拟机太多。在主从虚拟机间的通信协议里,还会发送额外的信息来计算两者间的执行时间差。一般来说这个时间差在100毫秒以内。如果从虚拟机开始明显落后主虚拟机(例如大于1秒),那么 VMware FT 会通知调度器降低主虚拟机的 CPU 资源配额(初始减少几个百分点)来延缓主虚拟机的执行。VMware FT 会根据从虚拟机返回的落后时间来不断调整主虚拟机的 CPU 资源配额,如果从虚拟机一直落后,那么 VMware FT 会逐渐减少主虚拟机的 CPU 资源配额。相反的,如果从虚拟机开始赶上了主虚拟机的执行速度,那么 VMware FT 会逐渐增加主虚拟机的 CPU 资源配额,直到两者的执行时间差到达一个合理的值。

不过在实际场景中减慢主虚拟机执行的速度非常少见,一般只会发生在系统承受极大负载的情况下。

FT 虚拟机上的操作

另一个实际中要考虑的问题是处理针对主虚拟机的一系列控制操作。例如,如果主虚拟机主动关机了,从虚拟机也需要同步关机,而不是进入 go live 模式。另外,所有对主虚拟机的资源修改(例如增加 CPU 资源配额)都必须应用到从虚拟机上。针对这些操作,系统会将其转化为特殊的执行日志发送到 logging channel,从虚拟机收到后也会将其正确的重放。

一般来说,大部分对虚拟机的操作都只应该在主虚拟机上发起。然后 VMware FT 会将其转化为日志发送给从虚拟机来进行重放。唯一可以独立的在主虚拟机和从虚拟机上执行的操作是 VMotion。即主虚拟机和从虚拟机都可以独立的被复制到其他机器上。VMware FT 保证了在复制虚拟机时不会将其复制到一台已经运行了其他虚拟机的机器上,因为这无法提供有效的容错保证。

复制一台主虚拟机要比迁移一台普通的虚拟机复杂些,因为从虚拟机需要先和主虚拟机断开连接,然后之后在合适的时间和新的主虚拟机建立连接。从虚拟机的复制有类似的问题,不过也略微复杂些。对于普通的虚拟机迁移来说,系统会要求当前所有进行中的磁盘 IO 在切换前执行完成。对于主虚拟机来说,它可以等待所有进行中的磁盘 IO 的完成通知。而对于从虚拟机来说,它不能简单的在某个时间让所有的磁盘 IO 都结束,因为从虚拟机还在执行重放,也需要等待重放涉及的磁盘 IO 完成。而另一方面,主虚拟机在执行时有可能一直伴有磁盘 IOVMware FT 有一个特有的方法来解决这个问题,当从虚拟机在执行切换前,会通过 logging channel 向主虚拟机发送请求要求暂时结束所有的磁盘 IO,主虚拟机收到请求后会天然的将其转化为执行日志发送到 logging channel,从虚拟机的磁盘 IO 也因此会伴随着重放而终止。

磁盘 IO 的实现问题

VMware 在实现时遇到了一些和磁盘 IO 相关的问题。首先,因为磁盘操作是非阻塞的所以可以并行执行,同一时间访问同一块磁盘区域就会导致非确定性结果。另外,磁盘 IO 会通过 DMADirect memory access) 来直接访问虚拟机的内存,因此同一时间的磁盘操作如果访问到了同一块内存页也会导致非确定性结果。VMware 通过监测这样的 IO 竞争(实际场景中极少)并强制让其在主虚拟机和从虚拟机中串行执行来解决这个问题。

第二,磁盘操作访问内存可能和应用程序访问内存产生竞争,因为磁盘操作可以通过 DMA 直接访问内存。例如,如果应用程序正在访问某个内存块,而此时磁盘也在写入这个内存块,则会发生非确定性的结果。虽然这种情况同样很少,但也仍然需要能够监测并解决。其中一种解决方案是当磁盘正在访问某个内存页时暂时对该页设置页保护。当应用程序尝试访问页保护的内存页时,会触发一个陷阱(trap)使得操作系统能够暂停执行直到磁盘操作完成。不过,修改 MMUMemory management unit)的页保护机制代价较大,VMware 借助 bounce buffer 来解决这个问题。bounce buffer 是一块和磁盘操作正在访问的内存一样大小的临时缓冲区。磁盘的读操作被修改为磁盘先将数据复制到 bounce buffer 中,然后虚拟机读取 bounce buffer 中的数据,而且只有在磁盘 IO 完成后才会将 bounce buffer 中的数据复制到虚拟机内存中。类似的,对于磁盘写操作,数据会先写到 bounce buffer 中,磁盘再将数据从 bounce buffer 中复制到磁盘上。使用 bounce buffer 的情况下会降低磁盘的吞吐,不过实际中还没有发现造成可见的性能损耗。

第三,还有些问题发生于主虚拟机正在执行某些磁盘 IO(未完成),然后主虚拟机异常,从虚拟机接管执行。对于从虚拟机来说,它并不知道这些磁盘 IO 是否已发送给磁盘或者已成功执行。另外,由于这些磁盘 IO 还没有在从虚拟机上发起过,所以也不会相应的 IO 完成的通知,但是在操作系统的角度指令已经发出了,但是从虚拟机上的操作系统也收不到 IO 完成的通知,最终会终止或者重置这个过程。在这种情况下,系统会为每一个 IO 操作发送一个失败的通知,即使某个 IO 操作实际成功了而返回失败也是可以接受的。然而,由于从虚拟机上的操作系统可能不能很好的响应 IO 失败的通知,所以在从虚拟机 go live 阶段会重新发起这些 IO 请求。因为系统已经消除了并发 IO 间的竞争,所以这些 IO 操作可以重新发起即使它们之前已经成功执行了(操作是幂等的)。

网络 IO 的实现问题

VMware vSphere 为虚拟机的网络提供了很多的性能优化。部分优化基于 hypervisor 能异步更新虚拟机的网络设备的状态。例如,在虚拟机还在执行时 hypervisor 就可以直接更新接收缓冲区。不过,这种异步更新同样也带来了非确定性。除非能保证在异步更新的时间点在主虚拟机上执行的指令能严格一致的在从虚拟机上重放,否则从虚拟机的状态就会和主虚拟机不一致。

FT 中对网络代码改动最大的一块就是禁止了异步网络更新优化。从原先的异步更新缓冲区修改为强制陷入到陷阱(trap)中,hypervisor 响应后将更新记录到日志中,然后再将其应用到虚拟机上。类似的,异步从缓冲区中拉取数据包也同样被禁止了,同样由 hypervisor 托管执行。

消除了异步的网络设备状态更新以及前面所提到的 Output Rule 带来的延迟输出,为网络性能优化又提出了挑战。VMware 通过两个方面来优化网络性能。第一,实现了集群优化来减少虚拟机的陷阱和中断。当虚拟机在接收网络包时,hypervisor 可以在一个陷阱、中断内处理多组数据包来减少陷阱和中断的次数。

第二个网络优化是降低数据包延迟发送的时间。前面提到,只有当从虚拟机确认收到所有输出操作的日志后,主虚拟机才能执行输出操作。所以减少数据包延迟发送的时间等价于减少日志发送和确认的时间。这里主要的优化点是发送和接受消息回执的过程中确保不会触发线程切换。另外,当主虚拟机将某个数据包加入发送队列时,系统会强制刷新主虚拟机的日志缓冲区到 logging channel 中。

其他设计方案

本节主要描述了 VMware 在实现 VMware FT 时调研和考虑的其他一些方案。

共享磁盘和非共享磁盘

在当前的实现中,主虚拟机和从虚拟机共享一个存储。因此当发生主从切换时,从虚拟机上的数据天然是和主虚拟机上的数据一致的。共享存储相对于主从虚拟机来说是一个外部系统,所以任何对共享存储的写入都被视为和外部的通信。因此,只有主虚拟机被允许写入到共享存储,而且写入必须遵循 Output Rule 规则。

另一种设计是主虚拟机和从虚拟机各自有一套独立的存储。在这个设计中,从虚拟机会将所有的输出写入到自己的存储中,所以从虚拟机的数据也能和主虚拟机保持同步。下图展示了这种设计下的配置:

alt

在非共享磁盘的场景下,每个虚拟机的存储被视为该虚拟机内部状态的一部分。因此,虚拟机的输出没有必要遵循 Output Rule 规则。非共享磁盘的设计在主从虚拟机无法访问共享存储时很有用。这可能时因为共享存储不可用或者过于昂贵,或者主从虚拟机间的物理距离太长。这种方案的一个主要的缺点是主从虚拟机开启容错时必须将双方的磁盘进行初始化同步。另外,双方的磁盘也有可能在异常发生后造成不同步,所以当备份虚拟机重启后,双方磁盘需要再次同步。所以,FT VMotion 不只要同步主从虚拟机的状态,还要同步磁盘的状态。

在非共享磁盘的场景下,系统就不能借助共享存储来解决脑裂问题。在这种场景下,系统可借助其他的外部组件,例如某个主从虚拟机都可以连接的第三方服务器。如果主从服务器属于某个多于两个节点的集群,那么就可以使用某个选举算法来选择谁能进入 go live 模式。在这种场景下,如果某台虚拟机获得了大多数节点的投票,那么它就可以进入 go live 模式。

在从虚拟机上执行磁盘读操作

在当前的实现中,从虚拟机从来不会从虚拟磁盘中读取数据(不论是共享存储还是非共享存储)。因为磁盘读也是一种输入,所以会自然的将磁盘读取后的结果以日志的形式通过 logging channel 发送给从虚拟机。

另一种设计是涉及到磁盘读操作的执行由从虚拟机自行从磁盘中读取,主虚拟机不再向 logging channel 中发送从磁盘读取到的数据。对于磁盘密集型的系统来说,这种设计可以大幅降低 logging channel 的负载。不过,这种设计也存在一些问题。这可能会降低从虚拟机的执行速度,因为从虚拟机在重放时必须先等待所有依赖的磁盘 IO 操作完成。

另外,当磁盘读失败时还需要额外的处理。如果主虚拟机读取磁盘成功了,而从虚拟机读取磁盘失败了,那么从虚拟机就需要进行重试直到成功,因为从虚拟机必须拿到和主虚拟机内存中一样的数据。相反的,如果主虚拟机读取磁盘失败了,则相应主虚拟机中目标内存中的数据就必须通过 logging channel 发送给从虚拟机,因为此时主虚拟机中内存中的数据是不确定的。

最后,在共享存储的模式下如果让从虚拟机执行磁盘读也有个问题。如果主虚拟机在某个磁盘区域执行了读操作,然后马上又对相同的区域执行了写操作,那么这个写操作就必须等到从虚拟机读取完成后再执行。这个依赖也能够被监测和正确处理,不过也给系统实现增加了额外的复杂度。

在实际性能测试中,从虚拟机执行磁盘读操作的情况下应用程序的吞吐会降低 1-4%,但是也能有效降低 logging channel 的负载。因此,在 logging channel 的带宽有限的情况下,可以考虑使用从虚拟机磁盘读操作。

参考

介绍

MapReduce: Simplified Data Processing on Large Clusters 中提到,MapReduce 任务的输入输出构建在 GFS 之上,GFSGoogle 内部开发的一个分布式文件系统,用于应对大型的数据密集型应用。在 GFS 之前,业界已经存在了一些分布式文件系统的实现,为什么 Google 还要再实现一套?因为基于 Google 内部应用的特点,有别于传统的分布式文件系统,除了考虑性能、可扩展性、可靠性和可用性之外,GFS 在设计时还考虑了以下三个方面:

  1. 组件异常经常出现而不是偶尔出现。GFS 构建在成百上千台廉价的机器上,并同时被同等数量的客户端访问。在这个量级规模下,在任何时候某个组件都有可能发生异常以及发生异常后无法自动恢复。这里的异常不只包括硬件的异常,还包括软件的异常以及人为的错误。因此,对于异常的监控和检测,容错,以及异常的自动恢复是系统不可或缺的一个部分。
  2. 以传统的分布式文件系统的视角来看,Google 要处理的都是大文件,几个G的文件随处可见。
  3. 对于 Google 的数据应用来说,大部分对文件的写操作是追加操作而不是覆盖操作。对文件的随机写几乎可以说是不存在。文件一旦写入完成后,基本上就不会被再次修改,剩下的都是读操作,且大部分场景下是顺序读。MapReduce 系统就是这个应用场景的典型例子,map 任务持续顺序追加生成中间结果文件,reduce 任务不断的从中间结果文件中顺序读取。根据这个特点,追加写就成为了系统性能优化以及写操作原子性保证的主要设计方向。
  4. 将应用程序和文件系统的 API 协同设计有利于增加系统的灵活性。例如,GFS 提供了原子性的追加写操作,多个客户端可以并发追加写,而无需在应用程序层面进行加锁。

设计

假设

本节主要描述了 GFS 设计的几个出发点:

  • 整个系统构建在一批廉价且经常出现异常的硬件上,所以设计上必须考虑对异常的监测和容错,以及异常发生后能够恢复到一个合适的程度。
  • 需要能够高效管理几个G的大文件,系统也支持存储小文件,不过不会对其作特殊的优化。
  • 系统需要支持两种文件读取方式:大量的流式读和少量的随机读。在大量的流式读场景下,每次读取大小一般在几百KB,或者1MB或更多。来自同一个客户端的连续读取一般是读取文件的某一段连续内容。而少量的随机读一般是在文件的任意位置读取几KB。对于性能敏感的应用程序来说,一般会将多个随机读根据读取的位置排序后批量读取,避免磁盘来回的随机寻址。
  • 系统需要支持大量连续的文件追加写操作。对文件追加写的单次大小一般和流式读的单次大小差不多。文件一旦写入完成后就几乎不会再被修改。系统同样需要支持少量的随机写,不过和少量的随机读类似,随机写也不强调性能。
  • 当有多个客户端对同一个文件进行追加写的时候,系统必须能高效的实现清晰明确的执行语义,例如是保证每个客户端至少成功追加写一次,还是至多一次或者其他。GFS 中的文件经常会充当某个生产者消费者场景下的缓冲队列,一边会有多个客户端不断往文件中追加写,一边也会有一个客户端同时进行流式读取(或在写入完成后读取),所以系统必须保证追加写操作的原子性。
  • 整个系统的吞吐的重要性大于延迟,大部分 Google 的应用程序主要是大批量的文件处理,只有少量的程序会对单次的读或写有性能要求,所以系统的吞吐是第一位。

接口

虽然 GFS 并未完全实现标准的文件系统 API,如 POSIX,但仍提供了常见的文件系统接口,如创建(create)、删除(delete)、打开(open)、关闭(close)、读取(read)和写入(write)文件。类似于本地文件系统,文件在 GFS 内通过树状目录的形式组织,每个文件通过文件路径来唯一确定,不过这里的目录是逻辑上的概念,并不会映射到某个物理文件系统目录上。

除此之外,GFS 还支持快照(snapshot)和追加写(record append)的操作。快照能够低成本的复制一个文件或一个目录。追加写允许多个客户端并发的对同一个文件追加写入,并保证每个客户端写入的原子性。

架构

一个 GFS 集群由一个主节点(master)和多个 chunkserver 组成,并被多个客户端(client)访问。不管是主节点还是 chunkserver 或客户端,都运行在廉价的 Linux 机器上。可以简单的将 chunkserver 和客户端运行在同一台机器上,如果系统资源允许或者能够容忍客户端代码潜在的不可靠性的话。

每个文件在存入 GFS 时会被切分为固定大小的块(chunk),每个 chunk 在创建时会被主节点分配一个全局不可变的64位 chunk handlechunkserverchunk 保存在本地文件系统中,每个 chunk 对应着本地的一个 Linux 文件,客户端通过指定 chunk handle 和文件偏移范围来读取或者写入 chunk。为了保证可靠性,每个 chunk 会复制到多台 chunkserver 上。GFS 默认为每个 chunk 生成3份副本,不过用户也可以为某些命名空间下的文件指定不同的副本数量。

主节点保存了全部的系统元数据,包括文件命名空间、访问控制信息、每个文件和对应 chunk 的映射、每个 chunk 所在的位置等。同时主节点还负责 chunk 的租约管理、不再使用的 chunk 的垃圾回收、chunkserver 间的 chunk 迁移等。主节点也会定期的向 chunkserver 发送心跳用于向 chunkserver 发送指令和收集 chunkserver 当前的状态。

GFS 的客户端代码会集成到用户应用程序中,它负责实现文件系统 API 以及和主节点、 chunkserver 通信来读取或写入文件。GFS 客户端通过主节点获取文件的元数据信息,然而文件的读取和写入都只和 chunkserver 通信,从而减少了主节点的负担。

不管是 GFS 客户端还是 chunkserver 都不会缓存文件数据。客户端缓存收益不大是因为 Google 大部分的应用程序是对大文件的流式读取或者文件内容过大无法被缓存。不考虑缓存简化了客户端和整个系统的实现,因为引入缓存就要考虑缓存一致性问题。不过客户端会缓存文件的元数据信息,例如某个 chunk 的位置信息。chunkserver 不缓存是因为 chunk 是作为 Linux 文件保存在本地文件系统中,操作系统本身已经提供了一层文件访问缓存,没有必要再加一层。

单主节点

单主节点同样简化了整个系统的设计,方便主节点高效管理 chunk,因为所有需要的信息都保存在主节点内存中。为了避免对文件的读写使得主节点成为瓶颈,客户端读写文件时直接和 chunkserver 通信而不会经过主节点中转,不过在开始读写前,客户端会先和主节点通信获取需要的 chunk 对应的 chunkserver 列表,并将这个数据缓存一段时间来减少后续和主节点的通信。

结合下面的流程图我们来看一下一次读操作是如何执行的:

alt

  1. 客户端根据固定的每个 chunk 的大小和想要读取的文件偏移位置,计算出 chunk 的索引。
  2. 客户端将文件名和 chunk 的索引发给主节点,主节点收到请求后返回对应的 chunk handle 和保存了该份 chunkchunkserver 列表。
  3. 客户端收到主节点返回结果后,以文件名和 chunk 的索引组合作为键,将返回结果缓存。
  4. 客户端将 chunk handle 和想要读取的文件内容偏移范围发给其中一台 chunkserver,一般来说,客户端会选择距离最近的 chunkserver,由于缓存了 chunkserver 列表信息,后续对同一个 chunk 的读请求就不需要再次和主节点通信,直到缓存过期或者文件被再次打开。
  5. 一般来说,客户端和主节点通信时会在一次请求中要求多个 chunk 的信息,主节点同样也可在一次响应中返回多个 chunk 的信息,这也有利于减少客户端和主节点的通信次数。

chunk 大小

每个 chunk 的大小是一个关键的设计选择。GFSchunk 大小为 64 MB,远大于一般文件系统的数据块的大小。每个 chunkLinux 文件的形式保存在 chunkserver 上,但其实际占用的大小是动态增长的,从而避免了大量的文件碎片(例如实际只需要十几K的文件却固定分配了 64 MB 的大小)。

那么,GFS 为什么要选择 64 MB 这样较大的数呢?主要是因为:

  1. chunk 的大小越大,每个文件拆分成的 chunk 的数量就越少,客户端需要读取或写入的 chunk 数量也就越少,和主节点通信的次数也就越少。对于 Google 的数据应用来说,大部分都是顺序读写,客户端和主节点交互的次数是线性的时间复杂度,减少 chunk 的总个数有助于降低整个的时间复杂度。即使是对于少数的随机写,由于 chunk 大小足够大,客户端也能够将TB级别的数据对应的所有 chunk 的信息缓存起来。
  2. 一个 chunk 的大小越大,那么客户端对其可操作次数就越多,客户端就可以和某个 chunkserver 维持一段较长时间的 TCP 连接,减少和不同的 chunkserver 建立连接的次数。
  3. 主节点需要维护每个 chunk 对应的 chunkserver 列表,这个也是个线性的空间复杂度,较大的 chunk 大小能减少 chunk 的总个数,从而减少元数据的大小,主节点就可以将所有的元数据放在内存中。

另一方面,即使将 chunk 的大小设置的较大,以及采用了懒分配的策略,也依然存在缺点。对于一个小文件来说,它可能只有几个甚至是一个 chunk,如果这个文件被访问的频率较大,它就有可能成为热点数据,保存了对应的 chunkchunkserver 就要承受更大的流量。不过在 Google 的实际应用场景中,热点数据并没有成为一个大问题,因为大部分的系统面向的是大文件的流式读取,流量能较平均的分发到各个 chunkserver

不过,Google 确实有遇到过一个热点数据问题,有个批处理系统会先将一个可执行文件写入到 GFS 中,这个可执行文件的大小只有一个 chunk,然后所有客户端会同时读取这个可执行文件并执行,这就造成了几台 chunkserver 同时承接了大量的请求。Google 通过两个方面来解决这个问题,第一针对这个文件设置一个较大的副本数量,让更多的 chunkserver 来分散流量,即水平扩展;第二交替启动批处理系统的客户端,避免同一时间的大量请求。另一种长期的解决方案是在这种场景下允许客户端从其他已读取完毕的客户端那读取文件。

元数据

主节点主要保存了三类元数据:每个文件和 chunk 所属的命名空间、每个文件到对应所有 chunk 的映射、每个 chunk 对应的 chunkserver 列表。所有的元数据都保存在主节点的内存中,前两种元数据被修改时还会以日志的形式保存在本地日志文件中,并备份到其他服务器上。通过日志文件的方式使得主节点能够轻松的修改元数据,并且在主节点崩溃重启后能恢复数据,当然极端情况下主节点有可能没有保存最新的元数据到日志文件中就崩溃了,这里后文会有说明主节点会在持久化完成后才返回结果给客户端,所以客户端不会看到未持久化的元数据。主节点并不会对 chunkserver 列表进行持久化,而是在启动时主动拉取所有 chunkserver 的信息,以及每当一个新的 chunkserver 加入到集群中时,更新元数据,因为 chunkserver 是个动态更新的数据,即使持久化了也需要和当前实时的数据作比对。

数据结构

由于元数据保存在主节点内存中,所以涉及主节点的操作都是非常快的。另一方面,主节点也能够轻易的每隔一段时间遍历所有的元数据,这个主要有三个目的,第一是扫描不再需要的 chunk 进行垃圾回收;第二如果某个 chunkserver 失联了,能将其保存的 chunk 备份到其他 chunkserver 上;第三将某些流量失衡或者本地磁盘空间不够的 chunkserver 下的 chunk 转移到其他 chunkserver 下。

将元数据保存在内存中的一个缺陷是受限于主节点的内存容量,能够保存的 chunk 的元数据的个数是有限的,从而整个 GFS 的容量大小是有限的。不过在实践中,这个问题还没有成为瓶颈,每个 chunk 对应的元数据大小约小于64字节,由于 GFS 面向的主要是大文件,所以基本上每个文件也就最后一个 chunk 没有塞满,所以空间浪费较少。类似的,由于使用了前缀压缩,每个文件的命名空间元数据的大小也小于64字节。对于一个 128 GB 内存的主节点来说,假设全部用来保存 chunk 的元数据,则理论上保存的所有 chunk 的大小为 128 GB / 64 B * 64 MB = 128 PB,虽然实际上元数据不只 chunk 一种,但整体上来说也能够维护 PB 级别的数据。

如果需要 GFS 支撑更大的容量,相比于将元数据全部保存在内存中所带来的的简洁、可靠、性能和灵活上的收益,扩展主节点的内存所需要的成本不值一提。

chunk 的保存位置

主节点不会持久化每个 chunk 对应的 chunkserver 列表,它会在启动时主动拉取所有的 chunkserver 信息。在主节点运行后,它可以通过和 chunkserver 的心跳来实时更新 chunkserver 的状态信息。

在最初的设计中,Google 的工程师试过将 chunkchunkserver 的映射关系进行持久化,但是最终还是采用了更简洁的设计,即在主节点启动时主动拉取所有的 chunkserver 信息并在之后定期更新内存中的数据。这就消除了频繁的数据同步,毕竟在集群环境下,chunkserver 加入或者离开集群,修改主机名,发生异常或重启等操作会经常发生。

另一方面,只有 chunkserver 才真正知道自己是否拥有某个 chunk,所以在主节点上持久化一份 chunkchunkserver 的映射关系意义不大,因为这份数据很大概率是不一致的,毕竟 chunkserver 会发生异常然后失联或者运维重命名了一台 chunkserver

操作日志

操作日志记录了元数据的重要历史更新,它是 GFS 的关键。不仅因为它是元数据的唯一持久化备份,同时它也提供了系统在并发操作下各操作的执行顺序。各个版本的文件和其对应的 chunk 的操作记录都被唯一的标识在日志文件中。

所以操作日志必须保证可靠性存储,以及在元数据持久化完成之前不能将最新的元数据更新暴露给客户端。否则就有可能丢失最新的客户端操作,即使 chunkserver 还存活着。因此,GFS 会将操作日志备份在多台机器上,并且只有在所有的副本机器都持久化完成后才会返回结果给客户端。同时主节点会对操作日志进行批量持久化以降低频繁持久化对系统整体吞吐的影响。

当主节点崩溃重启后会通过重放操作日志来恢复崩溃前的状态,然而如果每次都从第一条日志开始重放,主节点崩溃重启到可用需要的时间会越来越久,因此当操作日志的大小增长到一定程度的时候,主节点会为当前的元数据创建一个检查点,当主节点崩溃恢复后,可以先加载最新的检查点数据,然后再重放在这个检查点之后生成的操作日志。检查点是一个类似于 B 树的数据结构,可以轻易的映射到内存数据结构中,并且方便根据文件的命名空间检索。这就加快了主节点崩溃恢复的速度和提高了系统的可用性。

因为构建检查点需要时间,所以在创建检查点时需要确保不影响正在进行的文件修改。主节点会先创建一个新的操作日志文件,然后由一个新的线程去创建检查点,这个线程会依据新的操作日志文件创建前的日志生成检查点。对于一个百万级别数量文件的系统来说,大概需要1分钟的时间创建检查点,检查点创建完成后同样会持久化到本地磁盘和副本机器上。

主节点崩溃恢复只需要最新完整的检查点文件以及后续的操作日志文件,在这之前的检查点文件和操作日志文件都可以删除,不过为了提高容错性还是会保留一部分文件。如果生成检查点的时候发生异常并不会影响崩溃恢复的正确性,因为恢复的代码会校验检查点文件的完整性并跳过损坏的检查点文件。

一致性模型

GFS 提供了弱一致性模型,在能较好的支撑 Google 内部各分布式应用的同时也兼顾了简洁和高效的实现。

GFS 的保证

文件命名空间的修改是原子的(例如创建一个文件)。对命名空间的修改会由主节点加锁执行,这就保证了修改的原子性和正确性,同时主节点的操作日志记录了这些修改的全局顺序。

而某块文件区域修改后的状态则取决于修改的类型,是否成功或失败,以及是否存在并发的修改。某块文件区域是一致的(consistent)表示不管客户端从哪个 chunkserver 读取这块的数据,每个客户端看到的内容始终是相同的。某块文件区域是已定义的(defined)表示经过一次修改后,首先这块文件区域是一致的,并且客户端读到的数据就是自己修改的。如果对某块文件区域的修改不受其他并发请求的客户端的影响,那么这块文件区域在修改成功后是已定义的(也是一致的),所有的客户端都能看到对应的修改。而如果有多个客户端并发的对同一块文件区域修改,那么最终的结果是未定义的,但是是一致的,所有的客户端读到的数据都是相同的,但是并不知道读到的数据是哪个客户端修改的,一般来说,最后这块文件区域的内容很可能是多个客户端并发修改后混合的结果。一次失败的修改会使得这块文件区域不一致(因此也是未定义的),不同的客户端在不同时间读取到的数据是不同的,因为多个 chunkserver 上保存的数据不一致。

对文件的修改包括在随机位置的写和在文件末尾的追加写两种,随机写指的是数据会写入到应用程序指定的某个文件偏移的位置,追加写指的是在并发的情况下能保证数据会原子性的至少一次追加写入到文件末尾,不过最后数据写入的实际偏移位置是由 GFS 来决定(相反的,一次常规的追加写指的是在客户端以为是文件末尾的地方写入数据。)。追加写完成后,系统会返回相应的偏移量给客户端,这个偏移量表示了某块已定义的文件区域的开始,对于当前客户端来说,这块区域的数据就是自己写入的,对于其他客户端来说,它们读到的数据也始终是相同的。不过,GFS 有可能在修改 chunk 时插入对齐数据或者重复的数据,例如现在有个追加写操作,chunkserver1 需要写入的偏移量位置是100,并且写入完成了,chunkserver2 需要写入的偏移量位置也是100,但是 chunkserver2 写入失败了,客户端会进行重试,那么 chunkserver1 会在偏移量101的地方再次写入相同的数据,但是由于之前 chunkserver2 没有写入成功,它的偏移量位置还是100,那么为了保证所有客户端根据偏移量读取到的数据是相同的,就需要先补齐 chunkserver2 偏移量位置100的数据,然后在偏移量位置101写入新的数据,如果这次所有 chunkserver 都写入成功了,那么就会把偏移量101返回给客户端,而不是100。所以我们看到,客户端想要保存的原始文件,和最终从 GFS 取回的文件数据并不是每个比特都一致的,存在某些文件区域数据不一致的情况。

当一系列文件修改执行完成后,被修改的文件区域保证了是已定义的,并且包含了最后一次修改的内容。GFS 通过两个方面来保证这一点:第一,对 chunk 的所有修改在所有 chunkserver 上的执行顺序是一样的;第二,通过 chunk 的版本号来识别所有版本落后的 chunkserver,这些 chunkserver 有可能是因为在失联期间 chunk 内容发生了修改。对于版本号落后的 chunkserver 则不会参与 chunk 的修改并且主节点也不会将其作为有效的 chunkserver 返回给客户端,这些 chunkserver 所持有的 chunk 会尽早的被垃圾回收。

然而在前面提到,客户端从主节点获取某个 chunkchunkserver 列表后,会在一段时间内将其缓存,如果这段时间内某个 chunkserver 包含的 chunk 的版本号落后了,那么客户端读到的数据会存在短暂的不一致,这个时间段就取决于客户端缓存的过期时间,以及客户端下次打开文件的时间,因为重新打开文件时客户端会从主节点重新拉取 chunkserver 的信息从而刷新了缓存。然而,鉴于大部分文件的修改是追加写,这里我理解可能在流式读的场景下,读完 chunk 的某段内容后会返回新的偏移量,而对于版本号落后的 chunkserver 来说,它返回的偏移量可能已经读过了,所以此时客户端知道这个 chunkserver 可能过期了,所以会尝试再次和主节点通信,从而获取新的 chunkserver 列表。

然而即使对文件的修改执行成功了,系统其他组件如硬件的异常也有可能导致文件损坏或摧毁。GFS 通过主节点和 chunkserver 的心跳来监测异常的机器,并通过校验和来验证 chunk 的数据是否损坏。如果出现了数据损坏或者某台 chunkserver 失联了,那么 GFS 会从其他有效的 chunkserver 那重新复制数据,来保证副本数量维持在指定的阈值上。所以,只有当某个 chunk 对应的 chunkserver 全都异常了,同时 GFS 还没来得及反应过来(一般来说是几分钟内)复制数据,chunk 才会发生永久丢失。即使在这种情况下,对于应用程序来说它看到的是不可用的状态,而不是数据损坏:应用程序收到的是明确的错误信息而不是损坏的数据。

最后,来看一下每种操作下的一致性保证:

  • 随机写
    • 串行执行成功:已定义(defined),在串行执行场景下,某个时间只会有一个客户端执行写入,客户端在执行成功后读取到的就是自己刚刚写入的,所以是已定义的。
    • 并行执行成功:一致但是未定义(consistent but undefined),某个时间在并发场景下多个客户端写入的偏移范围可能重合,最后文件区域的数据可能是多个客户端写入混合后的产物,每个客户端写入后读取到的不一定是自己写入的,所以是未定义的,但由于写入成功,所以是一致的。
    • 异常:不一致(inconsistent),写入失败时,有的 chunkserver 可能写入成功,有的未成功,不同客户端读取同一个偏移量的数据就会不一致。
  • 追加写
    • 串行执行成功:已定义但是穿插着不一致(defined, interspersed with inconsistent),由于追加写至少写入一次的保证,一次追加写的成功背后可能包含着重试,所以在某些偏移量下各个 chunkserver 上的 chunk 的数据会不一致,但是追加写返回的是一个写入成功的偏移量,不会把不一致的偏移量返回,这个新的偏移量下的数据就是客户端自己写入的,且每个客户端读取这个偏移量的内容都是相同的,所以是已定义的。
    • 并行执行成功:同上,追加写返回的偏移量是由 GFS 决定的,所以多个并发客户端追加写入不会相互覆盖。
    • 异常:不一致(inconsistent),同随机写。

对应用程序的影响

借助追加写、检查点、可自我校验和识别的数据写入,GFS 的应用可以较好的适应 GFS 的弱一致性模型。

在实际应用场景中,GFS 的应用基本是通过追加写而不是覆盖写来修改文件。在某个典型应用中,某个文件的内容完全由某个生产者写入,在写入完成后能原子性的将其重命名,或者周期性的为至今写入成功的数据建立检查点。检查点同时也可包含应用程序级别的校验和(不同于 GFS 的校验和,应用程序保存的数据对于 GFS 来说是一致的,但对于应用来说可能是不完整的)。对于读文件的消费者来说,它们只需要读取到最新的检查点即可,因为到最新的检查点的数据可以认为是已定义的(defined),并可以通过校验和来验证文件数据的完整性。如上节所属,虽然追加写在失败时也会存在不一致的情况,但结合检查点的方式已经能较好的满足实际的业务场景。另外,追加写也远比随机写高效和健壮。借助检查点生产者可以在崩溃恢复后从检查点开始继续写入,消费者只根据检查点来读取数据,而不会越过检查点读到以应用的视角来说不完整的数据。

在另一个例子中,多个生产者并发的向某个文件进行追加写,追加写至少写入成功一次的保证使得每个生产者的写入都得以保留。不过,消费者在读取文件时就需要额外处理对齐数据和重复数据,每一条追加写可以看做一条记录,生产者在写入时可以附加一个校验和用于消费者校验数据的完整性,对于对齐数据或不完整的数据来说,它们必然没有校验和,因此消费者就可以跳过这些异常记录。另外,每一条记录都有一个唯一的标识符,重复的记录有着相同的标识符,如果消费者不能容忍重复的数据(例如消费者的代码没有做到幂等),则可以通过这个标识符来对数据去重。除了去重以外,上述的功能都已包含在客户端库中,无需应用端自行实现,从而使得在应用层面每一个消费者都能读到同一份数据(当然,可能会有一点重复数据)。

系统交互

在单主节点的设计下,GFS 尽可能的让主节点少参与到数据交互中,本节主要描述了客户端、主节点和 chunkserver 在数据修改、原子性的追加写和快照中的实现。

租约和修改的执行顺序

一次修改(mutation)指的是修改了某个 chunk 的元数据或者内容。每个修改都会应用到该 chunk 的所有 chunkserver 上。在修改前,主节点会挑选某个 chunkserver 作为 primary,为其分配一个租约(lease)。对某个 chunk 的修改可能同时会有多个,所以 primary 会对所有的修改安排一个执行顺序,所有其他的 chunkserver 都会按照这个执行顺序应用修改。因此,从全局来看,对同一个 chunk 的修改的顺序首先取决于主节点分配租约的顺序,在某个租约有效期内的执行顺序,则取决于 primary 安排的执行顺序。

租约的设计是为了减少主节点协调的负担。一个租约初始设置了60秒的过期时间,不过如果在过期前该 chunk 还未完成修改,primary 可向主节点申请租约续期。这个申请可以附带在 chunkserver 和主节点的心跳监测中,而无需额外通信。不过有时候主节点可能会在租约有效期内回收租约(例如,某个文件正在重命名时主节点需要暂停所有的修改)。即使主节点和当前的 primary 失联了,主节点依然可以在该租约过期后安全的重新分配一个租约给新的 primary

同时,租约也避免了脑裂的问题,如果没有租约的时间限制,主节点先指定了一个 primary,然而由于网络原因主节点认为这个 primary 失联了,就会重新分配一个 primary,此时就有两个 chunkserver 认为自己是 primary。而在租约过期前,主节点不会分配新的 primary 就保证了同一时间只有一个 primary

下图描述了一次随机写的执行流程:

alt

  1. 客户端询问主节点当前持有租约的 primary 和其他的 chunkserver 的地址,如果当前没有一台 chunkserver 持有租约,则主节点会挑选一个 primary 并分配租约。
  2. 主节点返回当前的 primary 和其他的 chunkserversecondary) 的地址。客户端收到响应后将其缓存供后续修改时使用,只有当 primary 无响应或者 primary 回复不再持有租约时,客户端才会再次询问主节点 primary 的信息。
  3. 客户端将需要写入的数据发送给所有的 chunkserver,这一步没有执行顺序的要求。每个 chunkserver 收到数据后会将其暂存在一个 LRU 缓存中直到该数据被取出或过期。从控制流角度来说,是由 primarysecondary 发送指令,而发送待写入的数据不关心谁是 primary,下文会说到这里 GFS 将数据流和控制流进行解耦,基于网络的拓扑结构来实现数据的最优传输。
  4. 当所有 chunkserver 都收到了客户端的数据后,客户端会向 primary 发送一个写入请求,这个请求中标记了之前发送的那份数据。primary 将该时段收到的所有修改请求设置一个串行的执行顺序,并对每一个修改分配一个执行编号,然后将修改写入到本地的 chunk 中。
  5. primary 将写请求分发给其他所有的 secondarysecondary 收到请求后根据执行编号同样将修改按照和 primary 一样的执行顺序写入到自己的 chunk 中。
  6. 每个 secondary 写入完成后会回复 primary 告知写入完成。
  7. primary 收到所有 secondary 的回复后,再回复给客户端告知写入成功。如果途中任何一个 chunkserver 出现异常都会通知客户端,当发生异常时,可能 primary 已经写入成功了,但是某个 secondary 只写入成功一部分(如果是 primary 写入异常,就不会分发写请求给其他所有的 secondary)。因此此次客户端请求就认为写入失败,多个 chunkserver 间出现了数据不一致,此时客户端会进行重试,它会尝试重新执行第3步到第7步,如果尝试几次后依然失败,则会重新从第1步开始执行。

如果客户端一次随机写的数据量较大或者当前 chunk 的剩余空间无法容纳全部的数据则需要跨 chunk 写入,GFS 的客户端库会将一次写请求拆分为多个写请求处理。每个单独的写请求都遵循上述的流程执行,不过由于存在并发客户端写的问题,最终写入的数据有可能多个客户端间相互覆盖,不过由于每个 secondary 都按照相同的执行顺序写入成功,最终各个 chunkserver 上的数据都是相同的,这个也就是之前描述的一致但是未定义状态(consistent but undefined)。

数据流

在前面提到,在第3步发送数据时,GFS 解耦了控制流和数据流以更有效率的利用网络。在控制流下,请求从客户端发送到 primary,再由 primary 分发给其他所有的 secondary,在数据流下,GFS 会选择最优的路径以接力的方式在各个 chunkserver 间传递数据。目的是为了充分的利用每台机器的网络带宽,同时避免网络瓶颈和高延迟的链路,从而降低推送全部数据的整体延迟。

为了尽可能的充分利用每台机器的网络带宽,GFS 选择了线性接力的方式在各个 chunkserver 间传递数据,而不是其他的分发方式(如树状分发,例如以一台机器发送给3台机器为例,发送的机器的出口带宽就被3台机器共享)。

为了尽可能的避免网络瓶颈和高延迟的链路,每个机器会选择在网络结构下距离最近且还未收到数据的机器传递数据。假设现在客户端需要推送数据给 S1S4 四台机器,客户端先将数据发送给距离最近的机器,比如说是 S1S1 同样将数据转发给距离 S1 最近的机器,比如说是 S2,依此类推,最后数据到达 S4Google 内部的网络拓扑结构足够简单使得两台机器间的距离能够较准确的根据 IP 地址估算出来。

最后,GFSTCP 连接上构建了一条流式接力传递的数据流来降低数据传递的延迟。每个 chunkserver 一旦收到一定大小的数据后就马上将其转发给距离最近的其他 chunkserver。收到数据就马上转发并不会降低收数据的速率。在没有网络拥堵的情况下,假设 T 是当前网络的吞吐,L 是两台机器间的传输速度,那么要传输 B 字节给 Rchunkserver 需要的时间为 B / T + RL,其中 B / T 是完整传输 B 个字节给第一台机器的时间,RL 表示第一台机器发送的第一个字节经过接力到达最后一台机器所需要的时间。在 Google 的网络环境下,T100 MbpsL 远小于 1 ms,所以 RL 的时间基本可以忽略,因此传输 1 MB 的数据理论上约需要 80 ms((1 * 1024 * 1024 * 8 / 100 * 1000000) * 1000 ms)。

原子性追加写

GFS 提供了原子性的追加写操作(record append)。在传统的随机写操作下,客户端会指定要写入的文件偏移位置,在多个客户端并发写的情况下,最终该文件区域的内容为多个客户端并发修改后混合的结果。而在追加写场景下,客户端不指定偏移位置,只提供需要写入的数据,GFS 能够原子性的至少一次将其追加到文件中,并指定一个数据写入后的偏移位置,然后将其返回给客户端。这个操作类似于在 Unix 下多个进程以 O_APPEND 的模式并发写入文件而无需担心竞争问题。

追加写在 Google 的分布式应用中使用的非常频繁,多个客户端会并发的对同一个文件追加写入。在传统的随机写操作下,客户端需要额外的同步机制例如实现一个分布式锁来保证写入的线程安全性。在 Google 的应用场景下,文件一般是作为一个多生产者/单消费者的缓冲队列或者包含了多客户端合并后的结果,所以追加写已经能满足需求。

追加写也是一种修改,它的执行流程和随机写基本是相同的,不过对于 primary 来说多了些额外的处理逻辑。首先客户端将需要追加写的数据发送给对应文件的最后一个 chunk 的所有 chunkserver,然后发送写请求给 primaryprimary 会检查当次写入是否会造成最后 chunk 的大小超过 64 MB。如果会超过,那么 primary 会先将当前 chunk 使用特殊字符或其他手段填充到 64 MB,同时告知所有其他的 secondary 也进行同样的操作,接着回复客户端要求针对下一个新的 chunk 发送同样的追加写请求(追加写要求追加写入的偏移位置不超过 chunk 最大大小的四分之一处,从而使得由于数据填充带来的 chunk 碎片在一个可接受的水平。)。如果追加写入不会造成超过 chunk 的最大大小,那么 primary 会先将数据追加到自己的 chunk 中,产生一个新的偏移量,然后再通知其他所有的 secondary,将数据追加到相同的偏移量上,最后全部 chunkserver 执行成功后再将新的偏移量返回给客户端。

如果任何一个 chunkserver 追加写失败了,客户端会发起重试。因此,多个 chunkserver 下同一个 chunk 的数据可能会有数据不一致的情况,某条记录有可能全部或者部分重复。GFS 并不保证每个 chunkserver 上的 chunk 的每个字节都一样。它只保证了每个追加写能原子性的至少写入一次,因为一次追加写执行成功依赖于所有的 chunkserver 成功写入到指定的偏移位置上。此外,在一次追加写执行成功后,后续的追加写必然是写在一个更高的文件偏移位置或者新的 chunk 上,即使主节点挑选了一个新的 primary。在 GFS 的一致性模型下,成功执行了追加写所在的偏移区域的数据是已定义的(也是一致的),而剩下存在写失败的偏移区域的数据是不一致的,前文有提到过这些不一致的区域可以通过校验和来剔除。

快照

快照能够几乎瞬时的复制一个文件或者文件夹,而尽可能的降低对进行中的修改的影响。在实际应用中,一般使用快照对某个巨大的数据集快速创建一个分支(以及分支的分支),或者为当前的文件状态创建一个检查点,方便对当前的文件进行验证修改,在修改完成后提交或回滚。

类似于 AFSGFS 也采用了写时复制(copy-on-write)的技术来实现快照。当主节点收到某个文件的快照请求时,会首先回收所有该文件的 chunk 所关联的租约,这就保证了后续所有对这些 chunk 的修改都需要先申请租约。这就给了主节点有复制一份 chunk 的机会。

当所有涉及的租约被回收或者过期后,主节点首先将快照操作写入操作日志,然后修改内存元数据,复制一份新的目标文件或文件夹的元数据,这个新复制的元数据和源文件的元数据都指向相同的 chunk

在快照生成后,假设此时有一个客户端希望写入 chunk C,它会先询问主节点返回当前持有租约的 primary,主节点发现要修改的 chunk C 有多于1个的元数据引用,说明这个 chunk 发生了快照。主节点首先会为这个文件创建一个新的 chunk C',然后要求所有当前持有 chunk Cchunkserver 在本地创建一个文件作为 chunk C'。因为创建 chunk C' 的机器和持有 chunk C 的机器是同一台机器,所以创建 chunk C' 只是单纯的复制 chunk C,不涉及任何网络 IOGoogle 使用的磁盘 IO 速度大概是3倍于 100 MB 的网络 IO 速度)。接下来的操作就和前面描述的客户端申请写入某个 chunk 一样了,主节点返回 chunk C'chunk handle 和对应持有租约的 primary 给客户端,客户端并不会感知这个返回的 chunk 是从某个已有的 chunk 复制而来的。

主节点操作

所有涉及命名空间的操作都由主节点执行。另外,主节点还负责 chunk 的管理:每个 chunk 应该放在哪台服务器上、创建新的 chunk 及其副本、协调各 chunkserver 保证每个 chunk 的副本数量满足阈值、保证 chunkserver 访问的负载均衡、回收不再使用的 chunk 等等。

命名空间管理和锁

很多主节点的操作需要较长的时间完成,例如,一次快照操作就需要先回收所有 chunk 涉及的租约。同时,又需要降低不同的主节点操作间的影响。因此,GFS 允许不同的主节点操作并发执行,并通过命名空间加锁来保证线程安全。

和传统的文件系统不同,GFS 的文件夹不支持列出该文件夹内的所有文件,以及不支持文件或文件夹的别名(例如 Unix 的硬链接和软链接)。GFS 通过命名空间作为文件到元数据的映射索引,借助前缀压缩使得整个命名空间映射可以高效的保存在内存中。命名空间下的每个节点(代表一个文件名或者文件夹名)都持有一把读写锁。

主节点每次执行操作前都会先获取一批锁,例如如果当前操作涉及命名空间 /d1/d2/.../dn/leaf,主节点会先获取文件夹 /d1/d1/d2,…,/d1/d2/.../dn 的读锁,以及 /d1/d2/.../dn/leaf 的读锁或写锁。这里的 leaf 对应的是某个文件或文件夹。

下面来举例说明 GFS 的锁机制如何保证当创建 /home/user 的快照到 /save/user 时避免同时创建文件 /home/user/foo。首先快照操作会获取 /home/save 的读锁,以及 home/user/save/user 的写锁。而文件创建的操作会获取 /home/home/user 的读锁,以及 /home/user/foo 的写锁。可以看到,一个获取 home/user 的写锁,另一个获取 home/user 的读锁,所以这两个操作是互斥的,不会同时发生。写文件并不需要获取文件的父目录的写锁,因为文件夹对于 GFS 来说是个逻辑上的概念,背后没有类似 inode 的数据结构需要共享保护。获取文件夹的读锁已经足够保证不会有另一个客户端能够获取文件夹的写锁从而删除了这个文件夹。

这种锁机制的一个优点是允许对同一个文件夹下的不同文件进行并发修改。例如,可以在同一个文件夹下并发创建不同的文件,每一个创建操作都会获取该文件夹的读锁以及所要创建的文件的写锁。文件夹的读锁能有效避免这个文件夹被删除、重命名或者创建快照。文件的写锁则避免了同一个文件被创建两次。

因为整个命名空间会包含很多的节点,所以每个节点的读写锁采用了懒汉初始化的方式,并且当不需要的时候能及时删除。另外,锁的获取会按照命名空间的层级顺序以及同一层级下节点名称的字母顺序来获取,从而避免死锁。

副本的放置

一个 GFS 集群一般由多个机柜上的几百台机器组成,这些 chunkserver 又会被几百台在同一个或不同机柜上的客户端访问,如果是两台不同机柜上的机器间的通信可能会经过1个或多个网络交换机。另外不同机柜间的入口和出口带宽一般会小于同机柜内的机器间的带宽。这种多级的分布式对分发数据的扩展性,可靠性和可用性提出了挑战。

所以副本的放置选择策略主要出于两个目的:最大化数据的可靠性和可用性,最大化利用网络带宽。为了实现这两点,仅仅将所有的副本分发到不同的机器上还不够,因为这只对磁盘或者机器异常进行了容错,以及最大化利用了每台机器的带宽,但是还缺少可用性的考虑。因此,需要将副本分发到不同机柜的不同机器上,这样如果当某台机柜上的机器都因故下线了(例如一些共享资源如网络交换机或者供电设备发生异常),系统仍然是可用的。所以在跨机柜的访问下,某个 chunk 的读操作会充分利用不同机柜间的带宽,另一方面 chunk 的写操作就需要跨机柜访问,这也是跨机柜存储的权衡利弊的一个体现。

副本的创建、重复制和负载均衡

chunk 的副本的创建出于三个目的:创建一个新的 chunk 时需要保证存储的可靠性,保证每个 chunk 的副本数量满足阈值,保证 chunkserver 访问的负载是均衡的。

当主节点创建一个 chunk 时,它会决定在哪个 chunkserver 上创建空的 chunk,它主要基于以下几点考虑:

  1. 选择磁盘空间利用率低于平均值的 chunkserver,这保证了长期来看每台 chunkserver 的磁盘空间利用率维持在一个相近的水平。
  2. 避免每台 chunkserver 存在过多最近创建的 chunk。虽然创建一个 chunk 比较简单,但是可以预见的是随之而来这个 chunk 会面临大量的写入。
  3. 如前面所述,每个 chunk 的副本需要跨机柜存储。

当某个 chunk 的副本数量低于用户指定的阈值时,主节点会复制一个 chunk 到某台 chunkserver 上。这种情况一般有以下几种原因:

  1. 某台 chunkserver 异常失联了。
  2. chunkserver 反馈其保存的 chunk 数据已损坏。
  3. chunkserver 的某个磁盘由于异常被禁用了。
  4. 用户要求增加副本的数量。

主节点在复制一个 chunk 时会按照优先级处理。其中一个是距离满足指定的副本数量阈值还差多少。例如如果某个 chunk 还差两个副本就比只差一个副本的 chunk 有着更高的复制优先级。另外,复制还在使用的 chunk 的优先级高于复制最近被删除的 chunk。最后,为了避免缺少副本带给应用的影响,主节点会优先处理那些阻塞了客户端执行的 chunk

主节点根据优先级决定复制哪个 chunk 之后,会选择几台 chunkserver 让其直接从拥有完整副本的 chunkserver 上复制数据。这种情况下新的 chunk 副本的存放位置的选择和前文所述类似:尽量保证每台 chunkserver 的磁盘空间利用率维持在一个相近的水平,避免副本的复制集中在一台 chunkserver 上,尽量保证副本跨机柜存储。为了避免副本复制的流量压垮客户端的流量,主节点会限制整个集群以及每台 chunkserver 同一时间执行的副本复制操作的数量。另外,每台 chunkserver 也会限制花在副本复制上的带宽。

最后,主节点会定期检查各 chunkserver 的流量或者磁盘空间利用率是否均衡:如果发现了不均衡则会在各 chunkserver 间移动某些 chunk 以达到磁盘空间利用率或流量的均衡。根据上述的策略,对于一台新加入集群的 chunkserver 来说,主节点会逐渐的将其填满 chunk 而不是马上让其承接大量新的 chunk 然后伴随着大量的写流量。另外,主节点也需要决定从哪台 chunkserver 上移除 chunk,一般的,主节点会优先选择磁盘可用空间率低于平均水平的 chunkserver,从而使得每台 chunkserver 的磁盘空间利用率维持在一个相近的水平。

垃圾回收

当某个文件被删除后,GFS 不会马上释放它所占用的空间,而是在常规的垃圾回收期间在文件和 chunk 层面延迟删除,这种策略使得系统更加简洁和可靠。

机制

当某个文件被应用删除后,和其他修改一样主节点首先会将该操作记录到操作日志中。不过,GFS 并不会马上删除该文件,而是将该文件重命名为一个隐藏的名字,同时这个隐藏的名字包含了文件删除时的时间戳。在主节点常规扫描系统的命名空间的期间,它会删除命名空间中已经存在了3天(时间间隔可配置)的隐藏文件。在这之前,这个文件依然是可以通过重命名后的名字读取的,或者将其重命名回原来的名字来撤销删除。一旦主节点将其从命名空间中删除,其对应的元数据也会一并删除,从而切断了这个文件和任何 chunk 的关联。

类似的,主节点也会定期扫描所有 chunk 的命名空间,一旦发现没有任何关联的 chunk(即没有被任何文件引用的 chunk),主节点就会删除这些 chunk 的元数据。之后,在主节点和 chunkserver 的心跳监测中,chunkserver 会向主节点反馈其所拥有的 chunk,主节点会返回所有已经不在元数据中的 chunk 标识,chunkserver 收到回复后就可以删除掉这些不需要的 chunk

讨论

虽然分布式的垃圾回收是个困难的问题,往往需要一套特定编程语言下的复杂解决方案,在 GFS 的实现中却很简单。主节点可以轻易的标识出所有 chunk 的引用,它们全都保存在文件到 chunk 的映射中。同时主节点也可轻易的标识出所有的 chunk 副本,它们都是 chunkserver 上某个指定文件夹下的 Linux 文件。任何一个不在主节点中记录的副本都可以被回收。

GFS 采用的延迟垃圾回收的方式相比于立即删除的方式存在以下几个优点:

  1. 对于组件异常经常发生的大型分布式系统来说,延迟删除更为简洁和可靠。创建 chunk 时有可能在部分 chunkserver 上成功在部分 chunkserver 上失败,这就导致创建 chunk 的这次操作整体是失败的,从而使得主节点不会记录该 chunk,后续也无法知晓这些部分创建成功的 chunk。如果采用立即删除的方式,需要由主节点向 chunkserver 发送删除 chunk 的消息并需要对发送失败的消息进行重发。而延迟垃圾回收的方式提供了一个统一、可靠的方式来删除无用的 chunk
  2. 主节点将 chunk 的删除合并到了主节点的日常维护中进行,例如对命名空间的定期扫描以及主节点和 chunkserver 的心跳监测。这就可以将 chunk 的删除进行批量处理从而摊薄了运行的成本。另外,垃圾回收只会在主节点较为空闲的时候进行,主节点会更优先的响应客户端的请求。
  3. 延迟删除给不可逆转的误删除提供了缓冲。

在实际应用中,延迟删除的主要缺点是不利于磁盘剩余空间紧张的 chunkserver 释放磁盘空间。尤其是对不停的创建然后删除临时文件的应用来说,不能够马上利用那些不需要的临时文件的空间。GFS 通过加快回收某个重复删除的文件来缓解这个问题。同时,GFS 也允许用户针对不同的命名空间设置不同的副本数量阈值和垃圾回收策略。例如,用户可指定某个目录下的所有文件的 chunk 都不需要额外的副本,以及当文件删除时系统会立即直接删除。

过期的副本检测

在某个 chunk 所属的 chunkserver 异常下线期间,其他 chunkserver 上的 chunk 发生了修改,则该 chunkserver 再次上线后该 chunk 的数据版本已落后。主节点会记录每一个 chunk 最新的版本号,从而能识别过期的副本。

每当主节点为某一个 chunk 分配一个租约时,它会先更新 chunk 的版本号,并通知所有当前持有最新版本 chunkchunkserver 也更新版本号。主节点和 chunkserver 都会将这个版本号持久化,这里的持久化顺序应该是 chunkserver 先,主节点后,否则主节点先持久化版本号然后此时下线了,再次上线后造成没有任何一个 chunkserver 持有最新的版本号。只有当持久化完成之后,主节点才会将 primary 发送给客户端,之后才能开始写入。如果此时某个 chunkserver 下线了,那么它所持有的 chunk 的版本号就无法更新,当这个 chunkserver 再次上线时,它会向主节点报告所持有的 chunk 以及对应的版本号,主节点就能知道这个 chunkserver 持有了过期的副本。如果主节点发现某个 chunk 的副本的版本号大于自己内存中的版本号,那么主节点就知道自己在更新版本号期间下线了,直接使用 chunkserver 的版本号作为最新的版本号。

主节点会在垃圾回收期间删除过期的副本。在这之前,如果有客户端请求该 chunk 的信息,主节点会忽视持有过期副本的 chunkserver,不会将其返回给客户端。其次,主节点在返回给客户端的 chunk 信息中也包含了版本号,在要求某台 chunkserver 去复制另一台 chunkserver 上的 chunk 数据时也同样附带了版本号,这样客户端或者 chunkserver 在读取 chunk 数据时就能比较读到的数据是否是最近的版本。

容错和诊断

Google 在设计 GFS 遇到的最大问题之一是如何应对频繁的组件异常。各组件的质量和数量决定了异常是经常出现而不是偶尔出现:既不能完全信任机器,也不能完全信任磁盘。组件异常可能造成系统不可用,或者更糟的是数据损坏。本节主要描述 GFS 如何面对这些挑战以及开发了哪些工具来定位问题。

高可用

一个 GFS 集群由数百台机器组成,某些机器必然会在任意时间发生异常。GFS 通过快速恢复和副本这两个手段实现了系统的高可用。

快速恢复

主节点和 chunkserver 都是设计成不管以什么样的方式终止,都能在数秒内恢复终止前的状态。实际上,GFS 并不会刻意区分正常的终止还是异常的终止,各服务器本身日常就会通过杀死进程来终止。客户端或其他服务器会因此经历一段短暂的中断,因为进行中的请求会因为服务器的终止而超时,继而客户端会对重启后的服务器进行重连,然后重试。

chunk 副本

如之前描述,每个 chunk 会备份到不同机柜上的不同机器。用户可指定不同命名空间配置不同的备份策略。默认每个 chunk 有3个副本。主节点通过 chunkserver 间复制 chunk 来保证每个 chunk 有足够的副本数量,避免 chunkserver 由于下线或者数据损坏造成 chunk 副本数量不足。虽然依靠副本已经能较好的支撑需求,Google 的工程师也在考虑跨服务器的冗余手段如奇偶校验或者 erasure code 来应对日渐增长的只读存储需求。在当前这个松耦合的系统中实现更复杂的冗余策略会更有挑战性,不过也是可控的,因为大部分的文件操作都是追加写和读而不是随机写。

主节点副本

GFS 通过备份主节点的状态来实现可靠性。主节点的操作日志和检查点都会备份到多台机器上。一次文件的修改只有当主节点和所有备份服务器都写入到本地日志之后才认为是已提交的。从实现的简单性考虑,只会有一个主节点机器负责所有的元数据修改以及在后台运行的一些日常维护活动来修改主节点的内部状态,例如垃圾回收。当这个主节点发生异常时,它几乎能马上重启。如果整个机器或者磁盘发生异常,GFS 之外的监控设施能在其他机器上借助操作日志副本重新启动一个新的主节点进程。对于客户端来说不需要有任何改动,因为客户端和主节点的通信是通过主机名进行的(例如 gfs-test),其对应的是一个 DNS 别名,当主节点更换机器时,同步修改别名映射即可。

另外,即使主节点异常了,还有影子主节点也提供对本地文件系统的只读访问功能。因为这个影子主节点不是严格的镜像节点,所以它们的文件系统上的数据可能会略落后于主节点,一般来说是几秒。影子主节点依然能够提供当前未在修改的文件查询功能,或者应用程序不介意查询到一部分过期的数据,例如 chunkserver 列表小于实际运行的数量。因为文件的内容是通过 chunkserver 读取,和主节点无关,所以客户端不会读取到过期的数据。可能过期的数据是元数据,例如文件夹的内容或者权限控制信息。

为了保证自己持续更新,影子主节点会不断从某台操作日志副本机器读取日志,并以和主节点一样的执行顺序将其重放到内存中的数据结构中。和主节点一样,影子主节点在启动时也会拉取 chunkserver 信息来定位 chunk 的副本位置,并定期与其进行心跳监测。它依赖主节点来更新 chunk 副本的位置,如主节点创建和删除副本。

数据完整性

每台 chunkserver 使用校验和来检测损坏的数据。鉴于一个 GFS 集群有几千个磁盘运行在几百台机器上,磁盘异常经常会导致文件读写时数据损坏或丢失。虽然可以从其他 chunkserver 恢复,但是通过比较多个 chunkserver 上的副本来检测数据损坏显然是不实际的。另外,在一致性模型中提到,每个副本的数据不是每个比特都一致的,由于追加写的原子性保证造成异常重试的存在,每个副本的数据很大可能是不同的。所以,每台 chunkserver 必须能够自己独立通过校验和来检测数据是否损坏。

每个 chunk 切分为每块大小为 64 KB 的数据块,每个数据块对应一个32位的校验和。和其他元数据一样,校验和也保存在内存中,并随日志持久化到本地磁盘中,且和用户数据分开存放。

对于读操作,chunkserver 会根据读取的字节范围先根据校验和校验所覆盖的数据块的数据是否完整,然后再返回数据给客户端或其他的 chunkserver。因此,每台 chunkserver 不会把损坏的数据同步到其他 chunkserver 上。如果 chunkserver 发现某个数据块发生了损坏,则会返回一个异常给调用方,并向主节点报告。调用方收到回复后会重新选择一个 chunkserver 进行读操作,而主节点则会安排这个 chunkserver 从其他 chunkserver 那复制 chunk。当新的 chunk 就绪后,主节点会通知这个 chunkserver 删除损坏的 chunk

校验和对读操作带来的影响较小,主要有几个方面的原因。大部分的一次读操作只涉及了几个数据块,所以需要读取和验证的校验和数量也较少。另外,客户端在读取时,会尽量保证正好在数据块的边界读取,而不是跨数据块。最后,校验和验证不涉及 IO,以及校验和的计算可以和数据读取 IO 同时进行。

针对追加写,校验和计算对此做了大量的优化,因为追加写是主要的写场景。计算校验和时,GFS 会复用当前最后一个数据块的校验和,一个数据块的大小是 64 KB,可能目前只写入了 30 KB,那么本次往剩下的 34 KB 写入时,则无需从头开始计算这个数据块的校验和。对于新填充的数据块依然还是需要完整计算校验和。即使当前这个写入了 30 KB 的数据块损坏了,计算校验和时无法知晓,最终这个数据块的校验和与保存的数据已经不匹配,这个损坏也会在下一次读操作中被调用方感知。

相反的,对于随机写操作,如果当前的随机写覆盖了某个 chunk 已有的数据,则必须先对所覆盖的数据块进行校验和验证,确保当前的数据未损坏,然后再执行写入和计算新的校验和。如果不进校校验就写入,那么有可能在写入之外的地方已经发生了数据损坏。因为一次随机写可能写不满最后一个数据块,假设这个数据块是损坏的,如果写前校验就能发现这个问题。不过这里并没有理解为什么不能将完整性校验推迟到后续的读操作时,可能是因为追加写和流式读是 GFS 应用的主要方式,完整性校验推迟到读操作也能很快发现问题,而随机写是很少量的场景,写时校验能更快的发现问题。

在系统空闲的时候,chunkserver 会扫描和校验所有不活跃的 chunk,从而能够发现那些不经常被访问但是已损坏的 chunk(因为数据完整性检测发生在读写操作时)。一旦发现这样的 chunk,主节点会创建一个新的 chunk 然后指示 chunkserver 复制一份有效的 chunk,最后再删除损坏的 chunk。这就避免因为一些不活跃但已损坏的副本使得主节点认为这些 chunk 依然满足副本数量阈值的要求。

诊断工具

通过详尽的诊断日志能有效的帮助问题隔离,调试,性能分析,而且带来的成本较小。如果没有日志,则很难理解各机器间无法复现的交互。针对某些重要的事件(如 chunkserver 的下线和上线) GFS 会生成相应的诊断日志,同时还会记录所有的 RPC 请求和响应。诊断日志的删除对系统的正确性没有任何影响。然而在磁盘空间允许的情况下,还是会尽可能保留多的诊断日志。

除了正在读取或写入的文件,诊断日志记录了其他所有 RPC 的请求和响应。通过比对各机器上的 RPC 日志,就能重现当时系统交互的场景,从而进行问题定位。同时这些日志也能辅助压力测试和性能分析。

诊断日志带来的性能影响很小(在其带来的价值面前不值一提),因为采用了异步顺序的方式写入日志。同时,最近某段时间内的事件也会保存在内存中用于线上的持续监控。

参考

概览

Lab 1 需要我们实现一个单机多线程、多进程的 MapReduce 程序,通过 test-mr.sh 可以看到该实验会先启动一个主节点进程,然后再启动多个工作节点进程。

主节点

主节点的入口是 mrcoordinator.go,通过 go run mrcoordinator.go pg*.txt 可运行一个主节点程序,其中 pg*.txt 是本次 MapReduce 程序的输入数据。同时,主节点和工作节点间通过 RPC 进行交互,本实验需要我们实现以下 RPC 接口:

  1. 实验中主节点的任务分发采用的是拉模式,工作节点会定期向主节点请求一个任务,这个任务可以是 map 任务也可以是 reduce 任务,由主节点根据当前任务的状态决定应该推送 map 任务还是 reduce 任务。由于 reduce 任务会不间断的拉取中间结果数据,这里为了方便处理,当某个工作节点正在处理 reduce 任务时,在向主节点请求任务时可指明要求分发对应的 reduce 任务
  2. map 任务完成后需要通知主节点生成的中间结果文件地址,主节点收到请求后需要将中间结果文件地址保存到内部数据结构中,供后续发送给对应 reduce 任务
  3. reduce 任务完成后同样需要通知主节点任务完成,否则主节点无法知晓所有 reduce 任务是否已完成,从而无法退出主节点

任务管理

主节点需要维护所有 map 任务和 reduce 任务的状态,由于工作节点有可能失败,所以主节点需要同时记录每个任务已执行了多久,如果超过一定时间还没有收到任务完成的通知,则认为执行这个任务的工作节点已失联,然后需要将该任务重新分配给其他工作节点,在本实验中主节点等待每个任务完成的时间为10秒。

工作节点

工作节点的入口是 mrworker.go,通过 go run mrworker.go wc.so 可运行一个工作节点程序,其中 wc.so 是本次 MapReduce 程序用到的用户自定义 mapreduce 函数。工作节点只有一个 Worker 主方法,需要不断向主节点轮询请求任务,那么工作节点什么时候结束轮询?根据实验建议有两种方法,一种是当本次 MapReduce 任务完成后,主节点进程退出,当工作节点再次请求主节点任务时,RPC 请求必然失败,此时工作节点可认为本次任务已完成主节点已退出,从而结束轮询并退出;另一种是当主节点完成后,在一定时间内,在收到工作节点新的任务请求时,返回一个要求工作节点退出的标识(或者也可抽象为一种任务类型),工作节点收到 RPC 响应后退出,主节点在等待时间到了之后也进行退出。

空任务

当工作节点向主节点申请任务,而此时主节点没有可分发的任务时,主节点可返回一个自定义任务类型的任务,来表示空任务,工作节点收到响应后则直接睡眠等待下次唤醒。

map 任务

map 任务负责调用用户自定义的 map 函数,生成一组中间结果数据,然后将中间结果数据保存为文件,实验建议的文件名是 mr-X-Y,其中 X 表示 map 任务的编号,Y 表示 reduce 任务的编号,map 任务编号的范围可简单使用 [1, 输入文件的数量] 来表示,reduce 任务编号的范围为 [1, nReduce]。每个 map 任务会生成 R 个中间结果文件,实验已提供了分片函数,对中间结果的每个键使用 ihash(key) % nReduce 来决定写入到哪个中间结果文件中。

从数据结构角度来说,所有的中间结果数据是一个 nReduce * nMap 的二维矩阵,每一行对应一个 reduce 任务。

由于 map 节点有可能执行失败,为避免 reduce 节点读取到的是未写入完成的中间结果文件,在写入中间结果文件时可以先写入一个临时文件,在写入完成后再重命名为最终的文件名。在本实验中,可以使用 ioutil.TempFile 来创建临时文件,以及使用 os.Rename 来原子性的重命名文件。

在当前 map 任务的中间结果文件写入完成后,需要通过 RPC 请求通知主节点所有中间结果文件的文件地址。在本实验中,各个工作节点运行在同一台机器上,实验要求保存 map 任务的中间结果文件到当前文件夹,这样 reduce 任务就能通过中间结果文件的文件名读取中间结果数据。

对中间结果数据的写文件可以借助 Goencoding/json 模块,例如:

1
2
3
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)

然后 reduce 任务读取文件:

1
2
3
4
5
6
7
8
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}

reduce 任务

reduce 任务负责将所有中间结果数据按照键排序后应用用户自定义的 reduce 函数,并生成最终输出文件,文件格式为 mr-out-X,其中 X 表示 reduce 任务的编号。文件中的每一行对应一个 reduce 函数调用的结果,需要按照 Go%v %v 形式对键值对进行格式化。

在收到所有的中间结果数据之前,reduce 任务无法开始执行,所以在这个期间当工作节点请求 reduce 任务时,如果主节点暂时没有新的 reduce 任务可分发,则可返回一个空任务,工作节点收到响应后则暂时等待一段时间再重新请求任务。reduce 每次收到中间结果数据后会暂存在内存中,如果暂存的中间结果数据的数量等于 map 任务的数量(这个值可以放在 RPC 响应中),则说明所有中间结果数据已经接收完毕,可以开始执行 reduce 任务。

注意

mrworker.go 中注释描述通过 go run mrworker.go wc.so 来运行工作节点,不过如果构建 wc.so 时开启了竞争检测(-race),则运行 mrworker.go 时也同样需要开启竞争检测,否则会提示 cannot load plugin wc.so,如果打印 err 则会显示 plugin.Open("wc"): plugin was built with a different version of package runtime

同样的,如果使用 GoLand 调试程序,由于在调试模式下 GoLand 编译 Go 文件时会增加 -gcflags="all=-N -l" 参数,所以也需要在打包 wc.so 时增加相同的参数。

活锁

在最后的 crash test 遇到个类似活锁的问题,在前文提到,如果某个工作节点开始了 reduce 任务但是还没有接收全部的中间结果数据,则该节点下次申请任务时会继续申请该 reduce 任务(普通工作节点对申请的任务类型没有要求)。在 crash test 下,工作节点在执行用户自定义的 mapreduce 函数时有一定概率结束进程,假设现在有4个工作节点,其中一个在执行 map 任务,另外三个各自在执行 reduce 任务(轮询获取中间结果数据),不幸的是
这个时候 map 节点挂了,此时 test-mr.sh 会自动再启动一个工作节点,然后更不幸的是挂掉的 map 任务在主节点看来还没有到超时时间,所以主节点此时不会分配 map 任务给新的节点(假设没有其他 map 任务),会再分配一个 reduce 任务给新的节点,至此所有工作节点都在执行 reduce 任务,又都在等待中间结果数据完成,却又不可能完成。

造成这个问题的主要原因是任务分配顺序,上述问题下的任务分配顺序是:

  1. 指定的 reduce 任务
  2. 空闲或超时的 map 任务
  3. 空闲或超时的 reduce 任务

解决方法就是把前两个换下顺序即可,即:

  1. 空闲或超时的 map 任务
  2. 指定的 reduce 任务
  3. 空闲或超时的 reduce 任务

参考

Lab 1 虽然是个单机器多线程、多进程的程序,但主节点和工作节点的交互依然通过 RPC 实现,Go 本身也提供了开箱即用的 RPC 功能,下面将通过一个简单的求和服务来了解在 Go 中如何实现一个 RPC 服务。

定义请求体和响应体

请求体和响应体都非常简单,SumRequest 中包含要求和的两个数字,SumReply 中存放求和的结果:

1
2
3
4
5
6
7
8
9
10
package pb

type SumRequest struct {
A int
B int
}

type SumReply struct {
Result int
}

服务端

首先定义服务类 SumService 和提供的方法:

1
2
3
4
5
6
7
8
type SumService struct {
}

func (sumService *SumService) Sum(sumRequest *pb.SumRequest, sumReplay *pb.SumReply) error {
sumReplay.Result = sumRequest.A + sumRequest.B

return nil
}

SumService 只有一个 Sum 方法,接收 SumRequestSumReply 两个参数,求和后将结果放回到 SumReply 中(Sum 的方法签名必须是这样的形式,即两个入参和一个 error 类型的出参,具体规则见下文描述)。

然后进行服务注册:

1
2
3
sumService := &SumService{}
rpc.Register(sumService)
rpc.HandleHTTP()

通过 rpc.Register 这个方法可以知道,一个服务类及其提供的方法必须满足以下条件才能注册成功:

  1. 服务类必须是公共的
  2. 服务类提供的方法必须是公共的
  3. 服务类提供的方法入参必须是两个,一个表示请求,一个表示响应(从编码的角度来说方法入参是两个,但是实际代码是判断是否等于3个,因为在这种场景下定义的方法的第一个入参类似于 Java 中的 this
  4. 服务类提供的方法的第一个参数类型必须是公共的或者是 Go 内置的数据类型
  5. 服务类提供的方法的第二个参数类型也必须是公共的或者是 Go 内置的数据类型,且必须是指针类型
  6. 服务类提供的方法的出参个数只能是1个
  7. 服务类提供的方法的出参类型必须是 error

rpc.HandleHTTP() 表示通过 HTTP 作为客户端和服务端间的通信协议,当客户端发起一个 RPC 调用时,本质上是将要调用的方法和参数包装成一个 HTTP 请求,服务端收到 HTTP 请求后,解码出要调用的本地方法名称和入参,然后调用本地方法,在本地方法调用完成后再将结果写入到 HTTP 响应中,客户端收到响应后,再解析出远程调用的结果。

rpc.HandleHTTP() 本质上是个 HTTP 路由注册,实际上是调用 Handle(pattern string, handler Handler) 方法,当请求路由匹配 pattern 时,会调用对应的 handler 执行,对于 Go RPC 来说,固定路由路径是 /_goRPC_

所以,在完成 HTTP 路由注册后,还需要配合开启一个 HTTP 服务,这样才能接受远程服务调用:

1
2
3
4
5
6
7
8
listener, err := net.Listen("tcp", ":1234")

if err != nil {
log.Fatal("listen error:", err)
}

fmt.Println("Listening on port 1234")
http.Serve(listener, nil)

http.Serve 方法中对于每一个客户端的连接,最终会分配一个 goroutine 来调用 HandlerServeHTTP(ResponseWriter, *Request) 方法来处理,对于 GoRPC 包来说,则可以实现该方法来处理 RPC 请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}

客户端

对于客户端来说,发起远程方法调用前需要先和服务端建立连接:

1
2
3
4
5
6
7
client, err := rpc.DialHTTP("tcp", ":1234")

if err != nil {
log.Fatal("dialing:", err)
}

defer client.Close()

该方法同时返回了一个 RPC 客户端类,内部同时负责对 RPC 请求的编码和解码。

然后通过 client.Call 来发起远程调用:

1
2
3
4
5
6
7
8
9
10
11
12
sumRequest := &pb.SumRequest{
A: 1,
B: 2,
}
sumReply := &pb.SumReply{}
err = client.Call("SumService.Sum", sumRequest, sumReply)

if err != nil {
log.Fatal("call error:", err)
}

fmt.Println("Result:", sumReply.Result)

这里的调用一共有三个参数,第一个是被调用的远程方法名,需要是 类名.方法名 的形式,后两个则是远程方法的约定入参。

完整的代码可参考 go-rpc-demo

参考

Lab 1 提供了一个串行化的示例 MapReduce 程序,整体分两部分,第一部分是用户自定义的 mapreduce 函数,第二部分是框架代码。

用户自定义 map 和 reduce 函数

以单词计数应用 wc.go 为例,对于 map 函数来说,它的输入键值对类型为 <string, string>,中间结果数据类型为框架定义的 KeyValue 类型,本质上也是个 <string, string> 类型。map 函数首先将文件内容拆分为单词,然后遍历每个单词,输出对应中间结果数据 <w, "1">

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }

// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)

kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}

reduce 函数的输出类型为 string,其逻辑较为简单,中间结果数组的长度就是单词的个数:

1
2
3
4
5
6
7
8
9
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}

框架代码

我们可以通过 go run -race mrsequential.go wc.so pg*.txt 来运行串行化的 MapReduce 程序,这里的 wc.so 内包含了用户自定义的 mapreduce 函数,pg*.txt 则是本次 MapReduce 程序的原始输入数据。

首先,根据入参提供的插件找到用户自定义的 mapreduce 函数:

1
mapf, reducef := loadPlugin(os.Args[1])

接着,依次读取输入文件的内容,并调用用户自定义的 map 函数,生成一组中间结果数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}

然后,对所有中间结果数据按照键进行排序,而在 MapReduce 论文中,中间结果数据会经过分片函数分发给不同的 reduce 节点,由 reduce 节点自行排序处理:

1
2
3
4
5
6
7
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//

sort.Sort(ByKey(intermediate))

最后,遍历所有中间结果数据,对同键的中间结果数据调用用户自定义的 reduce 函数,并将结果写入到最终输出文件中,同样的,这里也只有一个最终输出文件而不是多个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
oname := "mr-out-0"
ofile, _ := os.Create(oname)

//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

i = j
}

ofile.Close()

至此,一个串行化的 MapReduce 程序就完成了。

参考

Lab 1 中遇到的第一个命令是 go build -race -buildmode=plugin ../mrapps/wc.go,其中 -buildmode=plugin 表示以插件的形式打包源文件,这里的 wc.go 是用户实现的 mapreduce 方法,这体现了面向接口编程的思想,只要用户编写的 mapreduce 方法遵循统一的签名,则可以在不重新编译 MapReduce 框架代码的情况下,实时替换运行不同的用户应用。

假设有个 sum.go 文件,里面只有一个 Sum 方法:

1
2
3
4
5
package main

func Sum(a int, b int) int {
return a + b
}

sum.go 以插件形式编译:

1
go build -buildmode=plugin sum.go

会生成一个 sum.so 文件。

接着,在 main.go 中就可以通过 plugin.Open 读取 sum.so

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"log"
"plugin"
)

func main() {
fileName := "sum.so"
p, err := plugin.Open(fileName)

if err != nil {
log.Fatalf("cannot load plugin %v", fileName)
}

sumSymbol, err := p.Lookup("Sum")

if err != nil {
log.Fatalf("cannot find Map in %v", fileName)
}

sum := sumSymbol.(func(int, int) int)

fmt.Println("1 + 2 is", sum(1, 2))
}

然后通过 Lookup 根据方法名找到 Sum 方法,按照指定方法签名转换后即可进行调用。而如果需要换一个 Sum 的实现,则无需重新编译 main.go

参考:

Lab 1 中遇到的第一个命令是 go build -race -buildmode=plugin ../mrapps/wc.go,其中 -race 表示启用 Go 的竞争检测,在多线程编程时,数据竞争是必须要考虑的问题,而数据竞争的问题往往难以察觉和排查,所以 Go 内置了竞争检测的工具来帮助开发人员提前发现问题。另外,MIT 6.824 是一门分布式系统课程,必然会涉及多线程编程,所以竞争检测也是校验 Lab 程序正确性的一种方式。

介绍

Go 借助 goroutine 来实现并发编程,所以数据竞争发生在多个 goroutine 并发读写同一个共享变量时,并且至少有一个 goroutine 存在写操作。来看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "fmt"

func main() {
done := make(chan bool)
m := make(map[string]string)
m["name"] = "world"
go func() {
m["name"] = "data race"
done <- true
}()
fmt.Println("Hello,", m["name"])
<-done
}

其中 m 是一个共享变量,被两个 goroutine 并发读写,将上述代码保存为文件 racy.go,然后开启竞争检测执行:

1
go run -race racy.go

会输出类似如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Hello, world
==================
WARNING: DATA RACE
Write at 0x00c00007e180 by goroutine 7:
runtime.mapassign_faststr()
/usr/local/go/src/runtime/map_faststr.go:203 +0x0
main.main.func1()
/path/to/racy.go:10 +0x50

Previous read at 0x00c00007e180 by main goroutine:
runtime.mapaccess1_faststr()
/usr/local/go/src/runtime/map_faststr.go:13 +0x0
main.main()
/path/to/racy.go:13 +0x16b

Goroutine 7 (running) created at:
main.main()
/path/to/racy.go:9 +0x14e
==================
==================
WARNING: DATA RACE
Write at 0x00c000114088 by goroutine 7:
main.main.func1()
/path/to/racy.go:10 +0x5c

Previous read at 0x00c000114088 by main goroutine:
main.main()
/path/to/racy.go:13 +0x175

Goroutine 7 (running) created at:
main.main()
/path/to/racy.go:9 +0x14e
==================
Found 2 data race(s)
exit status 66

竞争检测会提示第13行和第10行存在数据竞争,一共涉及两个 goroutine,一个是主 goroutine,另一个是手动创建的 goroutine

典型数据竞争场景

循环计数器竞争

这个例子中每次循环时会创建一个 goroutine,每个 goroutine 会打印循环计数器 i 的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // Not the 'i' you are looking for.
wg.Done()
}()
}
wg.Wait()
}

我们想要的结果是能输出 01234 这5个值(实际顺序并不一定是 01234),但由于主 goroutinei 的更新和被创建的 goroutinei 的读取之间存在数据竞争,最终的输出结果可能是 55555 也可能是其他值。

如果要修复这个问题,每次创建 goroutine 时复制一份 i 再传给 goroutine 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(j int) {
fmt.Println(j) // Good. Read local copy of the loop counter.
wg.Done()
}(i)
}
wg.Wait()
}

意外地共享变量

在日常开发中,我们可能不经意间在多个 goroutine 间共享了某个变量。在下面的例子中,首先 f1, err := os.Create("file1") 会创建一个 err 变量,接着在第一个 goroutine 中对 file1 写入时会对 err 进行更新(_, err = f1.Write(data)),然而在主 goroutine 中创建 file2 时同样会对 err 进行更新(f2, err := os.Create("file2"),这里虽然用的是 :=,不过 err 并不是一个新的变量,在同一个作用域下是不允许重复对某个同名变量使用 := 创建的,因为 f2 是一个新的变量,所以这里可用 :=),这就产生了数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "os"

// ParallelWrite writes data to file1 and file2, returns the errors.
func ParallelWrite(data []byte) chan error {
res := make(chan error, 2)
f1, err := os.Create("file1")
if err != nil {
res <- err
} else {
go func() {
// This err is shared with the main goroutine,
// so the write races with the write below.
_, err = f1.Write(data)
res <- err
f1.Close()
}()
}
f2, err := os.Create("file2") // The second conflicting write to err.
if err != nil {
res <- err
} else {
go func() {
_, err = f2.Write(data)
res <- err
f2.Close()
}()
}
return res
}

func main() {
err := ParallelWrite([]byte("Hello, world!"))
<-err
}

修复方法也很简单,在每个 goroutine 中使用 := 创建 err 变量即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "os"

// ParallelWrite writes data to file1 and file2, returns the errors.
func ParallelWrite(data []byte) chan error {
res := make(chan error, 2)
f1, err := os.Create("file1")
if err != nil {
res <- err
} else {
go func() {
// This err is shared with the main goroutine,
// so the write races with the write below.
_, err := f1.Write(data)
res <- err
f1.Close()
}()
}
f2, err := os.Create("file2") // The second conflicting write to err.
if err != nil {
res <- err
} else {
go func() {
_, err := f2.Write(data)
res <- err
f2.Close()
}()
}
return res
}

func main() {
err := ParallelWrite([]byte("Hello, world!"))
<-err
}

未受保护的全局变量

某个包下对外暴露了 RegisterServiceLookupService 两个方法,而这两个方法会对同一个 map 变量进行读写,客户端调用时有可能多个 goroutine 并发调用,从而存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"net"
)

var service map[string]net.Addr = make(map[string]net.Addr)

func RegisterService(name string, addr net.Addr) {
service[name] = addr
}

func LookupService(name string) net.Addr {
return service[name]
}

func main() {
go func() {
RegisterService("hello", &net.IPAddr{IP: net.ParseIP("127.0.0.1")})
}()
go func() {
fmt.Println(LookupService("hello"))
}()
}

可以通过 sync.Mutex 来保证可见性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"net"
"sync"
)

var (
service map[string]net.Addr = make(map[string]net.Addr)
serviceMu sync.Mutex
)

func RegisterService(name string, addr net.Addr) {
serviceMu.Lock()
defer serviceMu.Unlock()
service[name] = addr
}

func LookupService(name string) net.Addr {
serviceMu.Lock()
defer serviceMu.Unlock()
return service[name]
}

func main() {
go func() {
RegisterService("hello", &net.IPAddr{IP: net.ParseIP("127.0.0.1")})
}()
go func() {
fmt.Println(LookupService("hello"))
}()
}

未受保护的基本类型变量

除了 map 这样的复杂数据类型外,基本类型变量同样会存在数据竞争,如 boolintint64。例如在下面的例子中,主 goroutinew.last 的更新和创建的 goroutine 中对 w.last 的读取间存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"time"
)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
w.last = time.Now().UnixNano() // First conflicting access.
}

func (w *Watchdog) Start() {
go func() {
for {
time.Sleep(time.Second)
// Second conflicting access.
if w.last < time.Now().Add(-10*time.Second).UnixNano() {
fmt.Println("No keepalives for 10 seconds. Dying.")
os.Exit(1)
}
}
}()
}

func main() {
watchdog := &Watchdog{}
watchdog.Start()
watchdog.KeepAlive()
select {}
}

我们依然可以借助 channelsync.Mutex 来修复这个问题,不过类似于 JavaGo 中同样有相应的原子变量来处理基本类型的并发读写,上述例子就可以通过原子包下的 atomic.StoreInt64atomic.LoadInt64 来解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"sync/atomic"
"time"
)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
atomic.StoreInt64(&w.last, time.Now().UnixNano())
}

func (w *Watchdog) Start() {
go func() {
for {
time.Sleep(time.Second)
if atomic.LoadInt64(&w.last) < time.Now().Add(-10*time.Second).UnixNano() {
fmt.Println("No keepalives for 10 seconds. Dying.")
os.Exit(1)
}
}
}()
}

func main() {
watchdog := &Watchdog{}
watchdog.Start()
watchdog.KeepAlive()
select {}
}

未同步的 send 和 close 操作

虽然对一个 channel 的发送和相应的读取完成之间存在 happens-before 的关系,但是对 channel 的发送和关闭间并没有 happens-before 的保证,依然存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
package main

func main() {
c := make(chan struct{}) // or buffered channel

// The race detector cannot derive the happens before relation
// for the following send and close operations. These two operations
// are unsynchronized and happen concurrently.
go func() { c <- struct{}{} }()
close(c)
}

所以在关闭 channel 前,增加对 channel 的读取操作来保证数据发送完成:

1
2
3
4
5
6
7
8
9
package main

func main() {
c := make(chan struct{}) // or buffered channel

go func() { c <- struct{}{} }()
<-c
close(c)
}

竞争检测的开销

在开启竞争检测的情况下,程序的内存使用可能会增加5到10倍,性能可能会增加2到20倍,所以一般建议在压力测试或集成测试阶段开启,或者线上集群选择某台机器单独开启。

参考

看汇编语言时看到,标志寄存器中 CF 标志位表示无符号数运算时是否向最高有效位外的更高位产生进位或借位,而 OF 标志位表示有符号数运算时是否产生溢出。这里存在两个疑问:

  1. 对于 CPU 来说,它并不区分处理的是无符号数还是有符号数,那什么时候设置 CF,什么时候设置 OF
  2. CF 表示进位时也是一种溢出,能否和 OF 共用一个

CF

首先来看 CF 进位的例子,这里我们以8位无符号数为例,其最大值为255,那么计算 255 + 1 则会产生进位。可以通过一段简单的汇编代码进行验证:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $255, %al
add $1, %al
movl $1, %eax
movl $0, %ebx
int $0x80

在上述代码中,al 是一个8位寄存器,是 eax 寄存器的低8位,这里首先将255放到 al 寄存器内,然后对 al 寄存器中的值加1并放回到 al 寄存器中,即实现 255 +1 的运算。

最后的 int $0x80 中的 int 表示 interrupt,即中断,当发生一个中断时会有一个与之对应的中断处理程序来处理,这里的 $0x80 就是声明由哪个中断处理程序处理,在 Linux 中,$0x80 对应的是操作系统内核,用于发起一个系统调用,而具体发起哪个系统调用则由 eax 中的值决定,这就是 movl $1, %eax 的作用,1对应的系统调用是 exit,用于退出程序,而程序退出时会伴有一个状态码,这个状态码的值来自于 ebx,也就是 movl $0, %ebx 的作用,这里使用0来表示程序正常退出。

接下来我们借助 gdb 来观察程序运行时 CF 的值的变化。首先将上述代码保存为 demo.s 后进行编译:

1
as demo.s -o demo.o -gstabs+

这里的 -gstabs+ 表示生成机器码时同时生成调试信息,如果没有这个选项后续 gdb 加载时会提示 (No debugging symbols found in ./demo)

然后进行链接:

1
ld demo.o -o demo

这个时候就可以通过 gdb 加载生成的可执行文件:

1
gdb ./demo

alt

然后输入 break 4 在代码第四行设置一个断点,即 mov $255, %al 处,最后输入 run 开始调试执行:

alt

此时可输入 layout reg 来观察各寄存器内的值,我们需要关注的是 eflags 寄存器,它展示了哪些标志位生效了:

alt

或者通过执行 info registers eflags 来查看 eflags 的值:

1
2
(gdb) info registers eflags
eflags 0x202 [ IF ]

目前只有一个 IF 标志位,它用于表示是否响应中断。

接着,输入 next 来执行当前断点所在处的指令,可以看到,执行后 rax 寄存器内的值变成了255(rax 是64位 CPU 下的一个通用寄存器,32位 CPU 下对应为 eax):

alt

再输入一次 next 来执行加法运算,此时 rax 中的值变为了0(实际的二进制结果应该是100000000,因为 al 寄存器最多只能表示8位,所以最高位的1无法表示,最终结果为0),eflags 中出现了 CF 标志位,说明发生了进位:

alt

rax 中的值为0也说明了加法运算后产生的进位并不会体现在比参与运算的寄存器位数更多的寄存器中,否则 rax 中的值应该是256。

再来看借位,将程序稍加修改执行一个 1 - 2 的运算:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $1, %al
sub $2, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为255(存在高位借位的情况下最后的二进制结果为11111111,解释为无符号数为255),eflags 中同样出现了 CF 标志位。

alt

所以,CF 的标记取决于两个二进制数的运算是否产生进位或借位。

OF

有符号数的溢出分两种情况,一种是运算结果应该是正数却返回负数,另一种是运算结果应该是负数却返回正数。

首先来看两个正数运算得到负数的例子,同样对代码稍加修改实现 127 + 1 的运算:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $127, %al
add $1, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为128(对应二进制表示为10000000,以有符号数的角度来看,其值为-128,即两个正数相加得到一个负数),eflags 中出现了 OF 标志位,说明发生了溢出:

alt

从有符号数的角度来看,参与运算的两个数的符号位都是0,相加后符号位却是1,所以 OF 设置为1。

再来看两个负数运算得到正数的例子,再次对代码稍加修改实现 -128 - 1 的运算,-128的二进制补码表示为10000000,即无符号数角度下的128,-1的二进制补码表示为11111111,即无符号数角度下的255:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $128, %al
add $255, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为127(对应二进制表示为01111111,以有符号数的角度来看,其值为127,即两个负数相加得到一个正数),eflags 中出现了 OF 标志位,说明发生了溢出:

alt

从有符号数的角度来看,参与运算的两个数的符号位都是1,相加后符号位却是0,所以 OF 设置为1。

所以,OF 的标记取决于运算结果的符号位是否发生变化,这里的变化指的是两个相同符号位的数的运算结果是一个不同符号位的数。

比较

注意到前面有符号数 -128 - 1 运算的例子中,最后 CFOF 都被设置为了1,说明 CFOF 并不是互斥的关系,在这个例子中即发生了进位又发生了符号位的变更,也就是说如果满足了设置 CF 的条件,那么 CF 就是1,如果满足了设置 OF 的条件,那么 OF 就是1。因此,回到文章开头的问题,CPU 并不是去判断该设置 CF 还是 OF,而是只要条件满足就会设置对应的标志位,而具体应该关注哪个标志位,则交由编译器去判断,因为对 CPU 而言它处理的只是比特运算,只有编译器知道当前的运算数是无符号数还是有符号数。

另外,CFOF 也不能合二为一,无法相互替代,例如两个无符号数相加 CF 有可能是0,但是 OF 却是1,如 127 + 1;两个有符号数相加 OF 有可能是0,但是 CF 却是1,如 -1 - 1。也有可能 CFOF 都是1,如有符号数运算 -128 - 1

参考

0%