小紅書基于數據湖的流批統一存儲實踐
一、Lambda架構與實時數倉開發痛點
1、小紅書的數據平臺概覽
首先來整體介紹一下小紅書的數據平臺。
首先在最底層是一個個 Cloud,包括計算、存儲等。在這一基礎之上,是數據采集層,采集一些原始數據,比如用戶行為日志數據、RDBMS 關系型數據庫的增量日志數據,以及其他一些文件系統等。
然后基于源頭數據層(ODS 層)之上是數據存儲和加工層,主要分為兩大塊:一是偏離線的部分,主要使用 Hive、Spark 計算,使用 AWS S3 存儲;二是偏實時的部分,主要使用 Flink 計算,使用 Kafka 存儲。
再往上是一個數據共享層,我們把一些聚合數據、Join 數據和寬表數據寫入數據共享的一些分析引擎中,比如 ClickHouse、StarRocks、TiDB、HBase 等等。這些都是作為數據共享層數據存儲的底座,以及計算分析引擎的一個入口。
最上面是應用層,我們基于這一層做報表、即時查詢等,還會對數據做封裝,打造一些統一的數據產品。
2、典型的 Lambda 架構在小紅書的實踐現狀
小紅書采用的是典型的 Lambda 架構。實時鏈路主要使用 Flink 和 Kafka;離線鏈路主要使用 S3、Spark 和 Hive。Lambda 的特點就是兩條鏈路互相獨立建設,互不影響。
3、實時數倉開發痛點
Lambda 架構的痛點可以總結為三個方面:
① 實時和離線數據不一致,造成數據不一致的原因主要有三點:計算引擎不一致,相同 SQL 定義也容易產生不同結果;作業不同,開發人員需要維護兩套代碼,技術門檻高;數據 TTL 不同,Join 分析天然誤差。
② Kafka 缺乏數據檢索能力,對用戶來說 Kafka 更像一個黑盒。不管 Kafka 中數據存儲的是一些類似 protobuf 的數據還是 json 格式的數據,在做檢索的時候都非常困難。如果用戶想要根據某個條件去檢索數據,這個數據很難被查找。KSQL 產品更像是一個 streaming 的處理,更注重的是實時流處理能力,用來做離線大規模檢索并不適合。
③ 流存儲存數據有限,回溯效率低。這一點最大的原因是成本高,數據不能無限存。而且如果要去回溯讀,從歷史上去回追數據,它讀的性能也不及批量讀。
二、流批統一存儲架構介紹
基于 Lambda 帶來的痛點,我們萌生了去開發一個流批存儲的產品的想法來解決 Lambda 的痛點。下面就來介紹一些設計細節。
1、流批統一存儲架構介紹
如下是流批統一存儲的整體架構:
我們的流批統一存儲叫 Morphing Server,對用戶提供的 API 還是跟 Kafka 完全兼容,都是使用流式的方式去寫入和消費,這些接口都沒有變,所以用戶的使用方式不會有任何變化。
區別在于用戶寫入數據到 Kafka,Kafka 內部會有一個線程,異步將數據同步到數據湖中。我們的數據湖是采用的 Iceberg,當數據寫入到 Kafka 中,內部線程會去抓取 Leader 數據,經過一些 Schema 數據解析轉換為 Table Format 格式寫入到 Iceberg 中,這個過程是異步的,對用戶來說是無感的。
Kafka 的數據會被其他 Flink 作業消費,消費完之后可以寫到下一個 Kafka 中,在下一個 Kafka 依然是以異步的形式將數據落地到數據湖中。數據湖中的數據就可以提供批讀取和批存儲的能力。對于 Iceberg 中的數據如何去讀取的問題,我們會根據實際情況選取一些高性能的分析引擎,比如 StarRocks、小紅書自研的 RedCK 等來讀取離線數據。
2、產品能力
這里我們總結了 6 點流批統一存儲所提供的能力。
① 流批統一:同時提供流存儲和批存儲的讀寫能力,構建多種應用場景。
② 無感寫入:對外提供的寫入接口為原生 Kafka API,用戶無需關注落數據湖過程,自動異步寫湖。
③ Schema 解析:數據在落湖前會提前進行 Schema 解析,以結構化、半結構化的 Table 形式提供查詢。
④ 高速分析:借助 StarRocks 引擎的強大湖上查詢能力,能夠提供向量化、CBO 等高速查詢能力。
⑤ Exactly-Once:流、批數據實現 Exactly-Once 語義,數據一致性高。
⑥ 支持 Rollback:支持批數據的 Rollback 能力,在 Schema 變更不及時下,回溯修復數據。
接下去,我們介紹一下技術選項是如何去考量?關于技術選項分為兩個部分:自動落湖的過程如何選擇;對于數據湖中的數據如何選取合適的引擎去更加高效讀取
3、選型考量:Builtin Or Extension?
對于自動落湖過程我們考慮了兩種形式,Builtin(內嵌)和 Extension(外掛插件),這兩種形式其實都是可以的。
(1)Builtin 形式?
在 Builtin 的形式下,我們看到只有一個獨立的進程,在里面處理落日志之外,還會有一個異步的線程叫 Iceberg Syncer 去不斷拉取日志中的數據,然后寫入湖中,這種方式有優勢也有劣勢。
優勢如下:
① 產品形態完整,統一入口。
② 不需要額外維護外部組件。
③ 資源利用率高,共享進程。
劣勢如下:
① 企業內生成集群版本難以升級,在企業中有一些集群并沒有流批一體的功能,在升級中會非常困難。
② 進程隔離性弱,如果在異步線程中產生 bug,可能影響 Kafka 正常的讀寫功能。
(2)Extension 形式?
針對 Builtin 形式的一些劣勢,我們當初考慮了另外一種選項 - Extension,這個方式相對更加直觀。
Extension 形式,也存在著一些優勢和劣勢。
優勢如下:
① 接入靈活,集群不需要升級,我們把 Kafka 落湖進程摘取到 Kafka 進程之外,是一個單獨的進程,這是最大的一個好處。
② 流存儲可替換,并不局限于 Kafka,可以替換成其他引擎。
③ 進程隔離。
劣勢如下:
① 運維成本高,組件依賴過多,需要維護兩套組件。
② 產品體驗稍差,整體性弱。
目前我們落地的是 Builtin 的方式,所以后面介紹的一些細節方案都是基于 Builtin 方式的。
4、查詢 & 分析引擎選擇
接下來介紹查詢分析引擎的選型。我們希望找到一款 OLAP 產品,具備以下特點:
① MPP 架構、向量化和 CBO 來提高分析性能。
② 支持多場景,能夠在各種場景下滿足我們的需求。
③ 大規模,離線分析數量大,數據種類多的情況下,在大規模數據量下性能不退化。
基于這些考量,有兩大類選擇:左邊的是 Apache Doris 和 StarRocks 為代表的 OLAP 分析引擎;右邊是 ClickHouse 和小紅書基于 ClickHouse 自研的 RedCK 分析引擎。
左邊的分析引擎對分布式支持更好,對 SQL 協議兼容性高,提供更加一站式的查詢平臺。右邊的分析引擎對單表性能更加優秀,在超大規模下的數據承載能力更強,特別是我們在 RedCK 上做了一些深度的定制化自研去滿足更多應用場景。
(1)StarRocks(湖上分析)?
下面介紹我們在分布式引擎上選擇的 StarRocks。
StarRocks 支持湖上分析能力。它本身支持讀數據湖,不需要將數據以任何形式同步到 StarRocks 上,更像一種外表的形式,可以通過 Iceberg 的 Catalog 去查詢數據,還會做一些 Cache 緩存來加速查詢。
(2)StarRocks vs Persto 在流批一體(Iceberg)上的查詢對比?
我們對 StarRocks 和 Presto 在流批一體上做了查詢性能的對比,主要分為兩大類,四小類的 SQL 進行比對。
左邊主要是 Scan 全表掃描相關,在這一方面 Presto 的性能更加優越,但是兩者差距不大。右邊主要是 GroupBy 相關的聚合場景,具有 MPP 架構的 StarRocks 在性能上明顯更加優于 Presto。這也是我們選擇 StarRocks 的原因。因為在這個應用場景下 Join 使用較少,所以這里沒有進行對比。
(3)RedCK 架構?
還有一類分析引擎就是之前提到的 ClickHouse 和 RedCK,如何去更好的分析湖上的數據,這里介紹一下我們自研的 RedCK。
它是一個存算分離的架構,主要分為三個模塊:Service、Query Processing 和 Storage。
Service 主要提供 Gateway 網關和 Service Discovery 服務發現,能夠讓業務更好的接入;Query Processing 是計算層,可以去解析 SQL 生成執行計劃,分派這些任務去讀寫;Storage 是存儲層,支持文件存儲比如 HDFS 和 Juice FS,還支持對象存儲比如 OBS 和 COS。
(4)RedCK(湖上分析)?
接下來看一下 RedCK 和流批存儲是如何結合的。
RedCK 通過 MergeTree 的格式跟其他查詢引擎打通,比如 Spark、Flink 等計算可以直接讀寫 MergeTree 上的數據,然后通過 RedCK 在 MergeTree 上做 OLAP 分析。這樣的好處是使用 Spark 在寫數據的時候可以有一個更好的性能,做到了讀和寫兩種引擎的解耦。
基于這個考慮,我們在 Kafka 流批一體的引擎在落湖的過程中,原本只支持傳統的 Parquet 現在也支持寫 MergeTree 格式,同時也去提交一些和 RedCK 相兼容的元數據信息。這樣 RedCK 可以根據元數據信息直接找到 MergeTree 去做一些分析。
5、架構設計細節
整體上,落湖分為兩大塊:Commit 模塊和 Broker 模塊。
Commit 模塊主要負責:
① Iceberg 的元信息的管理。
② 協調 Broker 觸發 Broker 做 Checkpoint。
③ 更新寫入 Iceberg 的 WaterMark 和 CheckpointID。
④ Controller 做 RollBack 工作。
Broker 模塊主要負責的是數據湖寫入,利用 Kafka 本身的 Fetch 機制,將 Leader 上的最新數據進行解析并且不斷寫入,按照 Partition 維度來做單獨的線程寫入數據。
(1)Broker 設計細節?
Broker 的設計主要包括如下內容:
① Replica Leader:Kafka 原生部分,處理 Produce 請求和 Consume 請求。
② ReplicaRemoteFetcherThread:主要工作線程,異步 Fetch Leader 數據,經過 Schema 解析,寫入 Iceberg。
③ DefultSchemaTransform:Schema 解析模塊,提供寫入 Schema Server 變更。
④ IcebergRemoteLogStorageManager:封裝 Iceberg 接口,提供寫入 Iceberg 的 API 集合。
⑤ Schema Server:提供 Schema 管理服務,支持 Protobuf、Json 等。
(2)Commiter 設計細節?
Committer 主要的工作內容包括:
① Controller:暫時復用 Kafka Controller,實現 Commit 邏輯。
② 與 Broker 交互:發送 Checkpoint 請求,協調各 Broker Checkpoint 信息。
③ 與 Iceberg 交互:發起 Commit 請求。
(3)Excatly-Once 實現:兩階段提交?
Exactly-once 語義主要依托于兩階段提交來實現數據不丟不重,具體如下:
① 第一步,Committer 向所有 Broker 發起一個 RPC 請求,也就是 Checkpoint 請求。
② 第二步,Broker 在接受到 Broker 請求之后將目前為止還沒 Flush 的數據 Flush 到 Iceberg,完成之后將 Checkpoint 信息記錄到 Checkpoint Storage 中。
③ 第三步,Broker 向 Commiter 返回一個 ACK,告訴 Commiter 已經完成 Flush 工作。
④ 第四步,Commiter 等到所有 Broker 返回的 ACK 信息之后,發起第一階段提交并且記錄到 Checkpoint Storage 中,實際上做一個 Commiter 和 CheckpointID 關聯。
⑤ 最后一步,等第一階段完成之后,發起第二階段提交,發出一個 Commit 提交告訴 Iceberg 可以落盤。
(4)Exactly-Once 實現:故障 Failover?
實際生產中,常會出現一些故障。接下來介紹各種故障情況下,如何保證數據的不丟不重。
故障情況大概分為如下幾類:
① Broker 故障:比如突然宕機,其實這個故障沒有太大關系,因為 Kafka 本身有 Leader 切換能力,Leader 切換到其他 Broker 之后,會在新的 Broker 拉起異步線程寫 Iceberg。它會從 Checkpoint Storage 中讀取上一次 Checkpoint,從上一次的 Checkpoint 恢復這些數據去重新寫操作。在一次 Checkpoint 數據向 Iceberg 的數據,因為是 committer 還沒有進行第二階段提交,對于 Iceberg 來說是不可見的,可以直接丟棄這些不可見的數據。
② Controller 故障:在第一階段提交的時候失敗,會被自動切換到別的機器上面去再起一個 Commiter 線程,會發現第一階段還沒完成,那么會重新向所有 Broker 發起一輪新的 RPC 請求,重新做一次 Checkpoint,這一次其它 Broker 在接受到 RPC 請求之后會發現不需要做 flush 操作,就會立刻返回 ACK。在收到所有 ACK 之后,會重新做一次第一階段提交;第一階段提交之后成功了,但是在第二階段提交的時候失敗了,那么 Controller 切換到另外的一個機器首先會去 Checkpoint Storage 中查詢,如果第一階段提交信息已經存在就會直接發起第二階段提交工作。
③ Object Store 故障/HMS 故障:我們會做一個無限重試,并且將一些告警信息發送出來。
三、流批統一存儲應用實踐
流批統一存儲在公司內部落地之后,可以解決一些 Lambda 架構帶來的問題,下面將從四個方面來介紹。
1、Kafka 數據檢索
在流批一體之前,開發同學去檢索 Kafka 數據比較復雜,如左圖顯示:第一步需要去申請一個 topic,按照需要寫數倉作業;第二步找 DBA 申請一個 OLAP 表;第三步再去寫 Flink JOB 去解析 topic 數據寫到剛剛申請的 OLAP 表中,這個表純粹是用來查詢和排障,整個鏈路比較長。在使用流批一體之后,開發同學申請一個 Topic,然后往 Topic 中寫作業,這個時候開發同學可以直接查詢流批統一存儲。
2、強一致的數倉 ODS 層
流批統一的存儲,可作為數倉 ODS 層,建設下游鏈路。因為流批統一存儲是 Excatly-once 語義,所以可以做到實時和離線存儲完全匹配,可以避免雙鏈路帶來的數據不一致問題。
3、批量分區回刷,提升Backfill效率
結合 Flink 提供的流批統一的計算能力,同時從批存儲和流存儲回刷數據,極大提升回刷性能。與 Kafka 相比,批存儲提供更長的數據生命周期,數據 SLA 更有保障。
4、多維分析能力
利用 StarRocks 良好的湖上分析能力,充分發揮向量化引擎和 CBO 優勢,在統一的計算引擎上實現多業務多維分析。例如用戶行為分析、用戶畫像、自助報表、跨域分析等多種分析場景,都可以在一站式平臺上去完成。