[OLAP学习]实时更新-StarRocks

  • Post author:
  • Post category:其他


本文根据StarRocks Hacker Meetup的录播视频进行整理。如有错误欢迎指正。视频链接:

列式存储中实时更新与查询性能如何兼得?

演讲者常冰琳,Apache Kudu PMC。

为什么update很重要?

实时分析已经不是新鲜的概念了。数据分析像是和时间赛跑,虽然永远也跑不过时间,但业务总希望能尽早获取数据,诸如反欺诈、订单分析等场景。因此对数据的获取有如下的要求:

  • 从实时数据,到热数据,到经常会被修改的数据都需要能够获取并落地使用
  • 能够直接对tp系统里的数据进行分析

传统的AP数据库可以完成的工作大多有以下几类:

  • T+1的批量ETL,延时太高;
  • 做增量append,没有update;
  • append update&merge on read,查询性能太差。

常见技术路径

一个基础例子:想要完成整行的更新或者删除,应该如何完成?

方案1:Merge on Read

方案:类似LSM树的数据结构,当有更新数据进来时直接append,在读的时候merge。实现起来相对简单。

特点:写很快,读很慢

案例:Hudi的Merge on read 表, StarRocks的Unique Key

缺点:对于AP系统更应该更关注查询性能,但Merge-on-read的查询性能显然不佳

方案2:Copy-on-Write

方案:当有新数据进来的时候会和原始数据文件进行对比。

把有变更的文件找出来全部重写,如果找不到历史文件则写入新的文件,其他文件保持不动。

特点:写很慢,读很快。

案例:delta lake,Hudi的copy on write表,Iceberg,Snowflake

缺点:写入的代价太高(每次都要做join操作,如果涉及到更改的文件过多,可能要把数据集全重写一遍),因此适用于那些写入不频繁(例如小时级/天级)的场景,对于秒级写入代价太大。

方案3: Delta Store

方案:由额外的组件索引delta store来定位所有新来的数据原来所在的位置和新的修改。数据读取时需要获取老数据和delta store从而得到最新的版本。

特点:写慢一些,读很快。

案例:Kudu

缺点:写入会比merge on read慢一点,读的性能虽然达不到copy-on-write,也是比较快的。

方案4: Delete+Insert

方案:和Delta Lake类似,有索引。当有更新数据进来时,需要找到这条数据的位置并在索引中记录它已经被删除了。然后将更新的数据当作新的数据直接append操作。读取时遇到被删除的数据直接跳过。

特点:写慢一些,读很快。

案例:SQL Server,ADB,Hologres,StarRocks Primary Key table

缺点:写入会比merge on read慢一点,读的性能虽然达不到copy-on-write,也是比较快的。

方案总结

需要注意的是,merge-on-read, delta store和delete+insert都有compaction操作来合并数据。在方案上,读和写不可兼得,只有取舍。

StarRocks的技术路径

一条写入事务在StarRocks内是如何完成的?

StarRocks由FE和BE组成。每张表都会分成很多分区,每个分区又会拆成很多Tablet,每个Tablet都可以做多副本,会被分布在不同的BE上。

当有写入事务时,StarRocks会根据分区分桶的规则把数据分散至对应tablet上进行数据写入。写入执行完成后,FE会发起带有版本号的commit的消息给BE。BE commit成功后,事务完成。

在单个Tablet内的结构如下所示:

1. 元数据和DelVector(存储被删除的元数据)存在RocksDB内

2. Rowset:列存文件

3. PrimaryIndex:根据主键定位文件位置,在哪个Rowset的什么位置

其中元数据(Meta)换存在内存里,包含版本列表,该版本包含的rowset,以及相对于上一个版本的delta信息。例如:

[
    {
        EditVersion:[4,0],
        Rowsets:[1,2,3]
        Delta:[3]
    },
    {
        EditVersion:[5,0],
        Rowsets:[1,2,3,4]
        Delta:[4]
    },
    {
        EditVersion:[6,0],
        Rowsets:[1,2,3,4,5]
        Delta:[5]
    },
    rowset_id_next:6
]

