日均PB級數據分析,B站基于Iceberg的湖倉一體架構實踐
?一、背景
在B站,每天都有PB級的數據注入到大數據平臺,經過離線或實時的ETL建模后,提供給下游的分析、推薦及預測等場景使用。面對如此大規模的數據,如何高效低成本地滿足下游數據的分析需求,一直是我們重點的工作方向。
我們之前的數據處理流程基本上是這樣的:采集端將客戶端埋點、服務端埋點、日志、業務數據庫等數據收集到HDFS、Kafka等存儲系統中,然后通過Hive、Spark、Flink等離線和實時引擎對數據進行ETL處理及數倉建模,數據存儲使用ORC列式存儲格式,用戶可以通過Presto、Spark等引擎對數倉建模后的數據進行數據探索以及構建BI報表。對于大部分的數據服務和部分BI報表,Presto、Spark訪問ORC格式數據可能無法滿足用戶對于查詢響應時間的要求,這時需要將數據寫入ClickHouse等這種專門的OLAP引擎或者進一步處理數據后寫入HBase、Redis等KV存儲系統中等方式解決。
?
當前的數據處理流程雖然在一定程度上可以滿足目前的業務需求,但是整個流程的效率和成本都還有很大的提升空間,主要體現在:
- 為了提升查詢效率,從Hive表出倉到ClickHouse、HBase、Redis、ElasticSearch、Mysql等外部系統中,需要額外的數據開發工作,額外的存儲冗余,但同時擁有了更少的數據靈活性,復雜的組件支持增加了數據服務開發的成本,更長的數據處理流程也降低了穩定性和可靠性。
- 對于未出倉的數據,用戶無論是進行數據探索還是使用BI報表,都還受SQL on Hadoop本身性能所限,和用戶期望的交互式響應有很大差距。
本文主要介紹為了應對以上挑戰,我們在湖倉一體方向上的一些探索和實踐。
二、為什么需要湖倉一體
在討論這個問題前,我們可能首先要明確兩個概念:什么是數據湖?什么是數據倉庫?這兩個概念在業界都有大量的討論,每個人的說法也不盡相同,我們嘗試總結如下,對于數據湖:
- 使用統一的分布式存儲系統,可假設為無限容量。
- 有統一的元數據管理系統。
- 使用開放的數據存儲格式。
- 使用開放的數據處理引擎對數據進行加工和分析。
我們之前的大數據架構基本上是一個典型的數據湖架構,使用HDFS作為統一的存儲系統,Hive metastore提供統一的Schema元數據管理,數據以CSV、JSON、ORC等開放存儲格式存儲在HDFS上,用戶可以使用SQL、DataSet、FileSystem等各個層次的API使用Hive、Spark、Presto、Python等框架或語言訪問數據。
數據湖架構的好處是有非常大的靈活性,結構化、半結構化、非結構化數據都可以放在數據湖中,用戶可以使用任意合適的引擎對所有的數據進行靈活的數據探索,幾乎沒有任何限制,但是它也存在很大的缺陷,最主要的就是數據管理和查詢效率的問題。
對于數據倉庫:
- 自定義的數據存儲格式。
- 自己管理數據的組織方式。
- 強Schema數據,對外提供標準的SQL接口。
- 具有高效的計算存儲一體設計和豐富的查詢加速特性。
數據倉庫(OLAP引擎)對于數據的要求相對更加嚴格,以ClickHouse為例,必須是預先定義的強Schema數據通過JDBC寫入ClickHouse中,ClickHouse使用自己的存儲格式存儲數據,并且會對數據文件進行排序或者文件合并之類的數據組織優化,對外提供SQL接口,不會暴露內部的數據文件,提供索引等高級的查詢加速特性,內部的計算引擎和存儲格式也會有很多的一體協同優化,一般認為專門的數據倉庫查詢效率會優于數據湖架構,在B站的實踐上,大部分場景,像ClickHouse對比Spark、Presto也確實有量級上的性能提升。
在我們實際的數據處理場景中,除了AI和數據探索等場景,探索未知數據的未知問題,比較依賴數據湖架構的靈活性,其實大部分的場景是基于已知數據的,即我們的數據開發同學,實際上是基于Hive表的強Schema數據,進行從ODS,DWD,DWB到ADS等各個業務數倉的分層建設,本質上我們是主要是基于數據湖的架構進行業務數倉的建設,如何提升這部分場景的查詢效率,使用成本和用戶體驗是我們在這方面工作的核心內容。
湖倉一體是近兩年大數據一個非常熱門的方向,如何在同一套技術架構上同時保持湖的靈活性和倉的高效性是其中的關鍵。常見的是兩條技術路線:一條是從分布式數倉向湖倉一體演進,在分布式數倉中支持CSV、JSON、ORC、PARQUET等開放存儲格式,將數據的處理流程從ETL轉換為ELT,數據注入到分布式數倉后,在分布式數倉中進行業務數倉的建模工作,比如AWS RedShift及SnowFlake等;另外一條是從數據湖向湖倉一體演進,基于開放的查詢引擎和新引入的開放表存儲格式達到分布式數倉的處理效率,這方面閉源商業產品的代表是DataBricks SQL,他們基于兼容Spark API的閉源Photon內核和DeltaLake存儲格式以及S3對象存儲的湖倉一體架構,宣稱在TPC-DS Benchmark上性能超過專門的云數據倉庫SnowFlake。在開源社區領域,Iceberg、Hudi、DeltaLake等項目的出現也為在SQL on Hadoop的數據湖技術方案上實現湖倉一體提供了基礎的技術儲備。在B站,基于我們之前的技術棧和實際的業務場景,我們選擇了第二個方向,從數據湖架構向湖倉一體演進。
三、B站的湖倉一體架構
對于B站的湖倉一體架構,我們想要解決的問題主要有兩個:一是鑒于從Hive表出倉到外部系統(ClickHouse、HBase、ES等)帶來的復雜性和存儲開發等額外代價,盡量減少這種場景出倉的必要性。二是對于基于SQL on Hadoop的分析查詢場景,提升查詢效率,降低成本。我們基于Iceberg構建了我們的湖倉一體架構,在具體介紹B站的湖倉一體架構之前,我覺得有必要先討論清楚兩個問題,為什么Iceberg可以構建湖倉一體架構,以及我們為什么選擇Iceberg?
1、為什么基于Iceberg可以構建湖倉一體架構?
對比開放的SQL引擎、存儲格式如:Presto、Spark、ORC、Parquet和分布式數倉如:ClickHouse、SnowFlake對應層的實現,其實差別不大,開源分布式引擎一直在逐漸補足SQL Runtime和存儲層的一些影響性能的高級特性,比如Runtime CodeGen,向量化執行引擎,基于statistic的CBO,索引等等,當前兩者最大的一個不同在于對于數據組織的管理能力。
對于數據湖架構來說,數據文件在HDFS的分布組織是由寫入任務決定的,而對于分布式數倉來說,數據一般是通過JDBC寫入,數據的存儲組織方式是由數倉本身決定的,所以數倉可以按照對于查詢更加友好的方式組織數據的存儲,比如對數據文件定期compact到合適的大小或者對數據進行合理排序和分組,對于大規模的數據來說,數據的優化組織可以大大提高查詢的效率。
Iceberg、Hudi、DeltaLake等新的表存儲格式的出現,最主要的特性就是可以在HDFS上自組織管理表的metadata信息,從而提供了表數據的Snapshot及粗粒度的事務支持能力,基于此,我們可以在開放的查詢引擎之外,異步地,透明地對Iceberg、Hudi、DeltaLake格式的數據進行重新的數據組織優化,從而達到了分布式數倉類似的效果。
2、為什么選擇Iceberg?
Iceberg、Hudi以及DeltaLake是基本同時期出現的開源表存儲格式項目,整體的功能和定位也是基本相同,網上已經有很多相關對比介紹的文章,這里就不詳細比較了,我們選擇Iceberg的主要原因是:Iceberg在三個里面是表存儲格式抽象的最好的,包括讀寫引擎、Table Schema、文件存儲格式都是pluggable的,我們可以進行比較靈活的擴展,并保證和開源以及之前版本的兼容性,基于此我們也比較看好該項目的長遠發展。
下圖是我們整體的湖倉一體架構,支持開放的Spark、Flink等引擎從Kafka、HDFS接入數據,然后Magnus服務會異步地拉起Spark任務對Iceberg數據進行重新的存儲組織優化,我們主要是用Trino作為查詢引擎,并引入Alluxio做Iceberg的元數據和索引數據的緩存加速。
1)Magnus:Iceberg智能管理服務
Magnus是我們湖倉一體架構的核心組件,它負責管理優化所有的Iceberg表中的數據。Iceberg本身是一個表存儲格式,雖然其項目本身提供了基于Spark、Flink等用于合并小文件,合并metadata文件或者清理過期Snapshot數據等Action Job,但是要依賴外部服務調度這些Action Job,而Magnus正是承擔這個角色。
我們對Iceberg進行了擴展,當Iceberg表發生更新的時候,會發送一個event信息到Magnus服務中,Magnus服務維護一個隊列用于保存這些commit event信息,同時Magnus內部的Scheduler調度器會持續消費event隊列,并根據對應Iceberg表的元數據信息及相關的策略決定是否及如何拉起Spark任務優化Iceberg表的數據組織。
2)Iceberg內核增強
對于豐富的多維分析場景,我們也有針對性的在Iceberg內核和其他方面進行了定制化增強,這里簡要介紹兩個方面:Z-Order排序和索引。
3、Z-Order排序
Iceberg在表的metadata中記錄了文件級別每個列的MinMax信息,并且支持小文件合并以及全局Linear排序(即Order By),這兩者配合起來,我們可以在很多查詢場景實現非常好的DataSkiping效果,比如我們對于某個Iceberg表的數據文件按照字段a進行全局排序后,如果后續查詢帶有a的過濾條件,查詢引擎會通過PredictePushDown把過濾條件下推到文件訪問層,我們就可以根據MinMax索引把所有不需要的文件直接跳過,只訪問數據所在的文件即可。
在多維分析的實際場景中,一般都會有多個常用的過濾字段,Linear Order只對靠前字段有較好的Data Skip效果,通常會采用將低基數字段作為靠前的排序字段,從而才能保證對于后面的排序字段在過濾時也有一定的Data Skipping效果,但這無法從根本上解決問題,需要引入一種新的排序機制,使得多個常用的過濾字段均能夠獲得比較好的Data Skipping效果。
Interleaved Order(即Z-Order)是在圖像處理以及數倉中使用的一種排序方式,Z-ORDER曲線可以以一條無限長的一維曲線,穿過任意維度的所有空間,對于一條數據的多個排序字段,可以看作是數據的多個維度,多維數據本身是沒有天然的順序的,但是Z-Order通過一定規則將多維數據映射到一維數據上,構建z-value,從而可以基于一維數據進行排序,此外Z-Order的映射規則保證了按照一維數據排序后的數據同時根據多個排序字段聚集。
參考wikipedia中的Z-Order介紹,可以通過對兩個數據比特位的交錯填充來構建z-value,如下圖所示,對于(x, y)兩維數據,數據值 0 ≤ x ≤ 7, 0 ≤ y ≤ 7,構建的z-values以及z-order順序如下:
可以看到,如果根據z-values的順序對數據進行排序,并平均分為4個文件,無論我們在查詢中使用x還是y字段過濾進行點查詢,都可以skip一半的不相干文件,如果數據量更大,效果會更好,也就是說,基于Z-Order分區存儲的文件,可以在多個字段上擁有比較好的Data Skipping效果。我們對Spark進行了增強,支持Z-Order Range Partitioner用于對Iceberg數據進行文件間的排序組織,擴展了Iceberg表的元信息,用戶可以自定義期望的Iceberg表的Distribution信息,支持按照Hash、Range、Z-Order等方式進行文件間數據排序,以及對應的OptimizeAction用于拉起Spark任務,按照用戶定義的Distribution信息對Iceberg表進行重組織。具體詳情可查詢參考文獻[1](通過數據組織加速大規模數據分析)。
4、索引
Iceberg默認存儲文件級別每列的Min、Max信息,并用于TableScan階段的文件過濾,基本等價于分布式數倉中的MinMax索引,MinMax索引對于排序后的字段DataSkipping效果很好,但是對于非排序字段,數據隨機散布于各個文件,使用該字段過濾時,MinMax索引基本很難有文件Skip的效果,BloomFilter索引在這種場景下可以更好地發揮作用,尤其是當字段基數較大的時候。布隆過濾器實際上是一個很長的二進制向量和多個Hash函數,數據通過多個函數映射到二進制向量的比特位上,布隆過濾器的空間效率和查詢時間都非常高效,非常適合用于檢索一個元素是否存在于一個集合中。
布隆過濾器的空間效率和查詢時間都非常高效,但是在使用上也有局限之處,主要是它能夠支持的過濾條件是有限的,只適用于:=、IN、NotNull等等值表達式,對于常見的Range過濾,比如>、>=、<、<=等是不支持的。為了支持更豐富的過濾表達式,我們引入了BitMap索引。BitMap也是一個非常常見的數據結構,將一組正整形數據映射到比特位,相比于BloomFilter,不存在Hash沖突的情況,所以不會出現False-Positive,但是一般需要更多的存儲空間。對于高基數字段的BitMap索引,落地實現主要的問題在于:
- 需要存儲字段基數對應個BitMap,存儲代價太大。
- 在Range過濾時,使用BitMap判斷是否可以Skip文件時,需要訪問大量BitMap,讀取代價太大。
為了解決以上問題,我們引入了Bit-sliced Encoded Bitmap實現。具體詳情可查詢參考文獻[2](通過索引加速湖倉一體分析)。
1)在B站的落地
基于Iceberg的湖倉一體方案在B站的數據分析場景正逐漸落地,我們目前已經支撐PB級的數據量,每天響應幾萬個查詢,其中P90的查詢可以在1s內響應,滿足了多個運營分析數據服務交互式分析的需求。接下來,我們希望能夠將湖倉一體架構作為我們OLAP數倉建模的基礎,統一大部分的業務數倉分析層數據的存儲和查詢,簡化技術架構,提升查詢效率,節省資源成本。
四、總結和展望
相比于傳統的SQL on Hadoop技術棧,基于Iceberg的湖倉一體架構,在保證了和已有Hadoop技術棧的兼容性情況下,提供了接近分布式數倉的分析效率,兼顧了湖的靈活性和倉的高效性,從我們落地實踐的經驗看,對于用戶基本透明,只是一種新的Hive表存儲格式,沒有更多使用和認知的門檻,和已有的大數據平臺工具和服務也能非常小代價地集成。為了進一步提高在不同場景的查詢效率和使用體驗,我們還在以下方向對Iceberg進行進一步的增強:
- 星型模型的數據分布組織,支持按照維度表字段對事實表數據進行排序組織和索引。
- 預計算,通過預計算對固定查詢模式進行加速。
- 智能化,自動采集用戶查詢歷史,分析查詢模式,自適應調整數據的排序組織和索引等。
后續的進展我們會持續更新,歡迎感興趣的小伙伴來和我們一起交流溝通。?