【读】Pinot: Realtime OLAP for 530 Million Users

介绍

作者对大数据时代下近实时的 OLAP 服务提出了如下要求:

  • 高性能:系统应当能够快速的返回用户的查询请求
  • 可扩展性:为了能够在处理大量并发查询请求的同时做到近实时的消费大量数据,系统应当提供近线性的扩展性和容错性来满足大规模的服务部署
  • 低成本:随着数据容量以及查询并发量的增加,系统的成本不能无限制的增长
  • 低延迟的实时数据消费:用户期望能够近实时的查询到刚添加到系统中的数据
  • 灵活性:系统应当能够支持用户查询时下钻到任意维度,而不受限于预聚合的数据;同时,系统也能够以零停机的方式在生产环境增加新的查询模式
  • 容错性:系统异常时能够提供优雅的服务降级
  • 非中断式运维:系统能够以零停机的方式进行服务升级或者表结构变更
  • 云服务友好的架构:系统应当能够轻易的部署到商用的云服务环境中

架构

Pinot 诞生于 LinkedIn,作为一款可扩展的分布式 OLAP 数据库,能够提供低延迟的实时数据分析。Pinot 构建在不可变的追加式数据存储之上,专门为数据分析查询场景作了优化,数据自开始接入到可被查询仅需几秒。

LinkedIn 内部,业务数据会先接入到 Kafka 中,然后经过 ETL 处理存储到 HDFSPinot 既支持近实时的从 Kafka 中消费数据,也支持从类似 Hadoop 的离线系统中导入数据。因此,Pinot 遵循 lambda 架构,能够自动合并从 Kafka 接入的流式数据和 Hadoop 导入的离线数据。

数据和查询模型

和常见的数据库一样,Pinot 也以表的方式管理数据,每个表背后由 schema 定义了有哪些列。支持的数据类型包括不同长度的整型,浮点数,字符串,布尔值,以及基于这些数据类型的数组。Pinot 中的列即可以是 dimension,也可以是 metric

Pinot 还支持一个特殊的时间列,一方面在查询时基于该列合并流式数据和离线数据,另一方面作为数据过期判断的依据。

Pinot 的表以 segment 为单位存储,每个 segment 一般能存储几千万条记录,一张表能支持几万个 segmentsegment 可以有副本,从而确保数据的高可用性。segment 中的数据是不可修改的,但是可以整个替换 segment 来更新数据(数据更新代价较大)。

segment 采用列存储保存数据,并支持多种编码策略来减少单个 segment 的大小。一个 segment 的大小一般在几百 MB 到几 GB 不等。下图展示了 segment 的数据存储方式:

alt

Pinot 的查询语言为 PQL,是 SQL 的子集,支持 selectprojectionaggregation,和 top-n 查询,不过不支持连接和嵌套查询。PQL 不支持单条记录级别的创建,更新或者删除。

组件

Pinot 有四个主要的组件用于数据存储,数据管理和查询:

  • controller
  • broker
  • server
  • minion

除此之外,Pinot 还依赖 Zookeeper 和持久化的对象存储。Pinot 借助 Apache Helix 来管理集群,Apache Helix 是一个通用的集群管理框架,用于管理分布式系统内的分区和副本。

server 主要负责存储 segment 并处理针对所负责的 segment 的查询请求。每个 segmentUNIX 的文件系统上对应一个目录,目录中保存了 segment 的元数据和索引文件。segment 的元数据保存了列的信息,包括类型,cardinality,编码,各式各样的统计信息,以及支持的索引。索引文件保存了每列的索引。索引文件只能追加写入,从而支持按需创建倒排索引(inverted index)。server 可插拔的架构支持从多种不同的存储格式加载列索引,以及在于运行时生成衍生列(synthetic column)。同时也能轻易的扩展从类似 HDFS 或者 S3 这样的分布式存储系统读取数据。Pinot 会维护一个 segment 的多个副本,并且所有副本都会参与查询。

controller 负责管理 segmentserver 的分配。controller 会根据运维需求或者 server 的可用性动态更新 segment 的分配。另外,controller 还负责一系列管理任务,例如查询所有可用的表、segment,添加或者删除表、segmentPinot 的表可以设置过期时间,超过过期时间的 segment 会被 controller 删除。segment 的所有元数据及 segmentserver 的映射都由 Apache Helix 管理。出于容错性的考虑,LinkedIn 一般会在每个数据中心部署三个 controller 实例,其中一个作为主节点,由 Apache Helix 管理,非主节点在大多数时间里是空闲的。

