介绍

和一般描述的应用级别的主从备份不同,本文描述的是虚拟机的主从备份。主从备份作为一种常见的容错实现手段,当主节点异常时,从节点能取代主节点从而保证系统依然可用。作为从节点,它的状态必须尽可能的与主节点随时保持一致,这样当主节点异常时从节点能马上取代主节点,而客户端也不会感知到异常,同时也没有数据丢失。其中一种同步主从节点状态的方式是持续将主节点的所有修改发送给从节点,这里的修改包括 CPU、内存以及 IO 设备。然而,采用这种同步方式需要大量的网络带宽,尤其是发送内存的修改。

另一种只需要耗费少量带宽的方式是状态机(state machine)同步。该方法将主从同步抽象为确定性状态机(deterministic state machine)同步问题,在确定性状态机模型下,对于两个初始状态一样的状态机来说,按照相同的顺序执行相同的一系列输入指令后,最后的状态也一定是相同的。然而,对于大部分的服务来说,存在某些非确定性的操作,例如生成一个随机数,这时候就需要额外的协调使得主从间依然是同步的,即从节点也要生成一模一样的随机数。不过,处理这种情况所需要维护的额外信息相比于主节点状态的修改(主要是内存的修改)来说不值一提。

对于物理机来说,随着主频的增加,同步主从间的确定性操作也愈发困难。然而对于运行在 hypervisor 上的虚拟机来说却非常适合实现状态机同步。一个虚拟机本身就可以看做一个明确的状态机,它的所有操作就是被虚拟化的机器的操作(包括所有的设备)。和物理机一样,虚拟机也存在一些非确定性的操作(例如读取当前时间或者发送一个中断),所以也需要发送额外的信息给从节点来保证主从同步。因为 hypervisor 掌管着虚拟机的执行,包括发送所有的输入给被虚拟化的机器,所以它能捕获到执行非确定性操作的所有需要的信息,从而能正确的在从节点上执行重放操作。

因此,基于状态机同步的主从同步方式可以在不需要修改硬件的情况下在廉价的硬件上实现,使得容错技术适用于最新的微处理器。另外,对带宽较低的要求使得长距离的虚拟机主从同步成为了可能。例如,可以在跨校园间不同的物理机上做主从同步,相比于同大厦内的主从同步更为可靠。

目前在 VMware vSphere 4.0 平台上已经实现了这种容错技术,该平台能高效完整的虚拟化 x86 架构的机器。因为 VMware vSphere 实现了一个完全的 x86 虚拟机,所以可以自动的对任何 x86 的操作系统和应用提供容错支持。通过确定性重放(deterministic replay),系统可以记录下主节点的执行并且确保能在从节点执行相同的操作。VMware vSphere Fault Tolerance (FT) 在此基础之上增加了额外的功能和协议来支持构建一个可完全容错的系统。除了对硬件的容错外,当主节点异常时,系统能自动的在本地集群中启动一台可用的从节点来接管主节点。在该篇论文发表的时候,确定性重放技术和 VMware FT 仅支持单核的虚拟机。受限于严重的性能问题,多核虚拟机的重放支持仍在进行中,因为在多核场景下,几乎每一个对共享内存的访问都是一个非确定性的操作。

BressoudSchneider 针对惠普的 PA-RISC 平台的虚拟机容错做了个原型实现。VMware 的实现与其类似,不过出于性能的考虑做了些根本的修改以及调研了一些其他实现方案。另外,为了能构建一个高效、可用的容错系统来支持用户的企业级应用,VMware 还设计和实现了许多其他组件以及解决一些实际的问题。和大多数实际的系统要解决的问题一样,这里的容错针对的是 fail-stop 的异常,即在造成外部可见的不正确的行为前可被监测到的异常,例如磁盘空间不足、网络无法连通等等,而诸如应用程序的 bug 或者人为失误等则不属于 fail-stop 异常,系统也无法进行容错。

基础设计

alt

上图展示了支持容错的虚拟机的基本配置。对于每一台需要支持容错的虚拟机(primary VM),系统会在其他物理机上同时运行一台备份虚拟机(backup VM),备份虚拟机和主虚拟机会保持同步,并执行和主虚拟机相同的指令,不过会存在一定的延迟。这两台虚拟机被称为处于 virtual lockstep 状态。同时,虚拟机连接着相同的共享存储,输入和输出都可以被主从虚拟机访问。不过,只有主虚拟机才会暴露在网络中,所以所有的网络输入都只会发送给主虚拟机。同样的,其他所有的输入(例如键盘和鼠标输入)也都只会发送给主虚拟机。

主虚拟机收到的所有输入都会通过 logging channel 发送给从虚拟机。对系统来说,主要的输入负载就是网络和磁盘。为了保证从虚拟机能和主虚拟机执行相同的非确定性操作,还需要发送一些额外的信息给从虚拟机。从结果上来说,从虚拟机会始终执行和主虚拟机相同的操作。不过,所有从虚拟机的输出都会被 hypervisor 丢弃,只有主虚拟机的输出才会返回给客户端。后面会提到,主从虚拟机间的通信会遵循一个特定的协议,包括从虚拟机对消息的确认,来保证当主虚拟机异常时不会发生数据丢失。

为了监测主虚拟机或者从虚拟机是否发生异常,系统会通过和主从虚拟机间的心跳以及 logging channel 的流量来判断。另外,系统必须保证在任一时间只有一台主虚拟机或者从虚拟机作为对外执行的入口,即使主虚拟机和从虚拟机间失联发生脑裂的场景。

确定性重放的实现

在前面提到过,主从虚拟机的同步可以抽象为确定性状态机同步问题。如果两个确定性状态机以相同的初始状态启动,并且按照相同的顺序执行相同的输入,那么这两个状态机会经历相同的状态流转并输出相同的结果。一台虚拟机会有一系列的输入,包括网络包,磁盘读取,以及键盘和鼠标输入。而非确定性的事件(例如虚拟中断(virtual interrupts))和非确定性的操作(例如读取当前处理器的时钟周期数)也会影响虚拟机的内部状态。这就给重放执行一台运行着任意操作系统和任意服务的虚拟机带来了3个挑战:

  1. 需要正确的捕捉到主虚拟机的所有输入和非确定性的操作
  2. 需要正确的将输入和非确定性操作在从虚拟机上重放
  3. 不能影响系统性能

另外,x86 处理器中很多复杂的操作往往伴有副作用,因此也是非确定性的操作,如何捕捉到这些非确定性的操作并正确的在从虚拟机上重放也是一个挑战。

VMware vSphere 平台为 x86 虚拟机提供了上述的重放功能。确定性重放技术会将主虚拟机的所有输入和所有可能的非确定性操作写入到日志中。从虚拟机就可以读取日志并执行和主虚拟机一样的操作。对于非确定性的操作来说,系统会写入一些额外的信息来保证重放时生成相同的虚拟机状态和输出。对于非确定性的事件例如计时器或者 IO 完成中断,在事件发生时所执行的指令也会记录在日志中。在重放时,事件会和指令一同出现在指令流中。借助联合 AMDIntel 开发的 hardware performance counter(一组特殊寄存器用来记录硬件相关事件发生的次数)和其他技术,VMware 确定性重放技术能高效的记录和重放非确定性的事件。

BressoundSchneider 在其实现中提到将虚拟机的执行以 epoch 为单位进行切分,其中所有的非确定性操作例如中断都放在 epoch 的最后。这个想法是出于批量处理的考虑,因为单独将每一个中断和中断发生时对应的指令重放执行代价较大。不过 VMware 的实现足够高效使得不需要借助 epoch 来实现确定性重放,每一个中断都能被准确的记录并伴随着发生中断时的指令一起执行。

FT 协议

VMware FT 使用确定性重放技术将主虚拟机的执行流记录到日志中,不过主虚拟机并不是将日志写入到磁盘上,而是通过 logging channel 将日志发送给从虚拟机。从虚拟机能实时的读取日志并将其重放,从而执行和主虚拟机一样的操作。然而,双方在 logging channel 的通信必须遵循 FT 协议来保证容错。其中一条基本的要求是:

Output Requirement:当主虚拟机异常,从虚拟机接管执行时,从虚拟机的输出必须和先前主虚拟机已经发送给客户端的输出一致。

当异常发生时(主虚拟机异常,从虚拟机接管执行),从虚拟机的执行可能会和在没有异常发生时主虚拟机的执行不同,因为在执行时会有很多非确定性的操作。然而,只要从虚拟机的输出满足 Output Requirement,则在主从切换时就不会有外部可见的状态或者数据丢失,而客户端也不会感知到中断或者服务的不一致。

通过主虚拟机的延迟输出,保证从虚拟机确认收到了所有日志后,主虚拟机才将输出返回给客户端来实现 Output Requirement。一个先决的条件是主虚拟机在执行输出操作前,从虚拟机必须已经收到所有的日志。这些日志能保证从虚拟机执行到主虚拟机最新的执行点。然而,假设当主虚拟机刚开始执行输出操作时发生了异常,此时发生了主从切换,从虚拟机必须先将未处理完的日志进行重放,然后才能 go live(不再执行重放,接管成为主虚拟机)。如果在这之前从虚拟机 go live,可能会有一些非确定性的事件(例如计时器中断)在从虚拟机执行输出操作前改变了执行的路径。

针对上述的要求,最简单的实现 Output Requirement 的方式是为每一条输出操作创建一条特殊的日志,从而可以通过以下规则来保证 Output Requirement

Output Rule:在从虚拟机确认收到输出操作对应的日志前,主虚拟机不能执行输出操作。

如果从虚拟机收到了所有的日志,包括输出操作对应的日志,那么从虚拟机就能重放出和主虚拟机在执行输出操作时一模一样的状态,当主虚拟机异常时,从虚拟机就能恢复到主虚拟机执行输出操作前一致的状态。相反的,如果从虚拟机在没有收到日志前就接管了主虚拟机的操作,那么它的状态就和主虚拟机不一致,从而导致最终的输出也不一致。

注意 Output Rule 并没有要求在从虚拟机确认收到输出日志前停止主虚拟机的执行。这里只是延迟了主虚拟机的输出,它依然可以执行其他指令。因为操作系统以异步中断的方式来通知非阻塞网络和磁盘输出的完成,主虚拟机可以在这期间轻易的执行其他指令,而不用阻塞等待。相反的,在其他的一些实现中,在从虚拟机确认收到输出日志前,主虚拟机必须完全停止等待。

alt

上图展示了 FT 协议的要求。主虚拟机到从虚拟机的箭头表示日志的发送,从虚拟机到主虚拟机的箭头表示日志的确认。所有异步事件,输入和输出的操作都必须发送给从虚拟机并得到确认。只有当从虚拟机确认了某条输出操作的日志后,主虚拟机才能执行输出操作。所以只要遵循了 Output Rule,从虚拟机在接管执行时就能保持和主虚拟机一致的状态。

不过在异常发生时,VMware FT 并不能保证所有的输出只发送一次。在缺少两阶段提交的帮助下,从虚拟机不能知晓主虚拟机在发送某条输出之前还是之后发生了异常。不过,网络协议(包括常见的 TCP 协议)在设计时就已经考虑了包的丢失和重复包的情况,所以这里无需特殊处理。另外,在主虚拟机异常时发送给主虚拟机的输入也有可能丢失,因此从虚拟机也会丢失这部分的输入。不过,即使在主虚拟机没有异常的情况下,网络包本身就有可能丢失,所以这里同样也不需要特殊处理,不管是网络协议、操作系统还是应用程序,在设计和编写时本身已经考虑到了包丢失的情况。

监测和响应异常

之前提到过,当主虚拟机或者从虚拟机发生异常时,双方都必须能快速响应。当从虚拟机异常时,主虚拟机会进入 go live 模式,即不再记录执行日志,以常规的方式执行。当主虚拟机异常时,从虚拟机也会进入 go live 模式,不过相比于主虚拟机略微复杂些。因为从虚拟机在执行上本身就落后于主虚拟机,在主虚拟机异常时,从虚拟机已经收到和确认了一部分执行日志,但是还没有执行重放,此时从虚拟机的状态和主虚拟机还是不一致的。所以,从虚拟机必须先将暂存的日志进行重放,当所有重放都执行完成后,从虚拟机就会进行 go live 模式,正式接管主虚拟机(此时缺少一个从虚拟机)。因为此时这台从虚拟机不再是从虚拟机,被虚拟化的操作系统所执行的输出操作都会发送给客户端。在这个转换期间,可能也需要某些设备执行一些特定的操作来保证后续输出的正确性。特别是对于网络输出来说,VMware FT 会自动的将新的主虚拟机的 MAC 地址在网络中广播,使得物理交换机知道最新的主虚拟机的地址。另外,后文会提到新的主虚拟机可能会重新发起一些磁盘 IO 操作。

有很多种方式来监测主虚拟机和从虚拟机的异常。VMware FT 通过 UDP 心跳来监测启用了容错的虚拟机是否发生了异常。另外,VMware FT 还会监控 logging channel 中的流量,包括主虚拟机发送给从虚拟机的日志,以及从虚拟机的消息确认回执。因为操作系统本身存在时钟中断,所以理论上来说 logging channel 中的流量应该是连续不断的。因此,如果监测到 logging channel 中没有流量了,那么就可以推断出某台虚拟机发生了异常。如果没有心跳或者 logging channel 中没有流量超过一段指定的时间(近似几秒钟),那么系统就会声明这台虚拟机发生了异常。

然而,这种异常监测机制可能会引发脑裂问题。如果从虚拟机不再接收到来自主虚拟机的心跳,那么有可能说明主虚拟机发生了异常,但也有可能只是双方间的网络断开。如果此时从虚拟机进入 go live 模式,由于此时主虚拟机依然存活,就有可能给客户端造成数据损坏或其他问题。因此,当监测到异常时必须保证只有一台主虚拟机或者从虚拟机进入 go live 模式。为了解决脑裂问题,VMware FT 借助了虚拟机所连接的共享存储。当主虚拟机或者从虚拟机希望进入 go live 模式时,它会向共享存储发起一个原子性的 test-and-set 操作。如果操作成功,那么当前虚拟机可以进入 go live 模式,如果操作失败,说明已经有其他虚拟机先进入了 go live 模式,所以当前虚拟机就将自己挂起。如果虚拟机访问共享存储失败,那么它会一直等待直到访问成功。如果共享存储由于网络问题造成无法访问,那么虚拟机本身也做不了什么因为它的虚拟磁盘就挂载在共享存储上。所以使用共享存储来解决脑裂问题不会带来其他可用性问题。

最后一个设计的点是如果虚拟机发生了异常,使得某台虚拟机进入了 go live 模式,那么 VMware FT 会自动在其他物理机上启动一台新的备份虚拟机。

FT 的实际实现

上节主要描述了 FT 的基础设计和协议。然而,为了构建一个可用,健壮和自动化的系统,还需要设计和实现很多其他的组件。

启动和重启 FT 虚拟机

其中一个至关重要的组件是如何启动一台有着和主虚拟机一模一样状态的备份虚拟机。这个同时也会在当某台虚拟机异常需要重新启动一台备份虚拟机时用到。因此,这个组件必须能够在主虚拟机处于任意状态的时候复制一台一模一样的备份虚拟机(而不仅仅是初始状态)。另外,这个启动操作还不能影响到主虚拟机的执行,因为这有可能影响到当前所有连接着的客户端。