举个栗子:

  1. 最初版本为verson(1,0);rowset为空;delta也为空
  2. 写入数据rowset0,此时的commit version+1为2;rowsets里包含了0;相较于上一个版本,delta中新增了0;version变为(2,0)
  3. 写入数据rowset1,此时的commit version+1为3;rowsets里包含了1,整体为0,1;相较于上一个版本,delta中新增了1;version变为(3,0)
  4. 此时发现可以进行compaction,在后台异步进行。compaction的输入为{0,1}。外部同时写入rowset2,此时的commit version+1为4;rowsets里包含了2,整体为0,1,2;相较于上一个版本,delta中新增了2;version变为(4,0)。
  5. compaction完成后也会有一次commit。此次compaction把0,1合并成了3,因此此时rowset是2,3,delta为0因为距离上一个版本没有数据写入;version变成(4,1)
  6. 继续写入数据rowset4,此时的commit version+1为5;rowsets里包含了4,整体为2,3,4;相较于上一个版本,delta中新增了4;version变为(5,0)
  7. 依此类推…

这样产生了一个问题,如果写入很频繁,历史版本就会很多。因此StarRocks采用了gc的方式对半小时前的版本进行删除。如果某些rowset没有被近半个小时的版本引用到,也可以被gc掉。这个gc时间可以用update_cache_expire_sec参数进行调整。

具体的写入流程是怎么进行的?

导入通常都是一批操作(upsert/delete),通过各类导入方式(stream load/broker load等)将数据发送至BE的内存buffer(Mem Table)中。当Mem Table满了之后,这些数据将会被排序、合并同类key,最终flash成upsert和delete的文件组。

这里拆分是因为upsert后续还涉及数据的导入,需要保存所有数据;delete单纯需要标记删除即可,仅需要保存主键信息。所以进行了拆分。

拆分结束后,FE会给BE发commit请求,BE在接受请求后会依次执行:

  • 查找primary index,找出所有涉及upsert和delete的行的位置
  • 将他们标记为删除。删除信息纪录在一个roaring bitmap的文件中,保存在RocksDB内。因此标记为删除实际上是省城了一个delete vector
  • 更新元数据
  • 讲delvec和更新的元数据提交给RocksDB进行持久化

举个栗子:

原始的rowset0内,有id为1至4的四条数据,为version1。Primary Index内记录了主键(id)信息,每个主键对应信息在哪个rowset中的第几行。

此时来了一个新的写入(rowset1),写入数据为id=1,3,5。其中id=1和id=3的数据已经存在,id=5的数据不存在。因此这是一个upsert操作。

在primary index中找到id=1和id=3的数据分别位于rowset0的第0行和第2行,因此我们在delvec中记录这两行为删除,rowset1正常append。此时版本为version2,primary index对应更新。

此时又发生了新的写入:写入id=6的数据,并删除id=3的数据。

在primary index中找到id=3的数据位于rowset1的第1行(此时即便没有发生compaction,rowset0中的数据也会被跳过),在delvec中记录这行为删除,rowset2的upsert正常append。此时版本为version3,primary index对应更新。

此时又发生了新的写入:写入id=2和id=5的数据。

同样是在primary index中找到id=2和id=5的数据所在rowset的行数,标记删除。psert正常append。此时版本为version4,primary index对应更新。

整体回顾一下每个版本所包含的rowset和delvec,如上图所示。

并发写入如何控制?

如果很多写入并发进行,应该如何避免事务冲突呢?回顾写入的过程,可以分为两大块:

  • 写入:即数据被写入rowset中,占据绝大部分时间。
  • commit:包含查找primary index,写delvec,更新元数据。运行速度比较快。

为了避免事务冲突,在并发写入时,第一步写入可以以任意顺序进行,但commit需要按照FE commit的版本号来顺序执行。

Primary Index的设计

上文说到,commit需要被序列化执行,因此commit的速度应该非常快才不会造成性能瓶颈。commit总共分三个部分:

  • 查找primary index
  • 写delvec
  • 更新元数据