broker 负责处理查询请求,它首先将查询分发到负责的各个 server 上,然后合并各个 server 的查询结果,最后返回给客户端。客户端通过 HTTPbroker 交互,所以可以前置负载均衡器来分摊各个 broker 的压力。

minion 负责运行一些计算密集型的任务,其任务由 controller 的调度器分配。另外,任务管理和调度支持扩展添加新的任务和调度类型以支持变化的业务需求。minion 的其中一个应用场景是数据清洗,出于数据合规的要求,LinkedIn 有时候需要清理特定成员的数据。由于 Pinot 数据的不可变性,minion 运行时需要先下载 segment,然后清洗数据,接着重新生成 segment 以及重建索引,最后上传 segmentcontroller 覆盖旧的 segment

Zookeeper 用于持久化存储元数据,并作为集群中各节点间通信的渠道。集群的状态,segment 的分配,以及元数据都通过 Helix 保存在 Zookeeper 中。segment 本身保存在持久化的对象存储中。在 LinkedIn 内部,Pinot 使用本地的 NFS 作为数据存储层,而运行在 LinkedIn 数据中心之外则借助 Azure Disk

下图是 Pinot 的架构图:
alt

常见操作

加载 segment

Helix 借助状态机来描绘集群的状态,集群中的每个资源都有当前的状态以及期望的状态。当状态发生变更时,对应的节点就会执行状态变更流程。

下图展示了 segment 的状态流转:

alt

segment 的初始状态为 OFFLINE,然后 Helix 会要求 server 执行 segmentOFFLINEONLINE 的状态迁移。server 首先会从对象存储拉取 segment,解压然后加载,完成后就可以服务查询请求,此时 segmentHelix 中的状态为 ONLINE

对于还在消费 KafkasegmentHelix 会要求 server 执行 segmentOFFLINECONSUMING 的状态迁移。server 会从指定的 Kafka 分区的 offset 开始创建消费者,所有的副本也同时从这个 offset 开始消费。

下图展示了 segment 的加载过程:

alt

更新路由表

每当 server 加载或者卸载 segmentHelix 都会更新集群的状态。broker 会监听集群状态的变更,并更新 server 到可用 segment 的路由表映射。这就确保 broker 能将查询请求正确的路由到可用的 server 上。

查询

broker 收到查询请求后,会执行以下步骤:

  1. 解析查询并优化
  2. 随机选择被查询表的路由表
  3. broker 将查询发送给路由表中的所有 server,各个 server 会各自查询本地的 segment
  4. server 根据可用的索引和列的元数据生成逻辑和物理查询计划
  5. server 会调度执行查询计划
  6. 当所有查询计划执行完成后,server 会合并各个查询计划的结果,然后返回给 broker
  7. broker 收到所有 server 的查询结果后,broker 会合并结果。如果某些 server 的查询出现了错误或者超时,那么该次查询会被标记为不完整,客户端就可以决定是否展示不完整的数据给用户还是之后重新提交查询
  8. broker 返回查询结果给客户端

Pinot 会自动合并来自实时和离线的数据。如下图所示,该表每天生成两个 segment,实时数据和离线数据在8月1号和8月2号存在重合,当 Pinot 收到一个覆盖这段时间范围的查询时,会将其改写为两个查询,一个负责查询离线数据,其查询时间范围在8月2号之前,另一个负责查询实时数据,其查询时间范围在8月2号及其之后。

alt

这也正是为什么 Pinot 的表需要一个时间列的原因。

server 如何执行查询

server 收到查询后,会生成逻辑和物理查询计划。因为每个 segment 中可用的索引和物理数据组织方式有可能不同,因此查询计划的粒度是 segment。这使得 Pinot 能够根据某些特殊场景做针对性的优化,例如判断 segment 中的值是否匹配某个查询条件。另外,Pinot 也会根据 segment 的元数据生成某些特殊的查询计划,例如查询 segment 中某列的最大值。

Pinot 会根据预估的执行代价选择物理算子(physical operator),并根据每一列的统计信息对物理算子重排序从而降低查询的整体成本。之后 Pinot 会将查询计划提交给查询执行器,然后并行处理。

下图展示了查询计划生成的过程:

alt

上传数据