对于 VMware FT 来说,它借用了 VMware vSphere 平台已有的 VMotion 的功能。VMware VMotion 能以极小的代价将一台运行中的虚拟机迁移到另一台机器上——虚拟机的暂停时间一般在一秒内。VMware FTVMotion 做了些改动使得在不销毁当前虚拟机的情况下,在远程服务器上复制一台和当前虚拟机一模一样的虚拟机。即修改版的 FT VMotion 做的是虚拟机的复制而不是迁移。FT VMotion 也会同时建立一条 logging channel,源虚拟机作为主虚拟机就会将执行日志写入到 logging channel,而复制后的虚拟机作为从虚拟机就会读取日志开始重放执行。和常规的 VMotion 一样,FT VMotion 也能将对主虚拟机的暂停控制在一秒以内。因此,对某台运行中的虚拟机开启 FT 功能非常简单,且没有破坏性。

另一个启动一台备份虚拟机要考虑的点是选择在哪台物理机上启动。支持容错的虚拟机运行在某个可访问共享存储的集群中,所以本质上虚拟机可以运行在任意一台机器上。这个灵活性使得 VMware vSphere 可以轻易的为一台或多台异常的虚拟机启动新的备份虚拟机。VMware vSphere 实现了一套集群服务来管理集群中的资源。当虚拟机发生异常主虚拟机需要一台新的虚拟机来维持容错时,主虚拟机会通知集群服务需要一台新的备份虚拟机。此时集群服务会根据资源利用率和其他条件来决定在哪台机器上重启新的备份虚拟机,然后会由 FT VMotion 创建一台新的备份虚拟机。一般来说,VMware FT 可以在发生异常的几分钟内恢复某台虚拟机的冗余功能,而不会造成任何中断。

管理 Logging Channel

在管理 logging channel 的流量上有很多有趣的实现细节。在 VMware 的实现里,hypervisor 为主虚拟机和从虚拟机维护了一大块日志缓冲区。在主虚拟机执行时,它将执行日志发送到日志缓冲区中,类似的,从虚拟机从日志缓冲区中消费日志。每当主虚拟机的日志缓冲区中有数据,系统就会将其发送到 logging channel 中,相应的在另一边则会将其放入到从虚拟机的日志缓冲区中。当从虚拟机从 logging channel 中读取到日志并将其放入日志缓冲区后,它就会向主虚拟机发送一个已读消息的回执。VMare FT 就可以根据这个已读回执决定哪些输出操作可以执行了。下图展示了这个过程:

alt

当从虚拟机从日志缓冲区读取不到任何日志时(日志缓冲区为空),它就会暂停执行直到下一条日志到达。因为从虚拟机不和外界交互,这个暂停对客户端没有任何影响。类似的,如果主虚拟机往日志缓冲区中写日志发现日志缓冲区满时,它必须停止执行直到日志缓冲区被消费。这个暂停保证了主虚拟机以一个可控的速度生产执行日志。不过,这个暂停会影响客户端的响应,直到主虚拟机可以继续写日志并恢复执行。所以,在实现时必须考虑如何尽量避免主虚拟机的日志缓冲区写满。

其中一个原因造成主虚拟机的日志缓冲区写满是因为从虚拟机执行的太慢从而造成消费日志太慢。一般来说,从虚拟机必须以和主虚拟机记录执行日志一样的速度来执行重放。幸运的是,在 VMware FT 的实现下,记录执行日志和重放所需要的时间基本是相同的。不过,如果从虚拟机所在的机器存在和其他虚拟机资源竞争(资源超卖),不管 hypervisor 的虚拟机调度多么高效,从虚拟机都有可能得不到足够的 CPU 和内存资源来保证和主虚拟机一样的速度执行重放。

除了主虚拟机日志缓冲区满造成的不可控暂停外,还有一个原因也要求主从虚拟机间的状态不能差太远。当主虚拟机异常时,从虚拟机必须尽快的将所有的执行日志进行重放,达到和主虚拟机一样的状态,然后接管主虚拟机向客户端提供服务。结束重放的时间基本上等于异常发生时从虚拟机落后主虚拟机的时间,所以从虚拟机进入 go live 模式需要的时间就基本上等于检测出异常的时间加上当前从虚拟机落后的时间。因此,从虚拟机不能落后主虚拟机太多(大于一秒),否则这会大大增加故障切换的时间。

因此,VMware FT 有另一套机制来保证从虚拟机不会落后主虚拟机太多。在主从虚拟机间的通信协议里,还会发送额外的信息来计算两者间的执行时间差。一般来说这个时间差在100毫秒以内。如果从虚拟机开始明显落后主虚拟机(例如大于1秒),那么 VMware FT 会通知调度器降低主虚拟机的 CPU 资源配额(初始减少几个百分点)来延缓主虚拟机的执行。VMware FT 会根据从虚拟机返回的落后时间来不断调整主虚拟机的 CPU 资源配额,如果从虚拟机一直落后,那么 VMware FT 会逐渐减少主虚拟机的 CPU 资源配额。相反的,如果从虚拟机开始赶上了主虚拟机的执行速度,那么 VMware FT 会逐渐增加主虚拟机的 CPU 资源配额,直到两者的执行时间差到达一个合理的值。

不过在实际场景中减慢主虚拟机执行的速度非常少见,一般只会发生在系统承受极大负载的情况下。

FT 虚拟机上的操作

另一个实际中要考虑的问题是处理针对主虚拟机的一系列控制操作。例如,如果主虚拟机主动关机了,从虚拟机也需要同步关机,而不是进入 go live 模式。另外,所有对主虚拟机的资源修改(例如增加 CPU 资源配额)都必须应用到从虚拟机上。针对这些操作,系统会将其转化为特殊的执行日志发送到 logging channel,从虚拟机收到后也会将其正确的重放。

一般来说,大部分对虚拟机的操作都只应该在主虚拟机上发起。然后 VMware FT 会将其转化为日志发送给从虚拟机来进行重放。唯一可以独立的在主虚拟机和从虚拟机上执行的操作是 VMotion。即主虚拟机和从虚拟机都可以独立的被复制到其他机器上。VMware FT 保证了在复制虚拟机时不会将其复制到一台已经运行了其他虚拟机的机器上,因为这无法提供有效的容错保证。

复制一台主虚拟机要比迁移一台普通的虚拟机复杂些,因为从虚拟机需要先和主虚拟机断开连接,然后之后在合适的时间和新的主虚拟机建立连接。从虚拟机的复制有类似的问题,不过也略微复杂些。对于普通的虚拟机迁移来说,系统会要求当前所有进行中的磁盘 IO 在切换前执行完成。对于主虚拟机来说,它可以等待所有进行中的磁盘 IO 的完成通知。而对于从虚拟机来说,它不能简单的在某个时间让所有的磁盘 IO 都结束,因为从虚拟机还在执行重放,也需要等待重放涉及的磁盘 IO 完成。而另一方面,主虚拟机在执行时有可能一直伴有磁盘 IOVMware FT 有一个特有的方法来解决这个问题,当从虚拟机在执行切换前,会通过 logging channel 向主虚拟机发送请求要求暂时结束所有的磁盘 IO,主虚拟机收到请求后会天然的将其转化为执行日志发送到 logging channel,从虚拟机的磁盘 IO 也因此会伴随着重放而终止。

磁盘 IO 的实现问题

VMware 在实现时遇到了一些和磁盘 IO 相关的问题。首先,因为磁盘操作是非阻塞的所以可以并行执行,同一时间访问同一块磁盘区域就会导致非确定性结果。另外,磁盘 IO 会通过 DMADirect memory access) 来直接访问虚拟机的内存,因此同一时间的磁盘操作如果访问到了同一块内存页也会导致非确定性结果。VMware 通过监测这样的 IO 竞争(实际场景中极少)并强制让其在主虚拟机和从虚拟机中串行执行来解决这个问题。

第二,磁盘操作访问内存可能和应用程序访问内存产生竞争,因为磁盘操作可以通过 DMA 直接访问内存。例如,如果应用程序正在访问某个内存块,而此时磁盘也在写入这个内存块,则会发生非确定性的结果。虽然这种情况同样很少,但也仍然需要能够监测并解决。其中一种解决方案是当磁盘正在访问某个内存页时暂时对该页设置页保护。当应用程序尝试访问页保护的内存页时,会触发一个陷阱(trap)使得操作系统能够暂停执行直到磁盘操作完成。不过,修改 MMUMemory management unit)的页保护机制代价较大,VMware 借助 bounce buffer 来解决这个问题。bounce buffer 是一块和磁盘操作正在访问的内存一样大小的临时缓冲区。磁盘的读操作被修改为磁盘先将数据复制到 bounce buffer 中,然后虚拟机读取 bounce buffer 中的数据,而且只有在磁盘 IO 完成后才会将 bounce buffer 中的数据复制到虚拟机内存中。类似的,对于磁盘写操作,数据会先写到 bounce buffer 中,磁盘再将数据从 bounce buffer 中复制到磁盘上。使用 bounce buffer 的情况下会降低磁盘的吞吐,不过实际中还没有发现造成可见的性能损耗。

第三,还有些问题发生于主虚拟机正在执行某些磁盘 IO(未完成),然后主虚拟机异常,从虚拟机接管执行。对于从虚拟机来说,它并不知道这些磁盘 IO 是否已发送给磁盘或者已成功执行。另外,由于这些磁盘 IO 还没有在从虚拟机上发起过,所以也不会相应的 IO 完成的通知,但是在操作系统的角度指令已经发出了,但是从虚拟机上的操作系统也收不到 IO 完成的通知,最终会终止或者重置这个过程。在这种情况下,系统会为每一个 IO 操作发送一个失败的通知,即使某个 IO 操作实际成功了而返回失败也是可以接受的。然而,由于从虚拟机上的操作系统可能不能很好的响应 IO 失败的通知,所以在从虚拟机 go live 阶段会重新发起这些 IO 请求。因为系统已经消除了并发 IO 间的竞争,所以这些 IO 操作可以重新发起即使它们之前已经成功执行了(操作是幂等的)。

网络 IO 的实现问题

VMware vSphere 为虚拟机的网络提供了很多的性能优化。部分优化基于 hypervisor 能异步更新虚拟机的网络设备的状态。例如,在虚拟机还在执行时 hypervisor 就可以直接更新接收缓冲区。不过,这种异步更新同样也带来了非确定性。除非能保证在异步更新的时间点在主虚拟机上执行的指令能严格一致的在从虚拟机上重放,否则从虚拟机的状态就会和主虚拟机不一致。

FT 中对网络代码改动最大的一块就是禁止了异步网络更新优化。从原先的异步更新缓冲区修改为强制陷入到陷阱(trap)中,hypervisor 响应后将更新记录到日志中,然后再将其应用到虚拟机上。类似的,异步从缓冲区中拉取数据包也同样被禁止了,同样由 hypervisor 托管执行。

消除了异步的网络设备状态更新以及前面所提到的 Output Rule 带来的延迟输出,为网络性能优化又提出了挑战。VMware 通过两个方面来优化网络性能。第一,实现了集群优化来减少虚拟机的陷阱和中断。当虚拟机在接收网络包时,hypervisor 可以在一个陷阱、中断内处理多组数据包来减少陷阱和中断的次数。

第二个网络优化是降低数据包延迟发送的时间。前面提到,只有当从虚拟机确认收到所有输出操作的日志后,主虚拟机才能执行输出操作。所以减少数据包延迟发送的时间等价于减少日志发送和确认的时间。这里主要的优化点是发送和接受消息回执的过程中确保不会触发线程切换。另外,当主虚拟机将某个数据包加入发送队列时,系统会强制刷新主虚拟机的日志缓冲区到 logging channel 中。

其他设计方案

本节主要描述了 VMware 在实现 VMware FT 时调研和考虑的其他一些方案。

共享磁盘和非共享磁盘

在当前的实现中,主虚拟机和从虚拟机共享一个存储。因此当发生主从切换时,从虚拟机上的数据天然是和主虚拟机上的数据一致的。共享存储相对于主从虚拟机来说是一个外部系统,所以任何对共享存储的写入都被视为和外部的通信。因此,只有主虚拟机被允许写入到共享存储,而且写入必须遵循 Output Rule 规则。

另一种设计是主虚拟机和从虚拟机各自有一套独立的存储。在这个设计中,从虚拟机会将所有的输出写入到自己的存储中,所以从虚拟机的数据也能和主虚拟机保持同步。下图展示了这种设计下的配置:

alt

在非共享磁盘的场景下,每个虚拟机的存储被视为该虚拟机内部状态的一部分。因此,虚拟机的输出没有必要遵循 Output Rule 规则。非共享磁盘的设计在主从虚拟机无法访问共享存储时很有用。这可能时因为共享存储不可用或者过于昂贵,或者主从虚拟机间的物理距离太长。这种方案的一个主要的缺点是主从虚拟机开启容错时必须将双方的磁盘进行初始化同步。另外,双方的磁盘也有可能在异常发生后造成不同步,所以当备份虚拟机重启后,双方磁盘需要再次同步。所以,FT VMotion 不只要同步主从虚拟机的状态,还要同步磁盘的状态。

在非共享磁盘的场景下,系统就不能借助共享存储来解决脑裂问题。在这种场景下,系统可借助其他的外部组件,例如某个主从虚拟机都可以连接的第三方服务器。如果主从服务器属于某个多于两个节点的集群,那么就可以使用某个选举算法来选择谁能进入 go live 模式。在这种场景下,如果某台虚拟机获得了大多数节点的投票,那么它就可以进入 go live 模式。

在从虚拟机上执行磁盘读操作

在当前的实现中,从虚拟机从来不会从虚拟磁盘中读取数据(不论是共享存储还是非共享存储)。因为磁盘读也是一种输入,所以会自然的将磁盘读取后的结果以日志的形式通过 logging channel 发送给从虚拟机。

另一种设计是涉及到磁盘读操作的执行由从虚拟机自行从磁盘中读取,主虚拟机不再向 logging channel 中发送从磁盘读取到的数据。对于磁盘密集型的系统来说,这种设计可以大幅降低 logging channel 的负载。不过,这种设计也存在一些问题。这可能会降低从虚拟机的执行速度,因为从虚拟机在重放时必须先等待所有依赖的磁盘 IO 操作完成。

另外,当磁盘读失败时还需要额外的处理。如果主虚拟机读取磁盘成功了,而从虚拟机读取磁盘失败了,那么从虚拟机就需要进行重试直到成功,因为从虚拟机必须拿到和主虚拟机内存中一样的数据。相反的,如果主虚拟机读取磁盘失败了,则相应主虚拟机中目标内存中的数据就必须通过 logging channel 发送给从虚拟机,因为此时主虚拟机中内存中的数据是不确定的。

最后,在共享存储的模式下如果让从虚拟机执行磁盘读也有个问题。如果主虚拟机在某个磁盘区域执行了读操作,然后马上又对相同的区域执行了写操作,那么这个写操作就必须等到从虚拟机读取完成后再执行。这个依赖也能够被监测和正确处理,不过也给系统实现增加了额外的复杂度。

在实际性能测试中,从虚拟机执行磁盘读操作的情况下应用程序的吞吐会降低 1-4%,但是也能有效降低 logging channel 的负载。因此,在 logging channel 的带宽有限的情况下,可以考虑使用从虚拟机磁盘读操作。

参考

