【读】Pinot: Realtime OLAP for 530 Million Users
介绍
作者对大数据时代下近实时的 OLAP
服务提出了如下要求:
- 高性能:系统应当能够快速的返回用户的查询请求
- 可扩展性:为了能够在处理大量并发查询请求的同时做到近实时的消费大量数据,系统应当提供近线性的扩展性和容错性来满足大规模的服务部署
- 低成本:随着数据容量以及查询并发量的增加,系统的成本不能无限制的增长
- 低延迟的实时数据消费:用户期望能够近实时的查询到刚添加到系统中的数据
- 灵活性:系统应当能够支持用户查询时下钻到任意维度,而不受限于预聚合的数据;同时,系统也能够以零停机的方式在生产环境增加新的查询模式
- 容错性:系统异常时能够提供优雅的服务降级
- 非中断式运维:系统能够以零停机的方式进行服务升级或者表结构变更
- 云服务友好的架构:系统应当能够轻易的部署到商用的云服务环境中
架构
Pinot
诞生于 LinkedIn
,作为一款可扩展的分布式 OLAP
数据库,能够提供低延迟的实时数据分析。Pinot
构建在不可变的追加式数据存储之上,专门为数据分析查询场景作了优化,数据自开始接入到可被查询仅需几秒。
在 LinkedIn
内部,业务数据会先接入到 Kafka
中,然后经过 ETL
处理存储到 HDFS
。Pinot
既支持近实时的从 Kafka
中消费数据,也支持从类似 Hadoop
的离线系统中导入数据。因此,Pinot
遵循 lambda
架构,能够自动合并从 Kafka
接入的流式数据和 Hadoop
导入的离线数据。
数据和查询模型
和常见的数据库一样,Pinot
也以表的方式管理数据,每个表背后由 schema
定义了有哪些列。支持的数据类型包括不同长度的整型,浮点数,字符串,布尔值,以及基于这些数据类型的数组。Pinot
中的列即可以是 dimension
,也可以是 metric
。
Pinot
还支持一个特殊的时间列,一方面在查询时基于该列合并流式数据和离线数据,另一方面作为数据过期判断的依据。
Pinot
的表以 segment
为单位存储,每个 segment
一般能存储几千万条记录,一张表能支持几万个 segment
。segment
可以有副本,从而确保数据的高可用性。segment
中的数据是不可修改的,但是可以整个替换 segment
来更新数据(数据更新代价较大)。
segment
采用列存储保存数据,并支持多种编码策略来减少单个 segment
的大小。一个 segment
的大小一般在几百 MB
到几 GB
不等。下图展示了 segment
的数据存储方式:
Pinot
的查询语言为 PQL
,是 SQL
的子集,支持 select
,projection
,aggregation
,和 top-n
查询,不过不支持连接和嵌套查询。PQL
不支持单条记录级别的创建,更新或者删除。
组件
Pinot
有四个主要的组件用于数据存储,数据管理和查询:
controller
broker
server
minion
除此之外,Pinot
还依赖 Zookeeper
和持久化的对象存储。Pinot
借助 Apache Helix
来管理集群,Apache Helix
是一个通用的集群管理框架,用于管理分布式系统内的分区和副本。
server
主要负责存储 segment
并处理针对所负责的 segment
的查询请求。每个 segment
在 UNIX
的文件系统上对应一个目录,目录中保存了 segment
的元数据和索引文件。segment
的元数据保存了列的信息,包括类型,cardinality
,编码,各式各样的统计信息,以及支持的索引。索引文件保存了每列的索引。索引文件只能追加写入,从而支持按需创建倒排索引(inverted index
)。server
可插拔的架构支持从多种不同的存储格式加载列索引,以及在于运行时生成衍生列(synthetic column
)。同时也能轻易的扩展从类似 HDFS
或者 S3
这样的分布式存储系统读取数据。Pinot
会维护一个 segment
的多个副本,并且所有副本都会参与查询。
controller
负责管理 segment
到 server
的分配。controller
会根据运维需求或者 server
的可用性动态更新 segment
的分配。另外,controller
还负责一系列管理任务,例如查询所有可用的表、segment
,添加或者删除表、segment
。Pinot
的表可以设置过期时间,超过过期时间的 segment
会被 controller
删除。segment
的所有元数据及 segment
到 server
的映射都由 Apache Helix
管理。出于容错性的考虑,LinkedIn
一般会在每个数据中心部署三个 controller
实例,其中一个作为主节点,由 Apache Helix
管理,非主节点在大多数时间里是空闲的。
broker
负责处理查询请求,它首先将查询分发到负责的各个 server
上,然后合并各个 server
的查询结果,最后返回给客户端。客户端通过 HTTP
和 broker
交互,所以可以前置负载均衡器来分摊各个 broker
的压力。
minion
负责运行一些计算密集型的任务,其任务由 controller
的调度器分配。另外,任务管理和调度支持扩展添加新的任务和调度类型以支持变化的业务需求。minion
的其中一个应用场景是数据清洗,出于数据合规的要求,LinkedIn
有时候需要清理特定成员的数据。由于 Pinot
数据的不可变性,minion
运行时需要先下载 segment
,然后清洗数据,接着重新生成 segment
以及重建索引,最后上传 segment
到 controller
覆盖旧的 segment
。
Zookeeper
用于持久化存储元数据,并作为集群中各节点间通信的渠道。集群的状态,segment
的分配,以及元数据都通过 Helix
保存在 Zookeeper
中。segment
本身保存在持久化的对象存储中。在 LinkedIn
内部,Pinot
使用本地的 NFS
作为数据存储层,而运行在 LinkedIn
数据中心之外则借助 Azure Disk
。
下图是 Pinot
的架构图:
常见操作
加载 segment
Helix
借助状态机来描绘集群的状态,集群中的每个资源都有当前的状态以及期望的状态。当状态发生变更时,对应的节点就会执行状态变更流程。
下图展示了 segment
的状态流转:
segment
的初始状态为 OFFLINE
,然后 Helix
会要求 server
执行 segment
从 OFFLINE
到 ONLINE
的状态迁移。server
首先会从对象存储拉取 segment
,解压然后加载,完成后就可以服务查询请求,此时 segment
在 Helix
中的状态为 ONLINE
。
对于还在消费 Kafka
的 segment
,Helix
会要求 server
执行 segment
从 OFFLINE
到 CONSUMING
的状态迁移。server
会从指定的 Kafka
分区的 offset
开始创建消费者,所有的副本也同时从这个 offset
开始消费。
下图展示了 segment
的加载过程:
更新路由表
每当 server
加载或者卸载 segment
,Helix
都会更新集群的状态。broker
会监听集群状态的变更,并更新 server
到可用 segment
的路由表映射。这就确保 broker
能将查询请求正确的路由到可用的 server
上。
查询
当 broker
收到查询请求后,会执行以下步骤:
- 解析查询并优化
- 随机选择被查询表的路由表
broker
将查询发送给路由表中的所有server
,各个server
会各自查询本地的segment
server
根据可用的索引和列的元数据生成逻辑和物理查询计划server
会调度执行查询计划- 当所有查询计划执行完成后,
server
会合并各个查询计划的结果,然后返回给broker
- 当
broker
收到所有server
的查询结果后,broker
会合并结果。如果某些server
的查询出现了错误或者超时,那么该次查询会被标记为不完整,客户端就可以决定是否展示不完整的数据给用户还是之后重新提交查询 broker
返回查询结果给客户端
Pinot
会自动合并来自实时和离线的数据。如下图所示,该表每天生成两个 segment
,实时数据和离线数据在8月1号和8月2号存在重合,当 Pinot
收到一个覆盖这段时间范围的查询时,会将其改写为两个查询,一个负责查询离线数据,其查询时间范围在8月2号之前,另一个负责查询实时数据,其查询时间范围在8月2号及其之后。
这也正是为什么 Pinot
的表需要一个时间列的原因。
server
如何执行查询
server
收到查询后,会生成逻辑和物理查询计划。因为每个 segment
中可用的索引和物理数据组织方式有可能不同,因此查询计划的粒度是 segment
。这使得 Pinot
能够根据某些特殊场景做针对性的优化,例如判断 segment
中的值是否匹配某个查询条件。另外,Pinot
也会根据 segment
的元数据生成某些特殊的查询计划,例如查询 segment
中某列的最大值。
Pinot
会根据预估的执行代价选择物理算子(physical operator
),并根据每一列的统计信息对物理算子重排序从而降低查询的整体成本。之后 Pinot
会将查询计划提交给查询执行器,然后并行处理。
下图展示了查询计划生成的过程:
上传数据
用户可以通过 HTTP POST
将 segment
上传给 controller
。当 controller
收到 segment
后,首先会先将其解压,并检查数据的完整性,并确保新添加的 segment
的大小不会造成表的配额超出限制,然后将 segment
的元数据写入 Zookeeper
中,最后触发 segment
的状态变更为 ONLINE
并分配给合适的副本节点,从而将集群更新至期望的状态,各 server
就可以执行 segment
的加载流程。
下图展示了这一过程:
完结实时 segment
对于从 Kafka
接入实时数据的场景,每个副本都是独立的以相同的起始 offset
开始消费 Kafka
中的数据,并以相同的条件停止消费。当正在消费的 segment
结束消费时,server
会将其持久化到磁盘并提交给 controller
。因为 Kafka
本身也有数据的过期策略,因此 Pinot
支持按照已消费的数据的条数或者所经过的时间来提交一个 segment
。
对于按照已消费的数据的条数来提交 segment
的场景,只要各副本都从相同的 Kafka
分区的 offset
开始消费,以及消费相同的数据条数,则生成的 segment
一定是相同的。不过,根据经过的时间来提交 segment
的场景,则可能因为各副本的本地时钟不同而造成生成的 segment
不一致。因此,Pinot
实现了一套 segment
完结协议来确保所有副本对最终生成的 segment
达成共识。
当一个 segment
准备完结时,server
会将其目前所在的 Kafka
分区的 offset
发给 controller
,并要求 controller
下发进一步的指令。controller
返回给 server
的指令可能有:
- 等待:什么也不做,过段时间再继续询问
controller
- 丢弃:丢弃当前的
segment
,并从controller
拉取替代的segment
;这个发生的情况在于有其他副本已经提交了另一个版本的segment
- 追赶:
server
会继续消费直到controller
指定的offset
,然后再次询问controller
- 保持:将当前
segment
刷新到磁盘并加载;这个发生的情况在于有其他副本已经提交了一个完全一样的segment
- 提交:将当前
segment
刷新到磁盘并尝试提交;如果提交失败,则继续尝试询问controller
,否则加载segment
- 非主节点:当前
controller
不是主节点,需要server
重新询问真正的主节点
server
询问 controller
的回复同样由状态机实现,controller
会等待直到足够数量的副本已经和 controller
进行了通信,或者距离第一次询问已经过去了足够的时间能够决定哪个副本能够提交 segment
。controller
状态机会要求各个副本消费到所有副本中已消费的最大 Kafka
分区的 offset
,然后让消费到最新 offset
的副本提交 segment
。如果当前 controller
发生异常,则新的主节点会发起一个全新的状态机,接着继续上述的操作,所以这会暂缓 segment
的提交,不过对正确性没有影响。
这个策略减少了网络的传输并确保当 segment
被提交时,所有副本都有相同的数据。
云计算友好的架构
Pinot
的设计专门为在云计算环境中运行而优化。商用的云服务提供商为 Pinot
的执行提供了两个重要的条件:配有本地临时存储的计算实例,以及持久化的对象存储系统。
因此,Pinot
被设计为 share-nothing
的架构,各实例都是无状态的。所有持久化的数据都存储在对象存储,所有的系统元数据都存储在 Zookeeper
中;本地磁盘仅作为缓存使用,当实例重启时,所有数据都会从对象存储或者 Kafka
重新读取。因此,可以随时删除一个节点并替换,而不会影响集群的正常运行。
另外,所有面向用户的操作都通过 HTTP
完成,使得用户可以自行选择适合的负载均衡器。
扩展 Pinot
查询执行
Pinot
的查询执行模型被设计为可以扩展支持新的算子以及新的查询类型。例如,Pinot
的最初版本不支持 SELECT COUNT(*)
这样的查询,为了支持该查询则需要修改查询计划器,以及添加新的基于元数据的物理算子,不过不会涉及任何架构上的修改。
Pinot
的物理算子针对每一种数据形式都进行了专门处理;每一种数据编码都有对应的算子,从而能灵活的针对查询优化添加新的索引类型和特定的数据结构。Pinot
可以动态的由 server
或者 minion
重建 segment
内的索引,从而能动态的部署新的索引类型和编码,并且用户不会感知。
索引和物理数据存储
类似 Druid
,Pinot
也支持基于位图的倒排索引。不过,如果能在存储数据时按照主列和二级列排序,则可以支持某些场景下更高效的查询。
例如,LinkedIn
网站上有个功能叫做“谁看了我的档案“,所有相关的查询都涉及到根据 vieweeId
列过滤。如果将数据按照 vieweeId
列排序,那么对于任意一个相关的查询,只需要扫描连续的一部分数据即可,因此 Pinot
可以只保存每一个 vieweeId
的起始和终止位置。这种数据的相邻特性也使得使用向量查询成为了可能。
因此,当创建物理过滤算子时,会先应用执行在已排序的列上,然后将过滤后的数据起始范围传递给后续的算子。后续的算子只需要扫描少部分的数据,从而提高了查询性能。
Iceberg 查询
Iceberg
查询是数据库查询的一个重要使用场景,它用于查询数据集中满足条件的一部分数据,然后进行聚合计算(如 sum
,max
,min
)。例如,如果想知道哪个国家的人访问某个页面最多,只需要先查询出访问次数大于某个最小阈值的国家即可,然后就能回答谁访问最多这个问题。这特别适合分析数据有长尾分布的场景,并且查询只关注某些关键的指标。
Pinot
实现了 star-tree
索引对 iceberg
查询进行了优化。star-tree
包含了一系列预聚合的记录节点,树的每一层包含满足了 iceberg
的查询条件的某个维度的节点,以及表示这一层所有数据的 star-node
。遍历树就等同于执行多个过滤条件的查询。下图展示了 star-tree
的两个例子:
查询路由和分区
Pinot
对于未分区的表会事先生成一个路由表,用于 server
到 segment
的映射,并支持多种路由策略。默认的路由策略是平衡策略(balanced strategy
),它将 segment
均匀的分配给 server
。当查询到达时,所有的 server
都会收到查询请求并查询对应的 segment
。
平衡策略适合于中小规模的集群,不过不适用于大型集群。因为集群越大,出现异常节点的概率也越高,从而由于异常节点拖慢一次查询。因此,Pinot
针对大型集群实现了另一种策略,从而尽量减少单词查询需要通信的 server
的数量。
Pinot
同时也支持对表按照某个分区函数进行分区。对于分区表,Pinot
不会生成路由表,而是根据查询条件将请求路由到持有特定 segment
的 server
。另一方面,Pinot
的分区函数也会和 Kafka
的分区函数行为保持一致,从而确保 Pinot
的离线数据也能和实时数据一样按照相同的分区方式分区。
多租户
对于大型公司来说,如果给每一个应用场景创建一个 Pinot
集群则过于昂贵和难以维护。因此,多租户的支持就尤为重要。为了避免某个租户的查询请求占用另一个租户的资源,Pinot
会给每个租户分配令牌桶(token bucket
)。每个查询请求都会根据实际查询的耗时来按比例的消耗令牌,当令牌不足时,查询就会被放入队列中等待。令牌桶会随着时间推移缓慢恢复,一方面满足了瞬时的查询高峰,另一方面也避免了某个租户的查询耗尽另一个租户的资源。
产线中的 Pinot
在 LinkedIn
的产线中,Pinot
运行在超过3000台分布在不同地理位置的服务器上,维护了超过1500张表,对应超过100万个 segment
。压缩后的数据大小将近30 TB
(未计算副本数据的大小)。各数据中心每秒共处理超过50000次查询。
应用场景类型
LinkedIn
的 Pinot
的使用场景主要分为两类:
- 高吞吐,简单的查询
- 低频,复杂的查询或者涉及数据量巨大
对于第一种场景,要求数据尽量在内存中缓存从而能提供每秒几万的查询,这类查询的查询模式一般不多。
对于第二种场景,一般数据都在磁盘上,由于其低频的查询,因此可以按需加载数据。虽然这类查询频率较低,但是有可能会有瞬时的大量查询,比如用户在访问某个报表页面。对于这种情况,实现资源的多租户共享就显得尤为重要,从而实现最小化硬件资源的占用,避免资源在长时间内处于空闲状态。
运维考量
为了减轻运维的压力,LinkedIn
在设计 Pinot
时就考虑到了尽可能的将 Pinot
设计为能够让用户自助运维的系统。
例如,可以随时修改表的结构来添加新的字段而不需要停机。当 schema
添加了新的列时,Pinot
会在几分钟内给所有已有的 segment
以某个默认值添加该列。同时,Pinot
的运维团队也在持续收集查询日志和执行统计信息,从而能自动的为某些列添加倒排索引以提高查询性能。
不过,在多个数据中心之间和环境(测试和生产环境)之间复制表配置成为了一个问题。目前的解决方案是将表配置保存在源代码控制软件内,然后通过 Pinot
的 REST
接口进行同步。这样做的好处在于能够追踪所有配置的变化,以及提供搜索,验证,代码审查等功能。