用户可以通过 HTTP POSTsegment 上传给 controller。当 controller 收到 segment 后,首先会先将其解压,并检查数据的完整性,并确保新添加的 segment 的大小不会造成表的配额超出限制,然后将 segment 的元数据写入 Zookeeper 中,最后触发 segment 的状态变更为 ONLINE 并分配给合适的副本节点,从而将集群更新至期望的状态,各 server 就可以执行 segment 的加载流程。

下图展示了这一过程:

alt

完结实时 segment

对于从 Kafka 接入实时数据的场景,每个副本都是独立的以相同的起始 offset 开始消费 Kafka 中的数据,并以相同的条件停止消费。当正在消费的 segment 结束消费时,server 会将其持久化到磁盘并提交给 controller。因为 Kafka 本身也有数据的过期策略,因此 Pinot 支持按照已消费的数据的条数或者所经过的时间来提交一个 segment

对于按照已消费的数据的条数来提交 segment 的场景,只要各副本都从相同的 Kafka 分区的 offset 开始消费,以及消费相同的数据条数,则生成的 segment 一定是相同的。不过,根据经过的时间来提交 segment 的场景,则可能因为各副本的本地时钟不同而造成生成的 segment 不一致。因此,Pinot 实现了一套 segment 完结协议来确保所有副本对最终生成的 segment 达成共识。

当一个 segment 准备完结时,server 会将其目前所在的 Kafka 分区的 offset 发给 controller,并要求 controller 下发进一步的指令。controller 返回给 server 的指令可能有:

  • 等待:什么也不做,过段时间再继续询问 controller
  • 丢弃:丢弃当前的 segment,并从 controller 拉取替代的 segment;这个发生的情况在于有其他副本已经提交了另一个版本的 segment
  • 追赶:server 会继续消费直到 controller 指定的 offset,然后再次询问 controller
  • 保持:将当前 segment 刷新到磁盘并加载;这个发生的情况在于有其他副本已经提交了一个完全一样的 segment
  • 提交:将当前 segment 刷新到磁盘并尝试提交;如果提交失败,则继续尝试询问 controller,否则加载 segment
  • 非主节点:当前 controller 不是主节点,需要 server 重新询问真正的主节点

server 询问 controller 的回复同样由状态机实现,controller 会等待直到足够数量的副本已经和 controller 进行了通信,或者距离第一次询问已经过去了足够的时间能够决定哪个副本能够提交 segmentcontroller 状态机会要求各个副本消费到所有副本中已消费的最大 Kafka 分区的 offset,然后让消费到最新 offset 的副本提交 segment。如果当前 controller 发生异常,则新的主节点会发起一个全新的状态机,接着继续上述的操作,所以这会暂缓 segment 的提交,不过对正确性没有影响。

这个策略减少了网络的传输并确保当 segment 被提交时,所有副本都有相同的数据。

云计算友好的架构

Pinot 的设计专门为在云计算环境中运行而优化。商用的云服务提供商为 Pinot 的执行提供了两个重要的条件:配有本地临时存储的计算实例,以及持久化的对象存储系统。

因此,Pinot 被设计为 share-nothing 的架构,各实例都是无状态的。所有持久化的数据都存储在对象存储,所有的系统元数据都存储在 Zookeeper 中;本地磁盘仅作为缓存使用,当实例重启时,所有数据都会从对象存储或者 Kafka 重新读取。因此,可以随时删除一个节点并替换,而不会影响集群的正常运行。

另外,所有面向用户的操作都通过 HTTP 完成,使得用户可以自行选择适合的负载均衡器。

扩展 Pinot

查询执行

Pinot 的查询执行模型被设计为可以扩展支持新的算子以及新的查询类型。例如,Pinot 的最初版本不支持 SELECT COUNT(*) 这样的查询,为了支持该查询则需要修改查询计划器,以及添加新的基于元数据的物理算子,不过不会涉及任何架构上的修改。

Pinot 的物理算子针对每一种数据形式都进行了专门处理;每一种数据编码都有对应的算子,从而能灵活的针对查询优化添加新的索引类型和特定的数据结构。Pinot 可以动态的由 server 或者 minion 重建 segment 内的索引,从而能动态的部署新的索引类型和编码,并且用户不会感知。

索引和物理数据存储

类似 DruidPinot 也支持基于位图的倒排索引。不过,如果能在存储数据时按照主列和二级列排序,则可以支持某些场景下更高效的查询。

例如,LinkedIn 网站上有个功能叫做“谁看了我的档案“,所有相关的查询都涉及到根据 vieweeId 列过滤。如果将数据按照 vieweeId 列排序,那么对于任意一个相关的查询,只需要扫描连续的一部分数据即可,因此 Pinot 可以只保存每一个 vieweeId 的起始和终止位置。这种数据的相邻特性也使得使用向量查询成为了可能。