介绍

MapReduce: Simplified Data Processing on Large Clusters 中提到,MapReduce 任务的输入输出构建在 GFS 之上,GFSGoogle 内部开发的一个分布式文件系统,用于应对大型的数据密集型应用。在 GFS 之前,业界已经存在了一些分布式文件系统的实现,为什么 Google 还要再实现一套?因为基于 Google 内部应用的特点,有别于传统的分布式文件系统,除了考虑性能、可扩展性、可靠性和可用性之外,GFS 在设计时还考虑了以下三个方面:

  1. 组件异常经常出现而不是偶尔出现。GFS 构建在成百上千台廉价的机器上,并同时被同等数量的客户端访问。在这个量级规模下,在任何时候某个组件都有可能发生异常以及发生异常后无法自动恢复。这里的异常不只包括硬件的异常,还包括软件的异常以及人为的错误。因此,对于异常的监控和检测,容错,以及异常的自动恢复是系统不可或缺的一个部分。
  2. 以传统的分布式文件系统的视角来看,Google 要处理的都是大文件,几个G的文件随处可见。
  3. 对于 Google 的数据应用来说,大部分对文件的写操作是追加操作而不是覆盖操作。对文件的随机写几乎可以说是不存在。文件一旦写入完成后,基本上就不会被再次修改,剩下的都是读操作,且大部分场景下是顺序读。MapReduce 系统就是这个应用场景的典型例子,map 任务持续顺序追加生成中间结果文件,reduce 任务不断的从中间结果文件中顺序读取。根据这个特点,追加写就成为了系统性能优化以及写操作原子性保证的主要设计方向。
  4. 将应用程序和文件系统的 API 协同设计有利于增加系统的灵活性。例如,GFS 提供了原子性的追加写操作,多个客户端可以并发追加写,而无需在应用程序层面进行加锁。

设计

假设

本节主要描述了 GFS 设计的几个出发点:

  • 整个系统构建在一批廉价且经常出现异常的硬件上,所以设计上必须考虑对异常的监测和容错,以及异常发生后能够恢复到一个合适的程度。
  • 需要能够高效管理几个G的大文件,系统也支持存储小文件,不过不会对其作特殊的优化。
  • 系统需要支持两种文件读取方式:大量的流式读和少量的随机读。在大量的流式读场景下,每次读取大小一般在几百KB,或者1MB或更多。来自同一个客户端的连续读取一般是读取文件的某一段连续内容。而少量的随机读一般是在文件的任意位置读取几KB。对于性能敏感的应用程序来说,一般会将多个随机读根据读取的位置排序后批量读取,避免磁盘来回的随机寻址。
  • 系统需要支持大量连续的文件追加写操作。对文件追加写的单次大小一般和流式读的单次大小差不多。文件一旦写入完成后就几乎不会再被修改。系统同样需要支持少量的随机写,不过和少量的随机读类似,随机写也不强调性能。
  • 当有多个客户端对同一个文件进行追加写的时候,系统必须能高效的实现清晰明确的执行语义,例如是保证每个客户端至少成功追加写一次,还是至多一次或者其他。GFS 中的文件经常会充当某个生产者消费者场景下的缓冲队列,一边会有多个客户端不断往文件中追加写,一边也会有一个客户端同时进行流式读取(或在写入完成后读取),所以系统必须保证追加写操作的原子性。
  • 整个系统的吞吐的重要性大于延迟,大部分 Google 的应用程序主要是大批量的文件处理,只有少量的程序会对单次的读或写有性能要求,所以系统的吞吐是第一位。

接口

虽然 GFS 并未完全实现标准的文件系统 API,如 POSIX,但仍提供了常见的文件系统接口,如创建(create)、删除(delete)、打开(open)、关闭(close)、读取(read)和写入(write)文件。类似于本地文件系统,文件在 GFS 内通过树状目录的形式组织,每个文件通过文件路径来唯一确定,不过这里的目录是逻辑上的概念,并不会映射到某个物理文件系统目录上。

除此之外,GFS 还支持快照(snapshot)和追加写(record append)的操作。快照能够低成本的复制一个文件或一个目录。追加写允许多个客户端并发的对同一个文件追加写入,并保证每个客户端写入的原子性。

架构

一个 GFS 集群由一个主节点(master)和多个 chunkserver 组成,并被多个客户端(client)访问。不管是主节点还是 chunkserver 或客户端,都运行在廉价的 Linux 机器上。可以简单的将 chunkserver 和客户端运行在同一台机器上,如果系统资源允许或者能够容忍客户端代码潜在的不可靠性的话。

每个文件在存入 GFS 时会被切分为固定大小的块(chunk),每个 chunk 在创建时会被主节点分配一个全局不可变的64位 chunk handlechunkserverchunk 保存在本地文件系统中,每个 chunk 对应着本地的一个 Linux 文件,客户端通过指定 chunk handle 和文件偏移范围来读取或者写入 chunk。为了保证可靠性,每个 chunk 会复制到多台 chunkserver 上。GFS 默认为每个 chunk 生成3份副本,不过用户也可以为某些命名空间下的文件指定不同的副本数量。

主节点保存了全部的系统元数据,包括文件命名空间、访问控制信息、每个文件和对应 chunk 的映射、每个 chunk 所在的位置等。同时主节点还负责 chunk 的租约管理、不再使用的 chunk 的垃圾回收、chunkserver 间的 chunk 迁移等。主节点也会定期的向 chunkserver 发送心跳用于向 chunkserver 发送指令和收集 chunkserver 当前的状态。

GFS 的客户端代码会集成到用户应用程序中,它负责实现文件系统 API 以及和主节点、 chunkserver 通信来读取或写入文件。GFS 客户端通过主节点获取文件的元数据信息,然而文件的读取和写入都只和 chunkserver 通信,从而减少了主节点的负担。

不管是 GFS 客户端还是 chunkserver 都不会缓存文件数据。客户端缓存收益不大是因为 Google 大部分的应用程序是对大文件的流式读取或者文件内容过大无法被缓存。不考虑缓存简化了客户端和整个系统的实现,因为引入缓存就要考虑缓存一致性问题。不过客户端会缓存文件的元数据信息,例如某个 chunk 的位置信息。chunkserver 不缓存是因为 chunk 是作为 Linux 文件保存在本地文件系统中,操作系统本身已经提供了一层文件访问缓存,没有必要再加一层。

单主节点

单主节点同样简化了整个系统的设计,方便主节点高效管理 chunk,因为所有需要的信息都保存在主节点内存中。为了避免对文件的读写使得主节点成为瓶颈,客户端读写文件时直接和 chunkserver 通信而不会经过主节点中转,不过在开始读写前,客户端会先和主节点通信获取需要的 chunk 对应的 chunkserver 列表,并将这个数据缓存一段时间来减少后续和主节点的通信。

结合下面的流程图我们来看一下一次读操作是如何执行的:

alt

  1. 客户端根据固定的每个 chunk 的大小和想要读取的文件偏移位置,计算出 chunk 的索引。
  2. 客户端将文件名和 chunk 的索引发给主节点,主节点收到请求后返回对应的 chunk handle 和保存了该份 chunkchunkserver 列表。
  3. 客户端收到主节点返回结果后,以文件名和 chunk 的索引组合作为键,将返回结果缓存。
  4. 客户端将 chunk handle 和想要读取的文件内容偏移范围发给其中一台 chunkserver,一般来说,客户端会选择距离最近的 chunkserver,由于缓存了 chunkserver 列表信息,后续对同一个 chunk 的读请求就不需要再次和主节点通信,直到缓存过期或者文件被再次打开。
  5. 一般来说,客户端和主节点通信时会在一次请求中要求多个 chunk 的信息,主节点同样也可在一次响应中返回多个 chunk 的信息,这也有利于减少客户端和主节点的通信次数。

chunk 大小

每个 chunk 的大小是一个关键的设计选择。GFSchunk 大小为 64 MB,远大于一般文件系统的数据块的大小。每个 chunkLinux 文件的形式保存在 chunkserver 上,但其实际占用的大小是动态增长的,从而避免了大量的文件碎片(例如实际只需要十几K的文件却固定分配了 64 MB 的大小)。

那么,GFS 为什么要选择 64 MB 这样较大的数呢?主要是因为:

  1. chunk 的大小越大,每个文件拆分成的 chunk 的数量就越少,客户端需要读取或写入的 chunk 数量也就越少,和主节点通信的次数也就越少。对于 Google 的数据应用来说,大部分都是顺序读写,客户端和主节点交互的次数是线性的时间复杂度,减少 chunk 的总个数有助于降低整个的时间复杂度。即使是对于少数的随机写,由于 chunk 大小足够大,客户端也能够将TB级别的数据对应的所有 chunk 的信息缓存起来。
  2. 一个 chunk 的大小越大,那么客户端对其可操作次数就越多,客户端就可以和某个 chunkserver 维持一段较长时间的 TCP 连接,减少和不同的 chunkserver 建立连接的次数。
  3. 主节点需要维护每个 chunk 对应的 chunkserver 列表,这个也是个线性的空间复杂度,较大的 chunk 大小能减少 chunk 的总个数,从而减少元数据的大小,主节点就可以将所有的元数据放在内存中。

另一方面,即使将 chunk 的大小设置的较大,以及采用了懒分配的策略,也依然存在缺点。对于一个小文件来说,它可能只有几个甚至是一个 chunk,如果这个文件被访问的频率较大,它就有可能成为热点数据,保存了对应的 chunkchunkserver 就要承受更大的流量。不过在 Google 的实际应用场景中,热点数据并没有成为一个大问题,因为大部分的系统面向的是大文件的流式读取,流量能较平均的分发到各个 chunkserver

不过,Google 确实有遇到过一个热点数据问题,有个批处理系统会先将一个可执行文件写入到 GFS 中,这个可执行文件的大小只有一个 chunk,然后所有客户端会同时读取这个可执行文件并执行,这就造成了几台 chunkserver 同时承接了大量的请求。Google 通过两个方面来解决这个问题,第一针对这个文件设置一个较大的副本数量,让更多的 chunkserver 来分散流量,即水平扩展;第二交替启动批处理系统的客户端,避免同一时间的大量请求。另一种长期的解决方案是在这种场景下允许客户端从其他已读取完毕的客户端那读取文件。

元数据

主节点主要保存了三类元数据:每个文件和 chunk 所属的命名空间、每个文件到对应所有 chunk 的映射、每个 chunk 对应的 chunkserver 列表。所有的元数据都保存在主节点的内存中,前两种元数据被修改时还会以日志的形式保存在本地日志文件中,并备份到其他服务器上。通过日志文件的方式使得主节点能够轻松的修改元数据,并且在主节点崩溃重启后能恢复数据,当然极端情况下主节点有可能没有保存最新的元数据到日志文件中就崩溃了,这里后文会有说明主节点会在持久化完成后才返回结果给客户端,所以客户端不会看到未持久化的元数据。主节点并不会对 chunkserver 列表进行持久化,而是在启动时主动拉取所有 chunkserver 的信息,以及每当一个新的 chunkserver 加入到集群中时,更新元数据,因为 chunkserver 是个动态更新的数据,即使持久化了也需要和当前实时的数据作比对。

数据结构

由于元数据保存在主节点内存中,所以涉及主节点的操作都是非常快的。另一方面,主节点也能够轻易的每隔一段时间遍历所有的元数据,这个主要有三个目的,第一是扫描不再需要的 chunk 进行垃圾回收;第二如果某个 chunkserver 失联了,能将其保存的 chunk 备份到其他 chunkserver 上;第三将某些流量失衡或者本地磁盘空间不够的 chunkserver 下的 chunk 转移到其他 chunkserver 下。

将元数据保存在内存中的一个缺陷是受限于主节点的内存容量,能够保存的 chunk 的元数据的个数是有限的,从而整个 GFS 的容量大小是有限的。不过在实践中,这个问题还没有成为瓶颈,每个 chunk 对应的元数据大小约小于64字节,由于 GFS 面向的主要是大文件,所以基本上每个文件也就最后一个 chunk 没有塞满,所以空间浪费较少。类似的,由于使用了前缀压缩,每个文件的命名空间元数据的大小也小于64字节。对于一个 128 GB 内存的主节点来说,假设全部用来保存 chunk 的元数据,则理论上保存的所有 chunk 的大小为 128 GB / 64 B * 64 MB = 128 PB,虽然实际上元数据不只 chunk 一种,但整体上来说也能够维护 PB 级别的数据。

如果需要 GFS 支撑更大的容量,相比于将元数据全部保存在内存中所带来的的简洁、可靠、性能和灵活上的收益,扩展主节点的内存所需要的成本不值一提。

chunk 的保存位置

主节点不会持久化每个 chunk 对应的 chunkserver 列表,它会在启动时主动拉取所有的 chunkserver 信息。在主节点运行后,它可以通过和 chunkserver 的心跳来实时更新 chunkserver 的状态信息。

在最初的设计中,Google 的工程师试过将 chunkchunkserver 的映射关系进行持久化,但是最终还是采用了更简洁的设计,即在主节点启动时主动拉取所有的 chunkserver 信息并在之后定期更新内存中的数据。这就消除了频繁的数据同步,毕竟在集群环境下,chunkserver 加入或者离开集群,修改主机名,发生异常或重启等操作会经常发生。

另一方面,只有 chunkserver 才真正知道自己是否拥有某个 chunk,所以在主节点上持久化一份 chunkchunkserver 的映射关系意义不大,因为这份数据很大概率是不一致的,毕竟 chunkserver 会发生异常然后失联或者运维重命名了一台 chunkserver

操作日志

操作日志记录了元数据的重要历史更新,它是 GFS 的关键。不仅因为它是元数据的唯一持久化备份,同时它也提供了系统在并发操作下各操作的执行顺序。各个版本的文件和其对应的 chunk 的操作记录都被唯一的标识在日志文件中。

所以操作日志必须保证可靠性存储,以及在元数据持久化完成之前不能将最新的元数据更新暴露给客户端。否则就有可能丢失最新的客户端操作,即使 chunkserver 还存活着。因此,GFS 会将操作日志备份在多台机器上,并且只有在所有的副本机器都持久化完成后才会返回结果给客户端。同时主节点会对操作日志进行批量持久化以降低频繁持久化对系统整体吞吐的影响。

当主节点崩溃重启后会通过重放操作日志来恢复崩溃前的状态,然而如果每次都从第一条日志开始重放,主节点崩溃重启到可用需要的时间会越来越久,因此当操作日志的大小增长到一定程度的时候,主节点会为当前的元数据创建一个检查点,当主节点崩溃恢复后,可以先加载最新的检查点数据,然后再重放在这个检查点之后生成的操作日志。检查点是一个类似于 B 树的数据结构,可以轻易的映射到内存数据结构中,并且方便根据文件的命名空间检索。这就加快了主节点崩溃恢复的速度和提高了系统的可用性。

因为构建检查点需要时间,所以在创建检查点时需要确保不影响正在进行的文件修改。主节点会先创建一个新的操作日志文件,然后由一个新的线程去创建检查点,这个线程会依据新的操作日志文件创建前的日志生成检查点。对于一个百万级别数量文件的系统来说,大概需要1分钟的时间创建检查点,检查点创建完成后同样会持久化到本地磁盘和副本机器上。

