数据治理
Flink 实时写入 Iceberg 带来的问题
在实时数据源源不断经过 Flink 写入的 Iceberg 的过程中,Flink 通过定时的 Checkpoint 提交 snapshot commit 操作到 Iceberg,将已写入到 Iceberg 的数据文件通过 Snapshot 组织暴露出来。如果不对流实时写入 Iceberg 的文件进行治理,久而久之 Iceberg 下的小文件会越来越多,Snapshot 版本也越来越多,查询速度大打折扣。
数据治理方案
基于上述问题,我们需要对 Iceberg 的元数据和数据文件定期进行治理。治理方向主要有俩点:
- 清理快照
- 合并小文件
因为我们查询引擎用 Trino,于是我们选用 Trino 对 Iceberg 进行优化。
Trino-Iceberg Connetor 提供了优化方法:
-- 清理快照 ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '7d') -- 合并小文件 ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '10MB')
使用 Trino SQL 便可以对 Iceberg 表进行优化,很方便。我们基于 Trino SQL 上,做了一个自动自助的 Iceberg 表优化工具,实现了定时对某个 Catalog 下的表进行优化,省去了人工运维优化的成本。
除了快照清理和合并小文件外,Trino 提供了清理无效数据的方法,可以删掉一些已经不被 Iceberg 管理的无用的数据文件。我们是每周对 Iceberg 执行一次无效数据清理。
-- 清理无效文件 ALTER TABLE test_table EXECUTE remove_orphan_files(retention_threshold => '7d')
查询加速
我们都知道对 Iceberg Partition 列进行查询速度都很快,因为其过滤掉很多文件,只读取符合查询分区的数据文件。单读到底层的 ORC 数据文件时,Iceberg 提供了 min/max 等数据元信息,通过元信息可以快速得知所找的数据是否在此文件内。
Bloom Filter
在最新的 Iceberg 1.1.0 版本中,Iceberg 支持在 ORC 数据文件内设置 bloom filters。
而新版 Trino 也跟上 Iceberg 适配 bloom filter,我们需要在 trino-iceberg 的配置文件里配置,来开启 Trino 查询时使用 bloom filter 查询
hive.orc.bloom-filters.enabled = true
除此之外,我们还需要设置 Iceberg 表属性,对列配置上 bloom filter
CREATE TABLE iceberg_table ( token_address varchar, from_address varchar, to_address varchar, block_timestamp timestamp(6) with time zone, ) WITH ( orc_bloom_filter_columns = ARRAY['token_address','from_address','to_address'], orc_bloom_filter_fpp = 0.05, partitioning = ARRAY['day(block_timestamp)'] )
因为 bloom filter 是生效于 ORC 文件中,如果想要应用在旧表上,需要将旧表数据重写到新表上,这样底层的数据文件才带有 bloom filter。
举例:
假如我们有一张 token_transfer 表,表内大概有四个字段
- from_address 买方地址
- to_address 卖家地址
- token_address 交易代币
- block_timestamp 日期
我们对该表 from_address、to_address、token_address 应用 bloom filter,对 timestamp 进行分区。该表每天的数据量假设有 100w 条数据。
此时有俩类查询过来:
- 查询热门 token 今天发生的交易
select * from token_transfer where token_address = '热门token' and block_timestamp > today
- 查询冷门 token 今天发生的交易
select * from token_transfer where token_address = '冷门token' and block_timestamp > today
此时俩类查询的 bloom filter 产生的效果是不一样的,因为热门的 token 会存在大部分数据文件里,冷门的 token 大概率只存在于少部分数据文件内。对于热门 token,bloom filter 的加速效果不佳,但对于冷门 token,bloom filter 帮助其快速过滤掉了很多数据文件,快速找到有冷门 token 的数据文件,加速效果极佳。
所以得到的结论是,bloom filter 对一些 不重复,特征值很高的数据有比较好的加速效果。
Order & Z-Order
上文提到,ORC数据文件内有 min/max 值,查询引擎可以根据 min/max 值判断数据是否在此文件内。
可是日常在写入 Iceberg 的数据一般都是无序写入的,无序写入会导致每个数据文件也是无序的,不能发挥 min/max 过滤的效果。
Order
Spark 提供了一个压缩文件并排序的方法,可以将无序的文件按指定列排好序。排序策略不仅可以优化文件大小,还可以对数据进行排序以对数据进行聚类以获得更好的性能。将相似数据聚集在一起的好处是更少的文件可能具有与查询相关的数据,这意味着 min/max 的好处会更大(扫描的文件越少,速度越快)。
CALL catalog.system.rewrite_data_files( table => 'db.teams', strategy => 'sort', sort_order => 'team ASC NULLS LAST, name DESC NULLS FIRST' )
Z-Order
虽然 Order 排序可以同时对多列进行排序,但其列与列之间的排序是有先后顺序之分的,像是 MySQL 里的联合索引,先对 字段A 排序再对 字段B 排序。如果只是的查询的谓词只包含 字段B,则上述索引失效(先对 字段A 排序再对 字段B 排序)。
而 Z-Order 能解决上面的问题,使用 Z-Order 对多列排序,列与列之间的排序权重相同。所以使用 Z-Order 对多字段进行排序,查询中只要谓词命中了 Z-Order 中其中任何一字段,都能加速查询。
Spark 提供了使用 Z-Order 的方法
CALL catalog.system.rewrite_data_files( table => 'db.people', strategy => 'sort', sort_order => 'zorder(height_in_cm, age)' )
差异
我们测试过对 100G 的表分别进行 Order 和 Z-Order,命中 Order 最高能带来 10 倍的性能提升,命中 Z-Order 能带来 2 倍的性能提升。粗步得到的结论是,Order 比 Z-Order 大致快 2 倍。
所以在实践应用上不能盲目选择 Z-Order,得根据这张表的热门查询SQL、字段特征、数量来做:
- 查询字段是数据连续且范围小的,选 Order
- 查询字段具有高基数特征,选 Z-Order
- 频繁查询此表多个字段的,选 Z-Order,否则 Order 的性能会更好
小结
Iceberg 做了很多功夫去加速查询,本文中提到的小文件合并、快照清理、Bloom Filter、Order、Z-Order 都是为了在查询时跳过无用的文件,通过减少磁盘 IO 操作来加速查询。Trino 和 Spark 提供许多便利的方法给开发者维护治理 Iceberg;数据治理这块成本比较低,可以写好自动化脚本每天执行数据治理;查询加速这里的维护成本比较高,都是需要重写元数据和数据文件的操作,一般每月做一次重写操作。