其中超过90%的耗时在查找primary index上,因此优化primary index至关重要。

当前,StarRocks的primary index是一个全内存的hash表,只要有导入发生就会拉进内存:

  • key是编码过的binary字符串
  • value是64位的编码过的行的位置:<32bit rowset_id, 32bit rowid>
  • 实现逻辑跟跟查询执行层的hash表一样
  • 性能测试:20-200ns/op 5M-50M op/s per thread

举个栗子,如果将10M行平均分成10个tablet写入,每个tablet内需要完成1M次操作,可以在0.12s内完成(假设10M op/s)。

但是,primary index存在内存中,对内存的消耗很大。StarRocks对primary index做了进一步优化:

  • 预取
  • 内存使用优化

    • fix length key:FixSlice,不用再存长度
    • var length key:按长度shard,可以节省1/2-1/3的内存
    • on-demand:6分钟内如果没有导入就release
  • 未来规划

    • 持久化index,正在进行中

因此,在目前场景下,Primary Key表适合应用在如下场景:

  • 有明显冷/热数据分区

比如只有最近几天的数据需要更新,老的数据一般不会被修改。例如商品订单、网约车订单、app的用户session分析场景。

  • 大宽表

列非常多,比如用户画像。

上述这些场景都是primary index比较小,对内存的影响不大的场景。当然,未来实现了primary index持久化后,应用场景会更加多。

Compaction

当实时写入的小文件越来越多、原来的记录中被删除的越来越多时,就需要compaction来合并小文件、删掉空洞。

这里注意和LSM的compaction不同。LSM的compaction需要把很多sst文件合并成一个新的sst文件并做原子替换。

StarRocks的rcompaction相对简单,因为不存在重复的行,也没有range delete这类的操作,

但需要对primary index做更新。

整体的流程跟commit类似。

举个栗子:

rowset 0-2进行合并,生成rowset4(可以看到所有标记del的行都被筛掉了)。此时更新 primary index。在4.1版本中包含rowset3和rowset4。

如果compaction和写入有冲突怎么办?再举个栗子:

仍旧是rowset0-2进行合并,生成了rowset5。此时在rowset5 commit之前又发生了rowset4的写入。可以看到rowset4 和rowset5中都包含id=4的行。

正常逻辑是rowset4是新的写入,此时写入的id=4 c0=y应该是正确的数据。但compaction和写入是异步的,如果写入的commit早于compaction的commit,则会发生冲突。造成version5.1的数据错误。

为了解决这个问题,StarRocks设计了rssid文件,用于记录此条数据是从哪个rowset过来的。

还是上面的栗子,当rowset0-2 merge的时候,rssid会记录当前rowset5内的每条数据来源于哪个rowset。其中,id=4的这一行来自rowset0。

commit时,StarRocks会判断,rowset5中所有行的rssid和当前primary index中的rowset是否一致。例如此栗中,id=4的rs为4,而不是rowset5中记录的0。这说明id=4的这一行已经被重新写过了,不需要被替换。

由此,compaction中id=4的更新被跳过,这个冲突就被解决掉了。

在处理compaction时,StarRocks采用的是cost based policy。被删除的比例越高、越小的文件越容易被compaction。

容错怎么做?

StarRocks是通过多副本来进行容错的。假设有三个副本,当写事务发生时三个副本都会写入。如果其中一个BE crash了,未能进行写入,重新上线后FE会对该BE发起clone命令。此BE就会从其他副本中抓取数据,恢复正常版本。

抓取数据有两种方式:

1. 增量clone

例如上图这两个副本,replica1目前处于version5,replica0处于version6,因此replica1需要clone数据。(注意,不同replica的第一位版本号是一一对应的,但rowset的组织不一定一样,因为不同的replica可能会有不同的compaction)

在replica0中发现version6和version5中间差的是rowset5,因此replica1会将rowset5从replica0中拉过来commit。

2. 完整clone