主节点崩溃恢复只需要最新完整的检查点文件以及后续的操作日志文件,在这之前的检查点文件和操作日志文件都可以删除,不过为了提高容错性还是会保留一部分文件。如果生成检查点的时候发生异常并不会影响崩溃恢复的正确性,因为恢复的代码会校验检查点文件的完整性并跳过损坏的检查点文件。

一致性模型

GFS 提供了弱一致性模型,在能较好的支撑 Google 内部各分布式应用的同时也兼顾了简洁和高效的实现。

GFS 的保证

文件命名空间的修改是原子的(例如创建一个文件)。对命名空间的修改会由主节点加锁执行,这就保证了修改的原子性和正确性,同时主节点的操作日志记录了这些修改的全局顺序。

而某块文件区域修改后的状态则取决于修改的类型,是否成功或失败,以及是否存在并发的修改。某块文件区域是一致的(consistent)表示不管客户端从哪个 chunkserver 读取这块的数据,每个客户端看到的内容始终是相同的。某块文件区域是已定义的(defined)表示经过一次修改后,首先这块文件区域是一致的,并且客户端读到的数据就是自己修改的。如果对某块文件区域的修改不受其他并发请求的客户端的影响,那么这块文件区域在修改成功后是已定义的(也是一致的),所有的客户端都能看到对应的修改。而如果有多个客户端并发的对同一块文件区域修改,那么最终的结果是未定义的,但是是一致的,所有的客户端读到的数据都是相同的,但是并不知道读到的数据是哪个客户端修改的,一般来说,最后这块文件区域的内容很可能是多个客户端并发修改后混合的结果。一次失败的修改会使得这块文件区域不一致(因此也是未定义的),不同的客户端在不同时间读取到的数据是不同的,因为多个 chunkserver 上保存的数据不一致。

对文件的修改包括在随机位置的写和在文件末尾的追加写两种,随机写指的是数据会写入到应用程序指定的某个文件偏移的位置,追加写指的是在并发的情况下能保证数据会原子性的至少一次追加写入到文件末尾,不过最后数据写入的实际偏移位置是由 GFS 来决定(相反的,一次常规的追加写指的是在客户端以为是文件末尾的地方写入数据。)。追加写完成后,系统会返回相应的偏移量给客户端,这个偏移量表示了某块已定义的文件区域的开始,对于当前客户端来说,这块区域的数据就是自己写入的,对于其他客户端来说,它们读到的数据也始终是相同的。不过,GFS 有可能在修改 chunk 时插入对齐数据或者重复的数据,例如现在有个追加写操作,chunkserver1 需要写入的偏移量位置是100,并且写入完成了,chunkserver2 需要写入的偏移量位置也是100,但是 chunkserver2 写入失败了,客户端会进行重试,那么 chunkserver1 会在偏移量101的地方再次写入相同的数据,但是由于之前 chunkserver2 没有写入成功,它的偏移量位置还是100,那么为了保证所有客户端根据偏移量读取到的数据是相同的,就需要先补齐 chunkserver2 偏移量位置100的数据,然后在偏移量位置101写入新的数据,如果这次所有 chunkserver 都写入成功了,那么就会把偏移量101返回给客户端,而不是100。所以我们看到,客户端想要保存的原始文件,和最终从 GFS 取回的文件数据并不是每个比特都一致的,存在某些文件区域数据不一致的情况。

当一系列文件修改执行完成后,被修改的文件区域保证了是已定义的,并且包含了最后一次修改的内容。GFS 通过两个方面来保证这一点:第一,对 chunk 的所有修改在所有 chunkserver 上的执行顺序是一样的;第二,通过 chunk 的版本号来识别所有版本落后的 chunkserver,这些 chunkserver 有可能是因为在失联期间 chunk 内容发生了修改。对于版本号落后的 chunkserver 则不会参与 chunk 的修改并且主节点也不会将其作为有效的 chunkserver 返回给客户端,这些 chunkserver 所持有的 chunk 会尽早的被垃圾回收。

然而在前面提到,客户端从主节点获取某个 chunkchunkserver 列表后,会在一段时间内将其缓存,如果这段时间内某个 chunkserver 包含的 chunk 的版本号落后了,那么客户端读到的数据会存在短暂的不一致,这个时间段就取决于客户端缓存的过期时间,以及客户端下次打开文件的时间,因为重新打开文件时客户端会从主节点重新拉取 chunkserver 的信息从而刷新了缓存。然而,鉴于大部分文件的修改是追加写,这里我理解可能在流式读的场景下,读完 chunk 的某段内容后会返回新的偏移量,而对于版本号落后的 chunkserver 来说,它返回的偏移量可能已经读过了,所以此时客户端知道这个 chunkserver 可能过期了,所以会尝试再次和主节点通信,从而获取新的 chunkserver 列表。

然而即使对文件的修改执行成功了,系统其他组件如硬件的异常也有可能导致文件损坏或摧毁。GFS 通过主节点和 chunkserver 的心跳来监测异常的机器,并通过校验和来验证 chunk 的数据是否损坏。如果出现了数据损坏或者某台 chunkserver 失联了,那么 GFS 会从其他有效的 chunkserver 那重新复制数据,来保证副本数量维持在指定的阈值上。所以,只有当某个 chunk 对应的 chunkserver 全都异常了,同时 GFS 还没来得及反应过来(一般来说是几分钟内)复制数据,chunk 才会发生永久丢失。即使在这种情况下,对于应用程序来说它看到的是不可用的状态,而不是数据损坏:应用程序收到的是明确的错误信息而不是损坏的数据。

最后,来看一下每种操作下的一致性保证:

  • 随机写
    • 串行执行成功:已定义(defined),在串行执行场景下,某个时间只会有一个客户端执行写入,客户端在执行成功后读取到的就是自己刚刚写入的,所以是已定义的。
    • 并行执行成功:一致但是未定义(consistent but undefined),某个时间在并发场景下多个客户端写入的偏移范围可能重合,最后文件区域的数据可能是多个客户端写入混合后的产物,每个客户端写入后读取到的不一定是自己写入的,所以是未定义的,但由于写入成功,所以是一致的。
    • 异常:不一致(inconsistent),写入失败时,有的 chunkserver 可能写入成功,有的未成功,不同客户端读取同一个偏移量的数据就会不一致。
  • 追加写
    • 串行执行成功:已定义但是穿插着不一致(defined, interspersed with inconsistent),由于追加写至少写入一次的保证,一次追加写的成功背后可能包含着重试,所以在某些偏移量下各个 chunkserver 上的 chunk 的数据会不一致,但是追加写返回的是一个写入成功的偏移量,不会把不一致的偏移量返回,这个新的偏移量下的数据就是客户端自己写入的,且每个客户端读取这个偏移量的内容都是相同的,所以是已定义的。
    • 并行执行成功:同上,追加写返回的偏移量是由 GFS 决定的,所以多个并发客户端追加写入不会相互覆盖。
    • 异常:不一致(inconsistent),同随机写。

对应用程序的影响

借助追加写、检查点、可自我校验和识别的数据写入,GFS 的应用可以较好的适应 GFS 的弱一致性模型。

在实际应用场景中,GFS 的应用基本是通过追加写而不是覆盖写来修改文件。在某个典型应用中,某个文件的内容完全由某个生产者写入,在写入完成后能原子性的将其重命名,或者周期性的为至今写入成功的数据建立检查点。检查点同时也可包含应用程序级别的校验和(不同于 GFS 的校验和,应用程序保存的数据对于 GFS 来说是一致的,但对于应用来说可能是不完整的)。对于读文件的消费者来说,它们只需要读取到最新的检查点即可,因为到最新的检查点的数据可以认为是已定义的(defined),并可以通过校验和来验证文件数据的完整性。如上节所属,虽然追加写在失败时也会存在不一致的情况,但结合检查点的方式已经能较好的满足实际的业务场景。另外,追加写也远比随机写高效和健壮。借助检查点生产者可以在崩溃恢复后从检查点开始继续写入,消费者只根据检查点来读取数据,而不会越过检查点读到以应用的视角来说不完整的数据。

在另一个例子中,多个生产者并发的向某个文件进行追加写,追加写至少写入成功一次的保证使得每个生产者的写入都得以保留。不过,消费者在读取文件时就需要额外处理对齐数据和重复数据,每一条追加写可以看做一条记录,生产者在写入时可以附加一个校验和用于消费者校验数据的完整性,对于对齐数据或不完整的数据来说,它们必然没有校验和,因此消费者就可以跳过这些异常记录。另外,每一条记录都有一个唯一的标识符,重复的记录有着相同的标识符,如果消费者不能容忍重复的数据(例如消费者的代码没有做到幂等),则可以通过这个标识符来对数据去重。除了去重以外,上述的功能都已包含在客户端库中,无需应用端自行实现,从而使得在应用层面每一个消费者都能读到同一份数据(当然,可能会有一点重复数据)。

系统交互

在单主节点的设计下,GFS 尽可能的让主节点少参与到数据交互中,本节主要描述了客户端、主节点和 chunkserver 在数据修改、原子性的追加写和快照中的实现。

租约和修改的执行顺序

一次修改(mutation)指的是修改了某个 chunk 的元数据或者内容。每个修改都会应用到该 chunk 的所有 chunkserver 上。在修改前,主节点会挑选某个 chunkserver 作为 primary,为其分配一个租约(lease)。对某个 chunk 的修改可能同时会有多个,所以 primary 会对所有的修改安排一个执行顺序,所有其他的 chunkserver 都会按照这个执行顺序应用修改。因此,从全局来看,对同一个 chunk 的修改的顺序首先取决于主节点分配租约的顺序,在某个租约有效期内的执行顺序,则取决于 primary 安排的执行顺序。

租约的设计是为了减少主节点协调的负担。一个租约初始设置了60秒的过期时间,不过如果在过期前该 chunk 还未完成修改,primary 可向主节点申请租约续期。这个申请可以附带在 chunkserver 和主节点的心跳监测中,而无需额外通信。不过有时候主节点可能会在租约有效期内回收租约(例如,某个文件正在重命名时主节点需要暂停所有的修改)。即使主节点和当前的 primary 失联了,主节点依然可以在该租约过期后安全的重新分配一个租约给新的 primary

同时,租约也避免了脑裂的问题,如果没有租约的时间限制,主节点先指定了一个 primary,然而由于网络原因主节点认为这个 primary 失联了,就会重新分配一个 primary,此时就有两个 chunkserver 认为自己是 primary。而在租约过期前,主节点不会分配新的 primary 就保证了同一时间只有一个 primary

下图描述了一次随机写的执行流程:

alt

  1. 客户端询问主节点当前持有租约的 primary 和其他的 chunkserver 的地址,如果当前没有一台 chunkserver 持有租约,则主节点会挑选一个 primary 并分配租约。
  2. 主节点返回当前的 primary 和其他的 chunkserversecondary) 的地址。客户端收到响应后将其缓存供后续修改时使用,只有当 primary 无响应或者 primary 回复不再持有租约时,客户端才会再次询问主节点 primary 的信息。
  3. 客户端将需要写入的数据发送给所有的 chunkserver,这一步没有执行顺序的要求。每个 chunkserver 收到数据后会将其暂存在一个 LRU 缓存中直到该数据被取出或过期。从控制流角度来说,是由 primarysecondary 发送指令,而发送待写入的数据不关心谁是 primary,下文会说到这里 GFS 将数据流和控制流进行解耦,基于网络的拓扑结构来实现数据的最优传输。
  4. 当所有 chunkserver 都收到了客户端的数据后,客户端会向 primary 发送一个写入请求,这个请求中标记了之前发送的那份数据。primary 将该时段收到的所有修改请求设置一个串行的执行顺序,并对每一个修改分配一个执行编号,然后将修改写入到本地的 chunk 中。
  5. primary 将写请求分发给其他所有的 secondarysecondary 收到请求后根据执行编号同样将修改按照和 primary 一样的执行顺序写入到自己的 chunk 中。
  6. 每个 secondary 写入完成后会回复 primary 告知写入完成。
  7. primary 收到所有 secondary 的回复后,再回复给客户端告知写入成功。如果途中任何一个 chunkserver 出现异常都会通知客户端,当发生异常时,可能 primary 已经写入成功了,但是某个 secondary 只写入成功一部分(如果是 primary 写入异常,就不会分发写请求给其他所有的 secondary)。因此此次客户端请求就认为写入失败,多个 chunkserver 间出现了数据不一致,此时客户端会进行重试,它会尝试重新执行第3步到第7步,如果尝试几次后依然失败,则会重新从第1步开始执行。

如果客户端一次随机写的数据量较大或者当前 chunk 的剩余空间无法容纳全部的数据则需要跨 chunk 写入,GFS 的客户端库会将一次写请求拆分为多个写请求处理。每个单独的写请求都遵循上述的流程执行,不过由于存在并发客户端写的问题,最终写入的数据有可能多个客户端间相互覆盖,不过由于每个 secondary 都按照相同的执行顺序写入成功,最终各个 chunkserver 上的数据都是相同的,这个也就是之前描述的一致但是未定义状态(consistent but undefined)。

数据流

在前面提到,在第3步发送数据时,GFS 解耦了控制流和数据流以更有效率的利用网络。在控制流下,请求从客户端发送到 primary,再由 primary 分发给其他所有的 secondary,在数据流下,GFS 会选择最优的路径以接力的方式在各个 chunkserver 间传递数据。目的是为了充分的利用每台机器的网络带宽,同时避免网络瓶颈和高延迟的链路,从而降低推送全部数据的整体延迟。

为了尽可能的充分利用每台机器的网络带宽,GFS 选择了线性接力的方式在各个 chunkserver 间传递数据,而不是其他的分发方式(如树状分发,例如以一台机器发送给3台机器为例,发送的机器的出口带宽就被3台机器共享)。

为了尽可能的避免网络瓶颈和高延迟的链路,每个机器会选择在网络结构下距离最近且还未收到数据的机器传递数据。假设现在客户端需要推送数据给 S1S4 四台机器,客户端先将数据发送给距离最近的机器,比如说是 S1S1 同样将数据转发给距离 S1 最近的机器,比如说是 S2,依此类推,最后数据到达 S4Google 内部的网络拓扑结构足够简单使得两台机器间的距离能够较准确的根据 IP 地址估算出来。

最后,GFSTCP 连接上构建了一条流式接力传递的数据流来降低数据传递的延迟。每个 chunkserver 一旦收到一定大小的数据后就马上将其转发给距离最近的其他 chunkserver。收到数据就马上转发并不会降低收数据的速率。在没有网络拥堵的情况下,假设 T 是当前网络的吞吐,L 是两台机器间的传输速度,那么要传输 B 字节给 Rchunkserver 需要的时间为 B / T + RL,其中 B / T 是完整传输 B 个字节给第一台机器的时间,RL 表示第一台机器发送的第一个字节经过接力到达最后一台机器所需要的时间。在 Google 的网络环境下,T100 MbpsL 远小于 1 ms,所以 RL 的时间基本可以忽略,因此传输 1 MB 的数据理论上约需要 80 ms((1 * 1024 * 1024 * 8 / 100 * 1000000) * 1000 ms)。

原子性追加写

GFS 提供了原子性的追加写操作(record append)。在传统的随机写操作下,客户端会指定要写入的文件偏移位置,在多个客户端并发写的情况下,最终该文件区域的内容为多个客户端并发修改后混合的结果。而在追加写场景下,客户端不指定偏移位置,只提供需要写入的数据,GFS 能够原子性的至少一次将其追加到文件中,并指定一个数据写入后的偏移位置,然后将其返回给客户端。这个操作类似于在 Unix 下多个进程以 O_APPEND 的模式并发写入文件而无需担心竞争问题。