因此,当创建物理过滤算子时,会先应用执行在已排序的列上,然后将过滤后的数据起始范围传递给后续的算子。后续的算子只需要扫描少部分的数据,从而提高了查询性能。

Iceberg 查询

Iceberg 查询是数据库查询的一个重要使用场景,它用于查询数据集中满足条件的一部分数据,然后进行聚合计算(如 summaxmin)。例如,如果想知道哪个国家的人访问某个页面最多,只需要先查询出访问次数大于某个最小阈值的国家即可,然后就能回答谁访问最多这个问题。这特别适合分析数据有长尾分布的场景,并且查询只关注某些关键的指标。

Pinot 实现了 star-tree 索引对 iceberg 查询进行了优化。star-tree 包含了一系列预聚合的记录节点,树的每一层包含满足了 iceberg 的查询条件的某个维度的节点,以及表示这一层所有数据的 star-node。遍历树就等同于执行多个过滤条件的查询。下图展示了 star-tree 的两个例子:

alt

alt

查询路由和分区

Pinot 对于未分区的表会事先生成一个路由表,用于 serversegment 的映射,并支持多种路由策略。默认的路由策略是平衡策略(balanced strategy),它将 segment 均匀的分配给 server。当查询到达时,所有的 server 都会收到查询请求并查询对应的 segment

平衡策略适合于中小规模的集群,不过不适用于大型集群。因为集群越大,出现异常节点的概率也越高,从而由于异常节点拖慢一次查询。因此,Pinot 针对大型集群实现了另一种策略,从而尽量减少单词查询需要通信的 server 的数量。

Pinot 同时也支持对表按照某个分区函数进行分区。对于分区表,Pinot 不会生成路由表,而是根据查询条件将请求路由到持有特定 segmentserver。另一方面,Pinot 的分区函数也会和 Kafka 的分区函数行为保持一致,从而确保 Pinot 的离线数据也能和实时数据一样按照相同的分区方式分区。

多租户

对于大型公司来说,如果给每一个应用场景创建一个 Pinot 集群则过于昂贵和难以维护。因此,多租户的支持就尤为重要。为了避免某个租户的查询请求占用另一个租户的资源,Pinot 会给每个租户分配令牌桶(token bucket)。每个查询请求都会根据实际查询的耗时来按比例的消耗令牌,当令牌不足时,查询就会被放入队列中等待。令牌桶会随着时间推移缓慢恢复,一方面满足了瞬时的查询高峰,另一方面也避免了某个租户的查询耗尽另一个租户的资源。

产线中的 Pinot

LinkedIn 的产线中,Pinot 运行在超过3000台分布在不同地理位置的服务器上,维护了超过1500张表,对应超过100万个 segment。压缩后的数据大小将近30 TB(未计算副本数据的大小)。各数据中心每秒共处理超过50000次查询。

应用场景类型

LinkedInPinot 的使用场景主要分为两类:

  • 高吞吐,简单的查询
  • 低频,复杂的查询或者涉及数据量巨大

对于第一种场景,要求数据尽量在内存中缓存从而能提供每秒几万的查询,这类查询的查询模式一般不多。

对于第二种场景,一般数据都在磁盘上,由于其低频的查询,因此可以按需加载数据。虽然这类查询频率较低,但是有可能会有瞬时的大量查询,比如用户在访问某个报表页面。对于这种情况,实现资源的多租户共享就显得尤为重要,从而实现最小化硬件资源的占用,避免资源在长时间内处于空闲状态。

运维考量

为了减轻运维的压力,LinkedIn 在设计 Pinot 时就考虑到了尽可能的将 Pinot 设计为能够让用户自助运维的系统。

例如,可以随时修改表的结构来添加新的字段而不需要停机。当 schema 添加了新的列时,Pinot 会在几分钟内给所有已有的 segment 以某个默认值添加该列。同时,Pinot 的运维团队也在持续收集查询日志和执行统计信息,从而能自动的为某些列添加倒排索引以提高查询性能。

不过,在多个数据中心之间和环境(测试和生产环境)之间复制表配置成为了一个问题。目前的解决方案是将表配置保存在源代码控制软件内,然后通过 PinotREST 接口进行同步。这样做的好处在于能够追踪所有配置的变化,以及提供搜索,验证,代码审查等功能。

参考