当版本隔的实在太多时,会发起完整clone。如上图所示,replica1处在version7,但replica0最早的版本是version10,之前的version已经被GC了。这时无法获取delta信息,因此会触发完整clone,即replica1删除全部数据,并完整复制replica0的数据。

怎么读取?

查询会通过planner生成表达式树,在ScanNode上是一个TabletReader,会告诉Tablet要读取哪个版本的数据。Tablet接到指令后根据版本号,在元数据中可以找到对应的rowset和delvec,从而读取所有的数据。

好处是可以将谓词下推,并充分利用StarRocks的向量化执行,不用merge,可以并行执行,还可以应用各类index。具体举个栗子:

假设我们要执行如下的sql语句:

select count(*), sum(revenue) from orders where state=2

该语句中有一个筛选条件:where state=2

如果是merge on read,则需要扫描全部数据,merge结束后再做filter并aggregate。数据IO和操作都比较多。

如果是delete+insert,谓词就可以直接下推。如果state添加了bitmap索引,则可以直接取到对应数据,并且可以直接读到revenue,其他数据则不需要读取。这样的数据IO和操作都大幅减少。

Benchmark

对比unique key(merge on read)和primary key(delete+insert),查询性能有明显提升。

TPCH跟某业界数据库对比也比较明显。

未来的工作和展望

Partial Update

目前前面提到的都是整行更新,但是更新有很多种,可能只涉及的一行的某几列,其他列保持不变。这就要求系统还要读取原来的数据和新倒入的数据做拼接,步骤更加复杂。这个功能预计2.2发布。

有了partial update功能,下面的几个场景都可以得到满足:

  • 更新维度比较少的场景:类似订单表,订单创建后大部分列不会变了,更新的大部分是订单状态、物流状态等。
  • ELT批量处理场景:对某一列做集体的变换
  • Join on load场景:在ap系统内是一张宽表,上游有很多流,分别对应宽表的某几列
  • Conditional update场景:条件更新。比如数据是乱序的,直接upsert容易老数据覆盖新数据,因此需要加一条时间戳的条件
  • merge update场景:例如对array列进行元素append
  • 通用读写事物……

事务的难度总结:

简单举几个栗子:

append only:类似日志,往上怼就完事了

整行的upsert和delete:要处理冲突,要求有unique的约束

local update:partial的

整体回顾

再次回顾实现update的几种方式:

  • copy on write:当有新数据进来的时候会和原始数据文件进行对比。把有变更的文件找出来全部重写,如果找不到历史文件则写入新的文件,其他文件保持不动。

  • delta store:由额外的组件索引delta store来定位所有新来的数据原来所在的位置和新的修改。数据读取时需要获取老数据和delta store从而得到最新的版本。

  • delete+insert:和Delta Lake类似,有索引。当有更新数据进来时,需要找到这条数据的位置并在索引中记录它已经被删除了。然后将更新的数据当作新的数据直接append操作。读取时遇到被删除的数据直接跳过。

针对不同的应用场景,不同的方式会有不同的优势。下面列举几个思考:

场景1: 大宽表,某一列全部更新

delete+insert需要重写文件,如果用delta store则可以对这一列记录增量,写入的代价比delete+insert要小很多。

还有一种思路是,如果文件可以完全按照列存打散,使用copy on wirte直接重写该列的文件可能会更好。

场景2: 冷热数据的更新

例如有一批数据,分冷热。更新更多涉及热数据而少量涉及冷数据。

在理想情况下,冷数据因为被操作而变成了热数据,最优方案应该是直接delete整行并append一条新的数据到热数据里。同时因为大部分更新的是热数据,热数据采用copy on write重写整列会更好。

因此,最理想的情况下应该是根据数据的冷热特性而动态决定更新的方案。

整理结束,如有错误欢迎指正。StarRocks的Hacker Meetup质量很高,各位大佬讲的深入浅出,如果我还有肝会把另外三篇也肝出来。

期待开源数据库领域能有更多这样优质的内容。



版权声明:本文为mathematics_mo原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。