追加写在 Google 的分布式应用中使用的非常频繁,多个客户端会并发的对同一个文件追加写入。在传统的随机写操作下,客户端需要额外的同步机制例如实现一个分布式锁来保证写入的线程安全性。在 Google 的应用场景下,文件一般是作为一个多生产者/单消费者的缓冲队列或者包含了多客户端合并后的结果,所以追加写已经能满足需求。

追加写也是一种修改,它的执行流程和随机写基本是相同的,不过对于 primary 来说多了些额外的处理逻辑。首先客户端将需要追加写的数据发送给对应文件的最后一个 chunk 的所有 chunkserver,然后发送写请求给 primaryprimary 会检查当次写入是否会造成最后 chunk 的大小超过 64 MB。如果会超过,那么 primary 会先将当前 chunk 使用特殊字符或其他手段填充到 64 MB,同时告知所有其他的 secondary 也进行同样的操作,接着回复客户端要求针对下一个新的 chunk 发送同样的追加写请求(追加写要求追加写入的偏移位置不超过 chunk 最大大小的四分之一处,从而使得由于数据填充带来的 chunk 碎片在一个可接受的水平。)。如果追加写入不会造成超过 chunk 的最大大小,那么 primary 会先将数据追加到自己的 chunk 中,产生一个新的偏移量,然后再通知其他所有的 secondary,将数据追加到相同的偏移量上,最后全部 chunkserver 执行成功后再将新的偏移量返回给客户端。

如果任何一个 chunkserver 追加写失败了,客户端会发起重试。因此,多个 chunkserver 下同一个 chunk 的数据可能会有数据不一致的情况,某条记录有可能全部或者部分重复。GFS 并不保证每个 chunkserver 上的 chunk 的每个字节都一样。它只保证了每个追加写能原子性的至少写入一次,因为一次追加写执行成功依赖于所有的 chunkserver 成功写入到指定的偏移位置上。此外,在一次追加写执行成功后,后续的追加写必然是写在一个更高的文件偏移位置或者新的 chunk 上,即使主节点挑选了一个新的 primary。在 GFS 的一致性模型下,成功执行了追加写所在的偏移区域的数据是已定义的(也是一致的),而剩下存在写失败的偏移区域的数据是不一致的,前文有提到过这些不一致的区域可以通过校验和来剔除。

快照

快照能够几乎瞬时的复制一个文件或者文件夹,而尽可能的降低对进行中的修改的影响。在实际应用中,一般使用快照对某个巨大的数据集快速创建一个分支(以及分支的分支),或者为当前的文件状态创建一个检查点,方便对当前的文件进行验证修改,在修改完成后提交或回滚。

类似于 AFSGFS 也采用了写时复制(copy-on-write)的技术来实现快照。当主节点收到某个文件的快照请求时,会首先回收所有该文件的 chunk 所关联的租约,这就保证了后续所有对这些 chunk 的修改都需要先申请租约。这就给了主节点有复制一份 chunk 的机会。

当所有涉及的租约被回收或者过期后,主节点首先将快照操作写入操作日志,然后修改内存元数据,复制一份新的目标文件或文件夹的元数据,这个新复制的元数据和源文件的元数据都指向相同的 chunk

在快照生成后,假设此时有一个客户端希望写入 chunk C,它会先询问主节点返回当前持有租约的 primary,主节点发现要修改的 chunk C 有多于1个的元数据引用,说明这个 chunk 发生了快照。主节点首先会为这个文件创建一个新的 chunk C',然后要求所有当前持有 chunk Cchunkserver 在本地创建一个文件作为 chunk C'。因为创建 chunk C' 的机器和持有 chunk C 的机器是同一台机器,所以创建 chunk C' 只是单纯的复制 chunk C,不涉及任何网络 IOGoogle 使用的磁盘 IO 速度大概是3倍于 100 MB 的网络 IO 速度)。接下来的操作就和前面描述的客户端申请写入某个 chunk 一样了,主节点返回 chunk C'chunk handle 和对应持有租约的 primary 给客户端,客户端并不会感知这个返回的 chunk 是从某个已有的 chunk 复制而来的。

主节点操作

所有涉及命名空间的操作都由主节点执行。另外,主节点还负责 chunk 的管理:每个 chunk 应该放在哪台服务器上、创建新的 chunk 及其副本、协调各 chunkserver 保证每个 chunk 的副本数量满足阈值、保证 chunkserver 访问的负载均衡、回收不再使用的 chunk 等等。

命名空间管理和锁

很多主节点的操作需要较长的时间完成,例如,一次快照操作就需要先回收所有 chunk 涉及的租约。同时,又需要降低不同的主节点操作间的影响。因此,GFS 允许不同的主节点操作并发执行,并通过命名空间加锁来保证线程安全。

和传统的文件系统不同,GFS 的文件夹不支持列出该文件夹内的所有文件,以及不支持文件或文件夹的别名(例如 Unix 的硬链接和软链接)。GFS 通过命名空间作为文件到元数据的映射索引,借助前缀压缩使得整个命名空间映射可以高效的保存在内存中。命名空间下的每个节点(代表一个文件名或者文件夹名)都持有一把读写锁。

主节点每次执行操作前都会先获取一批锁,例如如果当前操作涉及命名空间 /d1/d2/.../dn/leaf,主节点会先获取文件夹 /d1/d1/d2,…,/d1/d2/.../dn 的读锁,以及 /d1/d2/.../dn/leaf 的读锁或写锁。这里的 leaf 对应的是某个文件或文件夹。

下面来举例说明 GFS 的锁机制如何保证当创建 /home/user 的快照到 /save/user 时避免同时创建文件 /home/user/foo。首先快照操作会获取 /home/save 的读锁,以及 home/user/save/user 的写锁。而文件创建的操作会获取 /home/home/user 的读锁,以及 /home/user/foo 的写锁。可以看到,一个获取 home/user 的写锁,另一个获取 home/user 的读锁,所以这两个操作是互斥的,不会同时发生。写文件并不需要获取文件的父目录的写锁,因为文件夹对于 GFS 来说是个逻辑上的概念,背后没有类似 inode 的数据结构需要共享保护。获取文件夹的读锁已经足够保证不会有另一个客户端能够获取文件夹的写锁从而删除了这个文件夹。

这种锁机制的一个优点是允许对同一个文件夹下的不同文件进行并发修改。例如,可以在同一个文件夹下并发创建不同的文件,每一个创建操作都会获取该文件夹的读锁以及所要创建的文件的写锁。文件夹的读锁能有效避免这个文件夹被删除、重命名或者创建快照。文件的写锁则避免了同一个文件被创建两次。

因为整个命名空间会包含很多的节点,所以每个节点的读写锁采用了懒汉初始化的方式,并且当不需要的时候能及时删除。另外,锁的获取会按照命名空间的层级顺序以及同一层级下节点名称的字母顺序来获取,从而避免死锁。

副本的放置

一个 GFS 集群一般由多个机柜上的几百台机器组成,这些 chunkserver 又会被几百台在同一个或不同机柜上的客户端访问,如果是两台不同机柜上的机器间的通信可能会经过1个或多个网络交换机。另外不同机柜间的入口和出口带宽一般会小于同机柜内的机器间的带宽。这种多级的分布式对分发数据的扩展性,可靠性和可用性提出了挑战。

所以副本的放置选择策略主要出于两个目的:最大化数据的可靠性和可用性,最大化利用网络带宽。为了实现这两点,仅仅将所有的副本分发到不同的机器上还不够,因为这只对磁盘或者机器异常进行了容错,以及最大化利用了每台机器的带宽,但是还缺少可用性的考虑。因此,需要将副本分发到不同机柜的不同机器上,这样如果当某台机柜上的机器都因故下线了(例如一些共享资源如网络交换机或者供电设备发生异常),系统仍然是可用的。所以在跨机柜的访问下,某个 chunk 的读操作会充分利用不同机柜间的带宽,另一方面 chunk 的写操作就需要跨机柜访问,这也是跨机柜存储的权衡利弊的一个体现。

副本的创建、重复制和负载均衡

chunk 的副本的创建出于三个目的:创建一个新的 chunk 时需要保证存储的可靠性,保证每个 chunk 的副本数量满足阈值,保证 chunkserver 访问的负载是均衡的。

当主节点创建一个 chunk 时,它会决定在哪个 chunkserver 上创建空的 chunk,它主要基于以下几点考虑:

  1. 选择磁盘空间利用率低于平均值的 chunkserver,这保证了长期来看每台 chunkserver 的磁盘空间利用率维持在一个相近的水平。
  2. 避免每台 chunkserver 存在过多最近创建的 chunk。虽然创建一个 chunk 比较简单,但是可以预见的是随之而来这个 chunk 会面临大量的写入。
  3. 如前面所述,每个 chunk 的副本需要跨机柜存储。

当某个 chunk 的副本数量低于用户指定的阈值时,主节点会复制一个 chunk 到某台 chunkserver 上。这种情况一般有以下几种原因:

  1. 某台 chunkserver 异常失联了。
  2. chunkserver 反馈其保存的 chunk 数据已损坏。
  3. chunkserver 的某个磁盘由于异常被禁用了。
  4. 用户要求增加副本的数量。

主节点在复制一个 chunk 时会按照优先级处理。其中一个是距离满足指定的副本数量阈值还差多少。例如如果某个 chunk 还差两个副本就比只差一个副本的 chunk 有着更高的复制优先级。另外,复制还在使用的 chunk 的优先级高于复制最近被删除的 chunk。最后,为了避免缺少副本带给应用的影响,主节点会优先处理那些阻塞了客户端执行的 chunk

主节点根据优先级决定复制哪个 chunk 之后,会选择几台 chunkserver 让其直接从拥有完整副本的 chunkserver 上复制数据。这种情况下新的 chunk 副本的存放位置的选择和前文所述类似:尽量保证每台 chunkserver 的磁盘空间利用率维持在一个相近的水平,避免副本的复制集中在一台 chunkserver 上,尽量保证副本跨机柜存储。为了避免副本复制的流量压垮客户端的流量,主节点会限制整个集群以及每台 chunkserver 同一时间执行的副本复制操作的数量。另外,每台 chunkserver 也会限制花在副本复制上的带宽。

最后,主节点会定期检查各 chunkserver 的流量或者磁盘空间利用率是否均衡:如果发现了不均衡则会在各 chunkserver 间移动某些 chunk 以达到磁盘空间利用率或流量的均衡。根据上述的策略,对于一台新加入集群的 chunkserver 来说,主节点会逐渐的将其填满 chunk 而不是马上让其承接大量新的 chunk 然后伴随着大量的写流量。另外,主节点也需要决定从哪台 chunkserver 上移除 chunk,一般的,主节点会优先选择磁盘可用空间率低于平均水平的 chunkserver,从而使得每台 chunkserver 的磁盘空间利用率维持在一个相近的水平。

垃圾回收

当某个文件被删除后,GFS 不会马上释放它所占用的空间,而是在常规的垃圾回收期间在文件和 chunk 层面延迟删除,这种策略使得系统更加简洁和可靠。

机制

当某个文件被应用删除后,和其他修改一样主节点首先会将该操作记录到操作日志中。不过,GFS 并不会马上删除该文件,而是将该文件重命名为一个隐藏的名字,同时这个隐藏的名字包含了文件删除时的时间戳。在主节点常规扫描系统的命名空间的期间,它会删除命名空间中已经存在了3天(时间间隔可配置)的隐藏文件。在这之前,这个文件依然是可以通过重命名后的名字读取的,或者将其重命名回原来的名字来撤销删除。一旦主节点将其从命名空间中删除,其对应的元数据也会一并删除,从而切断了这个文件和任何 chunk 的关联。

类似的,主节点也会定期扫描所有 chunk 的命名空间,一旦发现没有任何关联的 chunk(即没有被任何文件引用的 chunk),主节点就会删除这些 chunk 的元数据。之后,在主节点和 chunkserver 的心跳监测中,chunkserver 会向主节点反馈其所拥有的 chunk,主节点会返回所有已经不在元数据中的 chunk 标识,chunkserver 收到回复后就可以删除掉这些不需要的 chunk

讨论

虽然分布式的垃圾回收是个困难的问题,往往需要一套特定编程语言下的复杂解决方案,在 GFS 的实现中却很简单。主节点可以轻易的标识出所有 chunk 的引用,它们全都保存在文件到 chunk 的映射中。同时主节点也可轻易的标识出所有的 chunk 副本,它们都是 chunkserver 上某个指定文件夹下的 Linux 文件。任何一个不在主节点中记录的副本都可以被回收。

GFS 采用的延迟垃圾回收的方式相比于立即删除的方式存在以下几个优点:

  1. 对于组件异常经常发生的大型分布式系统来说,延迟删除更为简洁和可靠。创建 chunk 时有可能在部分 chunkserver 上成功在部分 chunkserver 上失败,这就导致创建 chunk 的这次操作整体是失败的,从而使得主节点不会记录该 chunk,后续也无法知晓这些部分创建成功的 chunk。如果采用立即删除的方式,需要由主节点向 chunkserver 发送删除 chunk 的消息并需要对发送失败的消息进行重发。而延迟垃圾回收的方式提供了一个统一、可靠的方式来删除无用的 chunk
  2. 主节点将 chunk 的删除合并到了主节点的日常维护中进行,例如对命名空间的定期扫描以及主节点和 chunkserver 的心跳监测。这就可以将 chunk 的删除进行批量处理从而摊薄了运行的成本。另外,垃圾回收只会在主节点较为空闲的时候进行,主节点会更优先的响应客户端的请求。
  3. 延迟删除给不可逆转的误删除提供了缓冲。

在实际应用中,延迟删除的主要缺点是不利于磁盘剩余空间紧张的 chunkserver 释放磁盘空间。尤其是对不停的创建然后删除临时文件的应用来说,不能够马上利用那些不需要的临时文件的空间。GFS 通过加快回收某个重复删除的文件来缓解这个问题。同时,GFS 也允许用户针对不同的命名空间设置不同的副本数量阈值和垃圾回收策略。例如,用户可指定某个目录下的所有文件的 chunk 都不需要额外的副本,以及当文件删除时系统会立即直接删除。

过期的副本检测

在某个 chunk 所属的 chunkserver 异常下线期间,其他 chunkserver 上的 chunk 发生了修改,则该 chunkserver 再次上线后该 chunk 的数据版本已落后。主节点会记录每一个 chunk 最新的版本号,从而能识别过期的副本。

每当主节点为某一个 chunk 分配一个租约时,它会先更新 chunk 的版本号,并通知所有当前持有最新版本 chunkchunkserver 也更新版本号。主节点和 chunkserver 都会将这个版本号持久化,这里的持久化顺序应该是 chunkserver 先,主节点后,否则主节点先持久化版本号然后此时下线了,再次上线后造成没有任何一个 chunkserver 持有最新的版本号。只有当持久化完成之后,主节点才会将 primary 发送给客户端,之后才能开始写入。如果此时某个 chunkserver 下线了,那么它所持有的 chunk 的版本号就无法更新,当这个 chunkserver 再次上线时,它会向主节点报告所持有的 chunk 以及对应的版本号,主节点就能知道这个 chunkserver 持有了过期的副本。如果主节点发现某个 chunk 的副本的版本号大于自己内存中的版本号,那么主节点就知道自己在更新版本号期间下线了,直接使用 chunkserver 的版本号作为最新的版本号。

主节点会在垃圾回收期间删除过期的副本。在这之前,如果有客户端请求该 chunk 的信息,主节点会忽视持有过期副本的 chunkserver,不会将其返回给客户端。其次,主节点在返回给客户端的 chunk 信息中也包含了版本号,在要求某台 chunkserver 去复制另一台 chunkserver 上的 chunk 数据时也同样附带了版本号,这样客户端或者 chunkserver 在读取 chunk 数据时就能比较读到的数据是否是最近的版本。

容错和诊断

Google 在设计 GFS 遇到的最大问题之一是如何应对频繁的组件异常。各组件的质量和数量决定了异常是经常出现而不是偶尔出现:既不能完全信任机器,也不能完全信任磁盘。组件异常可能造成系统不可用,或者更糟的是数据损坏。本节主要描述 GFS 如何面对这些挑战以及开发了哪些工具来定位问题。

高可用

一个 GFS 集群由数百台机器组成,某些机器必然会在任意时间发生异常。GFS 通过快速恢复和副本这两个手段实现了系统的高可用。

快速恢复

主节点和 chunkserver 都是设计成不管以什么样的方式终止,都能在数秒内恢复终止前的状态。实际上,GFS 并不会刻意区分正常的终止还是异常的终止,各服务器本身日常就会通过杀死进程来终止。客户端或其他服务器会因此经历一段短暂的中断,因为进行中的请求会因为服务器的终止而超时,继而客户端会对重启后的服务器进行重连,然后重试。

chunk 副本

如之前描述,每个 chunk 会备份到不同机柜上的不同机器。用户可指定不同命名空间配置不同的备份策略。默认每个 chunk 有3个副本。主节点通过 chunkserver 间复制 chunk 来保证每个 chunk 有足够的副本数量,避免 chunkserver 由于下线或者数据损坏造成 chunk 副本数量不足。虽然依靠副本已经能较好的支撑需求,Google 的工程师也在考虑跨服务器的冗余手段如奇偶校验或者 erasure code 来应对日渐增长的只读存储需求。在当前这个松耦合的系统中实现更复杂的冗余策略会更有挑战性,不过也是可控的,因为大部分的文件操作都是追加写和读而不是随机写。

主节点副本

GFS 通过备份主节点的状态来实现可靠性。主节点的操作日志和检查点都会备份到多台机器上。一次文件的修改只有当主节点和所有备份服务器都写入到本地日志之后才认为是已提交的。从实现的简单性考虑,只会有一个主节点机器负责所有的元数据修改以及在后台运行的一些日常维护活动来修改主节点的内部状态,例如垃圾回收。当这个主节点发生异常时,它几乎能马上重启。如果整个机器或者磁盘发生异常,GFS 之外的监控设施能在其他机器上借助操作日志副本重新启动一个新的主节点进程。对于客户端来说不需要有任何改动,因为客户端和主节点的通信是通过主机名进行的(例如 gfs-test),其对应的是一个 DNS 别名,当主节点更换机器时,同步修改别名映射即可。

另外,即使主节点异常了,还有影子主节点也提供对本地文件系统的只读访问功能。因为这个影子主节点不是严格的镜像节点,所以它们的文件系统上的数据可能会略落后于主节点,一般来说是几秒。影子主节点依然能够提供当前未在修改的文件查询功能,或者应用程序不介意查询到一部分过期的数据,例如 chunkserver 列表小于实际运行的数量。因为文件的内容是通过 chunkserver 读取,和主节点无关,所以客户端不会读取到过期的数据。可能过期的数据是元数据,例如文件夹的内容或者权限控制信息。

为了保证自己持续更新,影子主节点会不断从某台操作日志副本机器读取日志,并以和主节点一样的执行顺序将其重放到内存中的数据结构中。和主节点一样,影子主节点在启动时也会拉取 chunkserver 信息来定位 chunk 的副本位置,并定期与其进行心跳监测。它依赖主节点来更新 chunk 副本的位置,如主节点创建和删除副本。

数据完整性

每台 chunkserver 使用校验和来检测损坏的数据。鉴于一个 GFS 集群有几千个磁盘运行在几百台机器上,磁盘异常经常会导致文件读写时数据损坏或丢失。虽然可以从其他 chunkserver 恢复,但是通过比较多个 chunkserver 上的副本来检测数据损坏显然是不实际的。另外,在一致性模型中提到,每个副本的数据不是每个比特都一致的,由于追加写的原子性保证造成异常重试的存在,每个副本的数据很大可能是不同的。所以,每台 chunkserver 必须能够自己独立通过校验和来检测数据是否损坏。

每个 chunk 切分为每块大小为 64 KB 的数据块,每个数据块对应一个32位的校验和。和其他元数据一样,校验和也保存在内存中,并随日志持久化到本地磁盘中,且和用户数据分开存放。

对于读操作,chunkserver 会根据读取的字节范围先根据校验和校验所覆盖的数据块的数据是否完整,然后再返回数据给客户端或其他的 chunkserver。因此,每台 chunkserver 不会把损坏的数据同步到其他 chunkserver 上。如果 chunkserver 发现某个数据块发生了损坏,则会返回一个异常给调用方,并向主节点报告。调用方收到回复后会重新选择一个 chunkserver 进行读操作,而主节点则会安排这个 chunkserver 从其他 chunkserver 那复制 chunk。当新的 chunk 就绪后,主节点会通知这个 chunkserver 删除损坏的 chunk

校验和对读操作带来的影响较小,主要有几个方面的原因。大部分的一次读操作只涉及了几个数据块,所以需要读取和验证的校验和数量也较少。另外,客户端在读取时,会尽量保证正好在数据块的边界读取,而不是跨数据块。最后,校验和验证不涉及 IO,以及校验和的计算可以和数据读取 IO 同时进行。

针对追加写,校验和计算对此做了大量的优化,因为追加写是主要的写场景。计算校验和时,GFS 会复用当前最后一个数据块的校验和,一个数据块的大小是 64 KB,可能目前只写入了 30 KB,那么本次往剩下的 34 KB 写入时,则无需从头开始计算这个数据块的校验和。对于新填充的数据块依然还是需要完整计算校验和。即使当前这个写入了 30 KB 的数据块损坏了,计算校验和时无法知晓,最终这个数据块的校验和与保存的数据已经不匹配,这个损坏也会在下一次读操作中被调用方感知。

相反的,对于随机写操作,如果当前的随机写覆盖了某个 chunk 已有的数据,则必须先对所覆盖的数据块进行校验和验证,确保当前的数据未损坏,然后再执行写入和计算新的校验和。如果不进校校验就写入,那么有可能在写入之外的地方已经发生了数据损坏。因为一次随机写可能写不满最后一个数据块,假设这个数据块是损坏的,如果写前校验就能发现这个问题。不过这里并没有理解为什么不能将完整性校验推迟到后续的读操作时,可能是因为追加写和流式读是 GFS 应用的主要方式,完整性校验推迟到读操作也能很快发现问题,而随机写是很少量的场景,写时校验能更快的发现问题。

在系统空闲的时候,chunkserver 会扫描和校验所有不活跃的 chunk,从而能够发现那些不经常被访问但是已损坏的 chunk(因为数据完整性检测发生在读写操作时)。一旦发现这样的 chunk,主节点会创建一个新的 chunk 然后指示 chunkserver 复制一份有效的 chunk,最后再删除损坏的 chunk。这就避免因为一些不活跃但已损坏的副本使得主节点认为这些 chunk 依然满足副本数量阈值的要求。

诊断工具

通过详尽的诊断日志能有效的帮助问题隔离,调试,性能分析,而且带来的成本较小。如果没有日志,则很难理解各机器间无法复现的交互。针对某些重要的事件(如 chunkserver 的下线和上线) GFS 会生成相应的诊断日志,同时还会记录所有的 RPC 请求和响应。诊断日志的删除对系统的正确性没有任何影响。然而在磁盘空间允许的情况下,还是会尽可能保留多的诊断日志。

除了正在读取或写入的文件,诊断日志记录了其他所有 RPC 的请求和响应。通过比对各机器上的 RPC 日志,就能重现当时系统交互的场景,从而进行问题定位。同时这些日志也能辅助压力测试和性能分析。

诊断日志带来的性能影响很小(在其带来的价值面前不值一提),因为采用了异步顺序的方式写入日志。同时,最近某段时间内的事件也会保存在内存中用于线上的持续监控。

参考

概览

Lab 1 需要我们实现一个单机多线程、多进程的 MapReduce 程序,通过 test-mr.sh 可以看到该实验会先启动一个主节点进程,然后再启动多个工作节点进程。

主节点

主节点的入口是 mrcoordinator.go,通过 go run mrcoordinator.go pg*.txt 可运行一个主节点程序,其中 pg*.txt 是本次 MapReduce 程序的输入数据。同时,主节点和工作节点间通过 RPC 进行交互,本实验需要我们实现以下 RPC 接口:

  1. 实验中主节点的任务分发采用的是拉模式,工作节点会定期向主节点请求一个任务,这个任务可以是 map 任务也可以是 reduce 任务,由主节点根据当前任务的状态决定应该推送 map 任务还是 reduce 任务。由于 reduce 任务会不间断的拉取中间结果数据,这里为了方便处理,当某个工作节点正在处理 reduce 任务时,在向主节点请求任务时可指明要求分发对应的 reduce 任务
  2. map 任务完成后需要通知主节点生成的中间结果文件地址,主节点收到请求后需要将中间结果文件地址保存到内部数据结构中,供后续发送给对应 reduce 任务
  3. reduce 任务完成后同样需要通知主节点任务完成,否则主节点无法知晓所有 reduce 任务是否已完成,从而无法退出主节点

任务管理

主节点需要维护所有 map 任务和 reduce 任务的状态,由于工作节点有可能失败,所以主节点需要同时记录每个任务已执行了多久,如果超过一定时间还没有收到任务完成的通知,则认为执行这个任务的工作节点已失联,然后需要将该任务重新分配给其他工作节点,在本实验中主节点等待每个任务完成的时间为10秒。

工作节点

工作节点的入口是 mrworker.go,通过 go run mrworker.go wc.so 可运行一个工作节点程序,其中 wc.so 是本次 MapReduce 程序用到的用户自定义 mapreduce 函数。工作节点只有一个 Worker 主方法,需要不断向主节点轮询请求任务,那么工作节点什么时候结束轮询?根据实验建议有两种方法,一种是当本次 MapReduce 任务完成后,主节点进程退出,当工作节点再次请求主节点任务时,RPC 请求必然失败,此时工作节点可认为本次任务已完成主节点已退出,从而结束轮询并退出;另一种是当主节点完成后,在一定时间内,在收到工作节点新的任务请求时,返回一个要求工作节点退出的标识(或者也可抽象为一种任务类型),工作节点收到 RPC 响应后退出,主节点在等待时间到了之后也进行退出。

空任务

当工作节点向主节点申请任务,而此时主节点没有可分发的任务时,主节点可返回一个自定义任务类型的任务,来表示空任务,工作节点收到响应后则直接睡眠等待下次唤醒。

map 任务

map 任务负责调用用户自定义的 map 函数,生成一组中间结果数据,然后将中间结果数据保存为文件,实验建议的文件名是 mr-X-Y,其中 X 表示 map 任务的编号,Y 表示 reduce 任务的编号,map 任务编号的范围可简单使用 [1, 输入文件的数量] 来表示,reduce 任务编号的范围为 [1, nReduce]。每个 map 任务会生成 R 个中间结果文件,实验已提供了分片函数,对中间结果的每个键使用 ihash(key) % nReduce 来决定写入到哪个中间结果文件中。

从数据结构角度来说,所有的中间结果数据是一个 nReduce * nMap 的二维矩阵,每一行对应一个 reduce 任务。

由于 map 节点有可能执行失败,为避免 reduce 节点读取到的是未写入完成的中间结果文件,在写入中间结果文件时可以先写入一个临时文件,在写入完成后再重命名为最终的文件名。在本实验中,可以使用 ioutil.TempFile 来创建临时文件,以及使用 os.Rename 来原子性的重命名文件。

在当前 map 任务的中间结果文件写入完成后,需要通过 RPC 请求通知主节点所有中间结果文件的文件地址。在本实验中,各个工作节点运行在同一台机器上,实验要求保存 map 任务的中间结果文件到当前文件夹,这样 reduce 任务就能通过中间结果文件的文件名读取中间结果数据。

对中间结果数据的写文件可以借助 Goencoding/json 模块,例如:

1
2
3
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)

然后 reduce 任务读取文件:

1
2
3
4
5
6
7
8
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}

reduce 任务

reduce 任务负责将所有中间结果数据按照键排序后应用用户自定义的 reduce 函数,并生成最终输出文件,文件格式为 mr-out-X,其中 X 表示 reduce 任务的编号。文件中的每一行对应一个 reduce 函数调用的结果,需要按照 Go%v %v 形式对键值对进行格式化。

在收到所有的中间结果数据之前,reduce 任务无法开始执行,所以在这个期间当工作节点请求 reduce 任务时,如果主节点暂时没有新的 reduce 任务可分发,则可返回一个空任务,工作节点收到响应后则暂时等待一段时间再重新请求任务。reduce 每次收到中间结果数据后会暂存在内存中,如果暂存的中间结果数据的数量等于 map 任务的数量(这个值可以放在 RPC 响应中),则说明所有中间结果数据已经接收完毕,可以开始执行 reduce 任务。

注意

mrworker.go 中注释描述通过 go run mrworker.go wc.so 来运行工作节点,不过如果构建 wc.so 时开启了竞争检测(-race),则运行 mrworker.go 时也同样需要开启竞争检测,否则会提示 cannot load plugin wc.so,如果打印 err 则会显示 plugin.Open("wc"): plugin was built with a different version of package runtime

同样的,如果使用 GoLand 调试程序,由于在调试模式下 GoLand 编译 Go 文件时会增加 -gcflags="all=-N -l" 参数,所以也需要在打包 wc.so 时增加相同的参数。

活锁

在最后的 crash test 遇到个类似活锁的问题,在前文提到,如果某个工作节点开始了 reduce 任务但是还没有接收全部的中间结果数据,则该节点下次申请任务时会继续申请该 reduce 任务(普通工作节点对申请的任务类型没有要求)。在 crash test 下,工作节点在执行用户自定义的 mapreduce 函数时有一定概率结束进程,假设现在有4个工作节点,其中一个在执行 map 任务,另外三个各自在执行 reduce 任务(轮询获取中间结果数据),不幸的是
这个时候 map 节点挂了,此时 test-mr.sh 会自动再启动一个工作节点,然后更不幸的是挂掉的 map 任务在主节点看来还没有到超时时间,所以主节点此时不会分配 map 任务给新的节点(假设没有其他 map 任务),会再分配一个 reduce 任务给新的节点,至此所有工作节点都在执行 reduce 任务,又都在等待中间结果数据完成,却又不可能完成。

造成这个问题的主要原因是任务分配顺序,上述问题下的任务分配顺序是:

  1. 指定的 reduce 任务
  2. 空闲或超时的 map 任务
  3. 空闲或超时的 reduce 任务

解决方法就是把前两个换下顺序即可,即:

  1. 空闲或超时的 map 任务
  2. 指定的 reduce 任务
  3. 空闲或超时的 reduce 任务

参考

Lab 1 虽然是个单机器多线程、多进程的程序,但主节点和工作节点的交互依然通过 RPC 实现,Go 本身也提供了开箱即用的 RPC 功能,下面将通过一个简单的求和服务来了解在 Go 中如何实现一个 RPC 服务。

定义请求体和响应体

请求体和响应体都非常简单,SumRequest 中包含要求和的两个数字,SumReply 中存放求和的结果:

1
2
3
4
5
6
7
8
9
10
package pb

type SumRequest struct {
A int
B int
}

type SumReply struct {
Result int
}

服务端

首先定义服务类 SumService 和提供的方法:

1
2
3
4
5
6
7
8
type SumService struct {
}

func (sumService *SumService) Sum(sumRequest *pb.SumRequest, sumReplay *pb.SumReply) error {
sumReplay.Result = sumRequest.A + sumRequest.B

return nil
}

SumService 只有一个 Sum 方法,接收 SumRequestSumReply 两个参数,求和后将结果放回到 SumReply 中(Sum 的方法签名必须是这样的形式,即两个入参和一个 error 类型的出参,具体规则见下文描述)。

然后进行服务注册:

1
2
3
sumService := &SumService{}
rpc.Register(sumService)
rpc.HandleHTTP()

通过 rpc.Register 这个方法可以知道,一个服务类及其提供的方法必须满足以下条件才能注册成功:

  1. 服务类必须是公共的
  2. 服务类提供的方法必须是公共的
  3. 服务类提供的方法入参必须是两个,一个表示请求,一个表示响应(从编码的角度来说方法入参是两个,但是实际代码是判断是否等于3个,因为在这种场景下定义的方法的第一个入参类似于 Java 中的 this
  4. 服务类提供的方法的第一个参数类型必须是公共的或者是 Go 内置的数据类型
  5. 服务类提供的方法的第二个参数类型也必须是公共的或者是 Go 内置的数据类型,且必须是指针类型
  6. 服务类提供的方法的出参个数只能是1个
  7. 服务类提供的方法的出参类型必须是 error

rpc.HandleHTTP() 表示通过 HTTP 作为客户端和服务端间的通信协议,当客户端发起一个 RPC 调用时,本质上是将要调用的方法和参数包装成一个 HTTP 请求,服务端收到 HTTP 请求后,解码出要调用的本地方法名称和入参,然后调用本地方法,在本地方法调用完成后再将结果写入到 HTTP 响应中,客户端收到响应后,再解析出远程调用的结果。

rpc.HandleHTTP() 本质上是个 HTTP 路由注册,实际上是调用 Handle(pattern string, handler Handler) 方法,当请求路由匹配 pattern 时,会调用对应的 handler 执行,对于 Go RPC 来说,固定路由路径是 /_goRPC_

所以,在完成 HTTP 路由注册后,还需要配合开启一个 HTTP 服务,这样才能接受远程服务调用:

1
2
3
4
5
6
7
8
listener, err := net.Listen("tcp", ":1234")

if err != nil {
log.Fatal("listen error:", err)
}

fmt.Println("Listening on port 1234")
http.Serve(listener, nil)

http.Serve 方法中对于每一个客户端的连接,最终会分配一个 goroutine 来调用 HandlerServeHTTP(ResponseWriter, *Request) 方法来处理,对于 GoRPC 包来说,则可以实现该方法来处理 RPC 请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}

客户端

对于客户端来说,发起远程方法调用前需要先和服务端建立连接:

1
2
3
4
5
6
7
client, err := rpc.DialHTTP("tcp", ":1234")

if err != nil {
log.Fatal("dialing:", err)
}

defer client.Close()

该方法同时返回了一个 RPC 客户端类,内部同时负责对 RPC 请求的编码和解码。

然后通过 client.Call 来发起远程调用:

1
2
3
4
5
6
7
8
9
10
11
12
sumRequest := &pb.SumRequest{
A: 1,
B: 2,
}
sumReply := &pb.SumReply{}
err = client.Call("SumService.Sum", sumRequest, sumReply)

if err != nil {
log.Fatal("call error:", err)
}

fmt.Println("Result:", sumReply.Result)

这里的调用一共有三个参数,第一个是被调用的远程方法名,需要是 类名.方法名 的形式,后两个则是远程方法的约定入参。

完整的代码可参考 go-rpc-demo

参考

Lab 1 提供了一个串行化的示例 MapReduce 程序,整体分两部分,第一部分是用户自定义的 mapreduce 函数,第二部分是框架代码。

用户自定义 map 和 reduce 函数

以单词计数应用 wc.go 为例,对于 map 函数来说,它的输入键值对类型为 <string, string>,中间结果数据类型为框架定义的 KeyValue 类型,本质上也是个 <string, string> 类型。map 函数首先将文件内容拆分为单词,然后遍历每个单词,输出对应中间结果数据 <w, "1">

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
// function to detect word separators.
ff := func(r rune) bool { return !unicode.IsLetter(r) }

// split contents into an array of words.
words := strings.FieldsFunc(contents, ff)

kva := []mr.KeyValue{}
for _, w := range words {
kv := mr.KeyValue{w, "1"}
kva = append(kva, kv)
}
return kva
}

reduce 函数的输出类型为 string,其逻辑较为简单,中间结果数组的长度就是单词的个数:

1
2
3
4
5
6
7
8
9
//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
// return the number of occurrences of this word.
return strconv.Itoa(len(values))
}

框架代码

我们可以通过 go run -race mrsequential.go wc.so pg*.txt 来运行串行化的 MapReduce 程序,这里的 wc.so 内包含了用户自定义的 mapreduce 函数,pg*.txt 则是本次 MapReduce 程序的原始输入数据。

首先,根据入参提供的插件找到用户自定义的 mapreduce 函数:

1
mapf, reducef := loadPlugin(os.Args[1])

接着,依次读取输入文件的内容,并调用用户自定义的 map 函数,生成一组中间结果数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))
intermediate = append(intermediate, kva...)
}

然后,对所有中间结果数据按照键进行排序,而在 MapReduce 论文中,中间结果数据会经过分片函数分发给不同的 reduce 节点,由 reduce 节点自行排序处理:

1
2
3
4
5
6
7
//
// a big difference from real MapReduce is that all the
// intermediate data is in one place, intermediate[],
// rather than being partitioned into NxM buckets.
//

sort.Sort(ByKey(intermediate))

最后,遍历所有中间结果数据,对同键的中间结果数据调用用户自定义的 reduce 函数,并将结果写入到最终输出文件中,同样的,这里也只有一个最终输出文件而不是多个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
oname := "mr-out-0"
ofile, _ := os.Create(oname)

//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

i = j
}

ofile.Close()

至此,一个串行化的 MapReduce 程序就完成了。

参考

Lab 1 中遇到的第一个命令是 go build -race -buildmode=plugin ../mrapps/wc.go,其中 -buildmode=plugin 表示以插件的形式打包源文件,这里的 wc.go 是用户实现的 mapreduce 方法,这体现了面向接口编程的思想,只要用户编写的 mapreduce 方法遵循统一的签名,则可以在不重新编译 MapReduce 框架代码的情况下,实时替换运行不同的用户应用。

假设有个 sum.go 文件,里面只有一个 Sum 方法:

1
2
3
4
5
package main

func Sum(a int, b int) int {
return a + b
}

sum.go 以插件形式编译:

1
go build -buildmode=plugin sum.go

会生成一个 sum.so 文件。

接着,在 main.go 中就可以通过 plugin.Open 读取 sum.so

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"log"
"plugin"
)

func main() {
fileName := "sum.so"
p, err := plugin.Open(fileName)

if err != nil {
log.Fatalf("cannot load plugin %v", fileName)
}

sumSymbol, err := p.Lookup("Sum")

if err != nil {
log.Fatalf("cannot find Map in %v", fileName)
}

sum := sumSymbol.(func(int, int) int)

fmt.Println("1 + 2 is", sum(1, 2))
}

然后通过 Lookup 根据方法名找到 Sum 方法,按照指定方法签名转换后即可进行调用。而如果需要换一个 Sum 的实现,则无需重新编译 main.go

参考:

Lab 1 中遇到的第一个命令是 go build -race -buildmode=plugin ../mrapps/wc.go,其中 -race 表示启用 Go 的竞争检测,在多线程编程时,数据竞争是必须要考虑的问题,而数据竞争的问题往往难以察觉和排查,所以 Go 内置了竞争检测的工具来帮助开发人员提前发现问题。另外,MIT 6.824 是一门分布式系统课程,必然会涉及多线程编程,所以竞争检测也是校验 Lab 程序正确性的一种方式。

介绍

Go 借助 goroutine 来实现并发编程,所以数据竞争发生在多个 goroutine 并发读写同一个共享变量时,并且至少有一个 goroutine 存在写操作。来看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import "fmt"

func main() {
done := make(chan bool)
m := make(map[string]string)
m["name"] = "world"
go func() {
m["name"] = "data race"
done <- true
}()
fmt.Println("Hello,", m["name"])
<-done
}

其中 m 是一个共享变量,被两个 goroutine 并发读写,将上述代码保存为文件 racy.go,然后开启竞争检测执行:

1
go run -race racy.go

会输出类似如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Hello, world
==================
WARNING: DATA RACE
Write at 0x00c00007e180 by goroutine 7:
runtime.mapassign_faststr()
/usr/local/go/src/runtime/map_faststr.go:203 +0x0
main.main.func1()
/path/to/racy.go:10 +0x50

Previous read at 0x00c00007e180 by main goroutine:
runtime.mapaccess1_faststr()
/usr/local/go/src/runtime/map_faststr.go:13 +0x0
main.main()
/path/to/racy.go:13 +0x16b

Goroutine 7 (running) created at:
main.main()
/path/to/racy.go:9 +0x14e
==================
==================
WARNING: DATA RACE
Write at 0x00c000114088 by goroutine 7:
main.main.func1()
/path/to/racy.go:10 +0x5c

Previous read at 0x00c000114088 by main goroutine:
main.main()
/path/to/racy.go:13 +0x175

Goroutine 7 (running) created at:
main.main()
/path/to/racy.go:9 +0x14e
==================
Found 2 data race(s)
exit status 66

竞争检测会提示第13行和第10行存在数据竞争,一共涉及两个 goroutine,一个是主 goroutine,另一个是手动创建的 goroutine

典型数据竞争场景

循环计数器竞争

这个例子中每次循环时会创建一个 goroutine,每个 goroutine 会打印循环计数器 i 的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i) // Not the 'i' you are looking for.
wg.Done()
}()
}
wg.Wait()
}

我们想要的结果是能输出 01234 这5个值(实际顺序并不一定是 01234),但由于主 goroutinei 的更新和被创建的 goroutinei 的读取之间存在数据竞争,最终的输出结果可能是 55555 也可能是其他值。

如果要修复这个问题,每次创建 goroutine 时复制一份 i 再传给 goroutine 即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func(j int) {
fmt.Println(j) // Good. Read local copy of the loop counter.
wg.Done()
}(i)
}
wg.Wait()
}

意外地共享变量

在日常开发中,我们可能不经意间在多个 goroutine 间共享了某个变量。在下面的例子中,首先 f1, err := os.Create("file1") 会创建一个 err 变量,接着在第一个 goroutine 中对 file1 写入时会对 err 进行更新(_, err = f1.Write(data)),然而在主 goroutine 中创建 file2 时同样会对 err 进行更新(f2, err := os.Create("file2"),这里虽然用的是 :=,不过 err 并不是一个新的变量,在同一个作用域下是不允许重复对某个同名变量使用 := 创建的,因为 f2 是一个新的变量,所以这里可用 :=),这就产生了数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "os"

// ParallelWrite writes data to file1 and file2, returns the errors.
func ParallelWrite(data []byte) chan error {
res := make(chan error, 2)
f1, err := os.Create("file1")
if err != nil {
res <- err
} else {
go func() {
// This err is shared with the main goroutine,
// so the write races with the write below.
_, err = f1.Write(data)
res <- err
f1.Close()
}()
}
f2, err := os.Create("file2") // The second conflicting write to err.
if err != nil {
res <- err
} else {
go func() {
_, err = f2.Write(data)
res <- err
f2.Close()
}()
}
return res
}

func main() {
err := ParallelWrite([]byte("Hello, world!"))
<-err
}

修复方法也很简单,在每个 goroutine 中使用 := 创建 err 变量即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "os"

// ParallelWrite writes data to file1 and file2, returns the errors.
func ParallelWrite(data []byte) chan error {
res := make(chan error, 2)
f1, err := os.Create("file1")
if err != nil {
res <- err
} else {
go func() {
// This err is shared with the main goroutine,
// so the write races with the write below.
_, err := f1.Write(data)
res <- err
f1.Close()
}()
}
f2, err := os.Create("file2") // The second conflicting write to err.
if err != nil {
res <- err
} else {
go func() {
_, err := f2.Write(data)
res <- err
f2.Close()
}()
}
return res
}

func main() {
err := ParallelWrite([]byte("Hello, world!"))
<-err
}

未受保护的全局变量

某个包下对外暴露了 RegisterServiceLookupService 两个方法,而这两个方法会对同一个 map 变量进行读写,客户端调用时有可能多个 goroutine 并发调用,从而存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"net"
)

var service map[string]net.Addr = make(map[string]net.Addr)

func RegisterService(name string, addr net.Addr) {
service[name] = addr
}

func LookupService(name string) net.Addr {
return service[name]
}

func main() {
go func() {
RegisterService("hello", &net.IPAddr{IP: net.ParseIP("127.0.0.1")})
}()
go func() {
fmt.Println(LookupService("hello"))
}()
}

可以通过 sync.Mutex 来保证可见性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"net"
"sync"
)

var (
service map[string]net.Addr = make(map[string]net.Addr)
serviceMu sync.Mutex
)

func RegisterService(name string, addr net.Addr) {
serviceMu.Lock()
defer serviceMu.Unlock()
service[name] = addr
}

func LookupService(name string) net.Addr {
serviceMu.Lock()
defer serviceMu.Unlock()
return service[name]
}

func main() {
go func() {
RegisterService("hello", &net.IPAddr{IP: net.ParseIP("127.0.0.1")})
}()
go func() {
fmt.Println(LookupService("hello"))
}()
}

未受保护的基本类型变量

除了 map 这样的复杂数据类型外,基本类型变量同样会存在数据竞争,如 boolintint64。例如在下面的例子中,主 goroutinew.last 的更新和创建的 goroutine 中对 w.last 的读取间存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"time"
)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
w.last = time.Now().UnixNano() // First conflicting access.
}

func (w *Watchdog) Start() {
go func() {
for {
time.Sleep(time.Second)
// Second conflicting access.
if w.last < time.Now().Add(-10*time.Second).UnixNano() {
fmt.Println("No keepalives for 10 seconds. Dying.")
os.Exit(1)
}
}
}()
}

func main() {
watchdog := &Watchdog{}
watchdog.Start()
watchdog.KeepAlive()
select {}
}

我们依然可以借助 channelsync.Mutex 来修复这个问题,不过类似于 JavaGo 中同样有相应的原子变量来处理基本类型的并发读写,上述例子就可以通过原子包下的 atomic.StoreInt64atomic.LoadInt64 来解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"sync/atomic"
"time"
)

type Watchdog struct{ last int64 }

func (w *Watchdog) KeepAlive() {
atomic.StoreInt64(&w.last, time.Now().UnixNano())
}

func (w *Watchdog) Start() {
go func() {
for {
time.Sleep(time.Second)
if atomic.LoadInt64(&w.last) < time.Now().Add(-10*time.Second).UnixNano() {
fmt.Println("No keepalives for 10 seconds. Dying.")
os.Exit(1)
}
}
}()
}

func main() {
watchdog := &Watchdog{}
watchdog.Start()
watchdog.KeepAlive()
select {}
}

未同步的 send 和 close 操作

虽然对一个 channel 的发送和相应的读取完成之间存在 happens-before 的关系,但是对 channel 的发送和关闭间并没有 happens-before 的保证,依然存在数据竞争:

1
2
3
4
5
6
7
8
9
10
11
package main

func main() {
c := make(chan struct{}) // or buffered channel

// The race detector cannot derive the happens before relation
// for the following send and close operations. These two operations
// are unsynchronized and happen concurrently.
go func() { c <- struct{}{} }()
close(c)
}

所以在关闭 channel 前,增加对 channel 的读取操作来保证数据发送完成:

1
2
3
4
5
6
7
8
9
package main

func main() {
c := make(chan struct{}) // or buffered channel

go func() { c <- struct{}{} }()
<-c
close(c)
}

竞争检测的开销

在开启竞争检测的情况下,程序的内存使用可能会增加5到10倍,性能可能会增加2到20倍,所以一般建议在压力测试或集成测试阶段开启,或者线上集群选择某台机器单独开启。

参考

看汇编语言时看到,标志寄存器中 CF 标志位表示无符号数运算时是否向最高有效位外的更高位产生进位或借位,而 OF 标志位表示有符号数运算时是否产生溢出。这里存在两个疑问:

  1. 对于 CPU 来说,它并不区分处理的是无符号数还是有符号数,那什么时候设置 CF,什么时候设置 OF
  2. CF 表示进位时也是一种溢出,能否和 OF 共用一个

CF

首先来看 CF 进位的例子,这里我们以8位无符号数为例,其最大值为255,那么计算 255 + 1 则会产生进位。可以通过一段简单的汇编代码进行验证:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $255, %al
add $1, %al
movl $1, %eax
movl $0, %ebx
int $0x80

在上述代码中,al 是一个8位寄存器,是 eax 寄存器的低8位,这里首先将255放到 al 寄存器内,然后对 al 寄存器中的值加1并放回到 al 寄存器中,即实现 255 +1 的运算。

最后的 int $0x80 中的 int 表示 interrupt,即中断,当发生一个中断时会有一个与之对应的中断处理程序来处理,这里的 $0x80 就是声明由哪个中断处理程序处理,在 Linux 中,$0x80 对应的是操作系统内核,用于发起一个系统调用,而具体发起哪个系统调用则由 eax 中的值决定,这就是 movl $1, %eax 的作用,1对应的系统调用是 exit,用于退出程序,而程序退出时会伴有一个状态码,这个状态码的值来自于 ebx,也就是 movl $0, %ebx 的作用,这里使用0来表示程序正常退出。

接下来我们借助 gdb 来观察程序运行时 CF 的值的变化。首先将上述代码保存为 demo.s 后进行编译:

1
as demo.s -o demo.o -gstabs+

这里的 -gstabs+ 表示生成机器码时同时生成调试信息,如果没有这个选项后续 gdb 加载时会提示 (No debugging symbols found in ./demo)

然后进行链接:

1
ld demo.o -o demo

这个时候就可以通过 gdb 加载生成的可执行文件:

1
gdb ./demo

alt

然后输入 break 4 在代码第四行设置一个断点,即 mov $255, %al 处,最后输入 run 开始调试执行:

alt

此时可输入 layout reg 来观察各寄存器内的值,我们需要关注的是 eflags 寄存器,它展示了哪些标志位生效了:

alt

或者通过执行 info registers eflags 来查看 eflags 的值:

1
2
(gdb) info registers eflags
eflags 0x202 [ IF ]

目前只有一个 IF 标志位,它用于表示是否响应中断。

接着,输入 next 来执行当前断点所在处的指令,可以看到,执行后 rax 寄存器内的值变成了255(rax 是64位 CPU 下的一个通用寄存器,32位 CPU 下对应为 eax):

alt

再输入一次 next 来执行加法运算,此时 rax 中的值变为了0(实际的二进制结果应该是100000000,因为 al 寄存器最多只能表示8位,所以最高位的1无法表示,最终结果为0),eflags 中出现了 CF 标志位,说明发生了进位:

alt

rax 中的值为0也说明了加法运算后产生的进位并不会体现在比参与运算的寄存器位数更多的寄存器中,否则 rax 中的值应该是256。

再来看借位,将程序稍加修改执行一个 1 - 2 的运算:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $1, %al
sub $2, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为255(存在高位借位的情况下最后的二进制结果为11111111,解释为无符号数为255),eflags 中同样出现了 CF 标志位。

alt

所以,CF 的标记取决于两个二进制数的运算是否产生进位或借位。

OF

有符号数的溢出分两种情况,一种是运算结果应该是正数却返回负数,另一种是运算结果应该是负数却返回正数。

首先来看两个正数运算得到负数的例子,同样对代码稍加修改实现 127 + 1 的运算:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $127, %al
add $1, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为128(对应二进制表示为10000000,以有符号数的角度来看,其值为-128,即两个正数相加得到一个负数),eflags 中出现了 OF 标志位,说明发生了溢出:

alt

从有符号数的角度来看,参与运算的两个数的符号位都是0,相加后符号位却是1,所以 OF 设置为1。

再来看两个负数运算得到正数的例子,再次对代码稍加修改实现 -128 - 1 的运算,-128的二进制补码表示为10000000,即无符号数角度下的128,-1的二进制补码表示为11111111,即无符号数角度下的255:

1
2
3
4
5
6
7
8
.section .text
.globl _start
_start:
mov $128, %al
add $255, %al
movl $1, %eax
movl $0, %ebx
int $0x80

最后 rax 中的值为127(对应二进制表示为01111111,以有符号数的角度来看,其值为127,即两个负数相加得到一个正数),eflags 中出现了 OF 标志位,说明发生了溢出:

alt

从有符号数的角度来看,参与运算的两个数的符号位都是1,相加后符号位却是0,所以 OF 设置为1。

所以,OF 的标记取决于运算结果的符号位是否发生变化,这里的变化指的是两个相同符号位的数的运算结果是一个不同符号位的数。

比较

注意到前面有符号数 -128 - 1 运算的例子中,最后 CFOF 都被设置为了1,说明 CFOF 并不是互斥的关系,在这个例子中即发生了进位又发生了符号位的变更,也就是说如果满足了设置 CF 的条件,那么 CF 就是1,如果满足了设置 OF 的条件,那么 OF 就是1。因此,回到文章开头的问题,CPU 并不是去判断该设置 CF 还是 OF,而是只要条件满足就会设置对应的标志位,而具体应该关注哪个标志位,则交由编译器去判断,因为对 CPU 而言它处理的只是比特运算,只有编译器知道当前的运算数是无符号数还是有符号数。

另外,CFOF 也不能合二为一,无法相互替代,例如两个无符号数相加 CF 有可能是0,但是 OF 却是1,如 127 + 1;两个有符号数相加 OF 有可能是0,但是 CF 却是1,如 -1 - 1。也有可能 CFOF 都是1,如有符号数运算 -128 - 1

参考

Sonatype OSSRH (OSS Repository Hosting) 提供了 JAR 包发布服务,并支持自动将 JAR 包同步到 Maven 中央仓库,所以我们将 JAR 包发布到 Sonatype OSSRH 即可。

创建 Sonatype 工单

第一步在 Sonatype 上注册一个账号,创建成功后在上面创建一个 IssueProject 选择 Community Support - Open Source Project Repository Hosting (OSSRH)Issue Type 选择 New Project

alt

这里要注意的是 Group Id 的填写,根据 Coordinates 的描述,这里分两种情况:

  1. 你拥有某个域名,如 example.com
  2. 你没有域名,但是你的代码托管在了某个代码托管服务上,如 GitHub

对于第一种情况,你的 Group Id 可以是任何以 com.example 为前缀的字符串,如 com.example.myawesomeproject。不过,Sonatype 会要求你证明确实拥有 example.com 域名,你需要在你的域名注册商那创建一条 TXT 记录,其内容就是你创建的 Issue 的工单号,如 OSSRH-12345,具体步骤可参考 How do I set the TXT record needed to prove ownership of my Web Domain?

对于第二种情况,以 GitHub 为例,你的 Group Id 必须是 io.github.myusernamemyusername 是你的 GitHub 账户名或者是组织名,类似的,为了证明你对 myusername 的所有权,你需要在 myusername 下创建一个公开的仓库,仓库名称为你所创建 Issue 的工单号,如 OSSRH-12345,认证完成之后你就可以删掉这个仓库。Sonatype 所支持的代码托管服务如下:

Service Example groupId
GitHub io.github.myusername
GitLab io.gitlab.myusername
Gitee io.gitee.myusername
Bitbucket io.bitbucket.myusername
SourceForge io.sourceforge.myusername

工单示例可参考 Publish my open source java package

安装 GPG

GPG 用于对所发布的包进行签名,在 GnuPG 根据自己的操作系统下载 GPG 安装包,安装完成后执行 gpg --full-gen-key 生成秘钥对,选择默认选项即可,生成秘钥对时会要求输入姓名、邮箱、注释和密码,其中密码在发布阶段会用到,秘钥生成信息类似如下:

1
2
3
4
pub   rsa3072 2022-02-26 [SC]
E892F685E5EA9005E0A2DE31F0F732425A15D81D
uid examplename <examplename@example.com>
sub rsa3072 2022-02-26 [E]

其中 E892F685E5EA9005E0A2DE31F0F732425A15D81D 是秘钥的 ID,然后我们需要将公钥分发到公共的秘钥服务器上,这样 Sonatype 就可以通过这个公钥来验证我们所发布包的签名是否正确:

1
gpg --keyserver keyserver.ubuntu.com --send-keys E892F685E5EA9005E0A2DE31F0F732425A15D81D

这里选择的公共秘钥服务器是 keyserver.ubuntu.com,也可以选择其他服务器,如 keys.openpgp.org 或者 pgp.mit.edu

配置 settings.xml

为了将包发到 Sonatype OSSRH,需要在 Mavensettings.xml 中配置用户信息,即在 servers 下添加如下信息,这里的 your-jira-idyour-jira-pwd 对应第一步创建的账号和密码:

1
2
3
4
5
<server>
<id>ossrh</id>
<username>your-jira-id</username>
<password>your-jira-pwd</password>
</server>

另外,为了在打包时对文件进行签名还需要在 profiles 下添加如下信息,这里的 the_pass_phrase 为生成 GPG 秘钥时设置的密码:

1
2
3
4
5
6
7
8
9
10
<profile>
<id>ossrh</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<gpg.executable>gpg</gpg.executable>
<gpg.passphrase>the_pass_phrase</gpg.passphrase>
</properties>
</profile>

配置 pom.xml

最后是配置 pom.xml,首先我们需要告诉 Maven 将包部署到 Sonatype OSSRH,需要增加一个 nexus-staging-maven-plugin 插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.7</version>
<extensions>true</extensions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://s01.oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>
</plugins>
</build>

然后是配置 Javadoc 和源码插件,如果最后的 JAR 包没有包含 Javadoc 和源码,Sonatype 会不允许通过:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

不过上述配置不适合 Kotlin 项目,会提示 Missing: no javadoc jar found in folder '/com/example/username/awesomeproject',需要将 maven-javadoc-plugin 替换为 dokka-maven-plugin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<build>
<plugins>
<plugin>
<groupId>org.jetbrains.dokka</groupId>
<artifactId>dokka-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<id>attach-javadocs-dokka</id>
<goals>
<goal>javadocJar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

最后,剩下补充一些元数据,这个也是必填项,包括:

  • 项目名称,描述和地址
  • 许可证信息
  • 开发者信息
  • 源码地址

完整的示例可参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.simpligility.training</groupId>
<artifactId>ossrh-demo</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>ossrh-demo</name>
<description>A demo for deployment to the Central Repository via OSSRH</description>
<url>http://github.com/simpligility/ossrh-demo</url>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<developers>
<developer>
<name>Manfred Moser</name>
<email>manfred@sonatype.com</email>
<organization>Sonatype</organization>
<organizationUrl>http://www.sonatype.com</organizationUrl>
</developer>
</developers>

<scm>
<connection>scm:git:git://github.com/simpligility/ossrh-demo.git</connection>
<developerConnection>scm:git:ssh://github.com:simpligility/ossrh-demo.git</developerConnection>
<url>http://github.com/simpligility/ossrh-demo/tree/master</url>
</scm>

...

</project>

发包

执行 mvn clean deploy 即可发包,如果执行成功,在提交的工单中会自动增加一条回复:

Central sync is activated for com.example.awesomeproject. After you successfully release, your component will be available to the public on Central https://repo1.maven.org/maven2/, typically within 30 minutes, though updates to https://search.maven.org can take up to four hours.

也就是30分钟内即可从 Maven 中央仓库下载 JAR 包,不过要想能在 search.maven.org 搜索到你的 JAR 包,需要等待至多4个小时。

另外,因为配置 nexus-staging-maven-plugin 时指定了 autoReleaseAfterClosetrue,所以发包后不需要去 https://oss.sonatype.org/#stagingRepositories 手动执行 closerelease 操作。

参考

二叉搜索树的删除可以分为三种情况。第一,被删除的节点是叶子节点:

alt

第二,被删除的节点只有一个孩子节点:

alt

第三,被删除的节点有两个孩子节点:

alt

对于第一种情况,我们只需断开被删除的节点和其父节点的关联即可,即将节点3的左孩子节点指针置为空;对于第二种情况,我们可以用被删除的节点的孩子节点来替代被删除的节点,即将节点5的右孩子指针改为指向节点7;第三种情况是最为复杂的情况,相当于删除一个子树的根节点,为了保持二叉搜索树的性质,我们可以使用左子树中的最大值或右子树的最小值来替代被删除的根节点。

不过在实现时,考虑到实现的简便,对于第三种情况会通过直接修改当前节点的值来替代修改节点的指针指向,以上述例子来说,如果使用指针修改的方式,则需要修改节点5的左孩子指针,修改节点2的左孩子指针和右孩子指针(这里假设使用节点2来替代被删除的节点3),总共三处修改较为繁琐;而如果使用修改节点值的方式,只需要先将节点3的值改为2(这里假设使用节点2来替代被删除的节点3),然后就可以将问题转化为在余下的左子树中删除节点2。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val
self.left = left
self.right = right


class Solution:
def deleteNode(self, root: TreeNode, key: int) -> TreeNode:
if not root:
return root

if root.val < key:
root.right = self.deleteNode(root.right, key)
elif root.val > key:
root.left = self.deleteNode(root.left, key)
else:
if root.left and root.right:
root.val = self._find_min(root.right)
root.right = self.deleteNode(root.right, root.val)
else:
return root.left or root.right

return root

def _find_min(self, root):
while root.left:
root = root.left

return root.val
0%