作者 | 張靜
編輯 | 姚佳新
本文整理自快手數據架構研發專家張靜在WOT2023大會上的主題分享,更多精彩內容及現場PPT,請關注51CTO技術棧公眾號,發消息【WOT2023PPT】即可直接領取。
今天的分享分為四個部分:首先介紹傳統離線鏈路,它存在哪些痛點;第二部分引入數據湖的特性;第三部分是通過快手數據湖幾個典型的業務場景來說明如何基于數據湖技術重塑離線鏈路的生產;最后一部分介紹近期工作和長遠規劃。希望通過本次分享能夠讓大家了解數據湖技術在重塑離線生產方式中的關鍵作用。
一、傳統離線鏈路的缺點
快手的傳統離線鏈路和很多公司是一致的,基于 Hive做離線分層數倉的建設。在入倉環節和層與層之間是基于 Spark 或者 Hive做清洗加工和計算。這個鏈路有以下四個痛點:
更新成本高:Hive 表最細的更新粒度是分區級,需要先掃出分區的全量數據,關聯這次更新的增量數據得到這次的全量數據并覆蓋原來的分區。這個過程導致計算開銷比較大,且降低時效性;
缺少索引:不僅影響更新,也影響讀取。因為查詢大部分會以掃描為主,由此會導致查詢效率低;
缺少事務:多個寫入任務之間,寫入任務和讀取任務之間缺少事務機制,需要讀寫鎖來避免數據的不一致;
啟動調度晚:目前離線任務調度最細粒度是小時級別,會影響下游各層的數據可見性;
圖片
二、HUDI 數據湖的特性
針對傳統離線鏈路的缺點,我們決定引入數據湖來解決上述的痛點。快手是在21年開始探索數據湖方向,我們進行了技術選型,考慮到HUDI 對更新能力的支持,以及活躍的社區生態,由此便選擇了HUDI。HUDI 具備如下幾個特點:
寫入:由于 HUDI提供多種內置的索引,基于這些索引可以提供高效的更新能力;寫入支持流式入湖,也支持離線入湖;支持多種的寫入操作,比如插入、更新、刪除、覆蓋;支持多種輸入源,比如更新流,日志流。
查詢:支持多種的查詢方式,比如讀優化查詢、快照查詢和增量查詢;提供時間旅行的特點解鎖查詢歷史版本的能力;社區做了很多優化提高查詢的效率。
并發控制:HUDI 引入 MVCC 來控制寫入任務和查詢任務之間的并發。引入 OCC 來控制多個寫入任務之間的并發。同時社區也有一些關于無鎖的并發控制。
豐富的表服務:Compaction、Clustering、Clean 等。
開放性:適配多種計算引擎和查詢引擎,比如 Spark,Flink,Presto,Trino,Starrokcs,Doris 等
Schema Evolution:提供Schema 演進的能力。
圖片
三、快手數據湖的典型業務場景
下面通過快手在數據湖上的幾個典型業務場景介紹如何用 HUDI重塑離線鏈路產生。分為三個方向:數據同步、數據更新、寬表拼接。每個方向都會介紹兩類最有代表性的場景。
圖片
1.數據同步 – 日志流入湖
首先是數據同步里日志流入湖。快手內部的數據同步工具有一個限制:只支持日期和小時兩級分區。所以一個日志流從 Kafka 到入倉整個鏈路需要多個離線任務加工,這就導致了鏈路長,重復計算和冗余存儲的問題。
圖片
基于 HUDI 改進后的方案,整個鏈路得到極大的簡化。直接用 Flink 任務做日志流數據入湖。最后一層將 HUDI 表落到 DWD 層數據主要是做兼容性,這樣下游業務依然可以訪問原來的 Hive 表,同時獲得時效性的提升,在資源持平情況下,時效性從之前1h40min縮減到40min,也降低了了鏈路的復雜度。
圖片
2.數據同步 – CDC 數據入湖
第二個場景是更新場景入湖。歷史上 Mysql to Hive的方案有兩個鏈路,一個全量初始化任務,一個是增量同步任務。初始化任務把全量數據落到一個HIVE 全量快照表,完成后啟動增量同步任務把增量binlog 數據落到一個 HIVE增量表,每天合并前一天的全量和今天的增量生成一個新的全量快照表。
圖片
Mysql to Hive 方案的痛點是時效低。時效低有兩方面原因:第一個是離線任務調度周期是T+1級別,第二個是任務調度以后才做全量和增量的合并。
圖片
改造后的Mysql to HUDI,鏈路得到了簡化,直接把 CDC 更新數據落到一個 HUDI 表里,這個 HUDI 表是沒有日期或者小時分區的。內部的 MySQL to HUDI 和其他公司的 CDC 更新流入湖比較起來有一些差異化的需求,因此我們在設計上也是有所不同。
避免在全量同步完成后再啟動增量同步任務:因為采用傳統的串行調度,如果全量同步任務執行很久才結束,增量同步啟動后可能發現最開始的一些 Kafka 數據已經被清理了,導致數據丟失。因此,支持全量初始化任務和增量同步任務的并行,不需要等全量初始化任務完成后再去調度增量同步任務。
按照事件時間來查詢某個版本:HUDI 的版本是一個 processing time 的語義,但是用戶需要能按照 event time 語義來訪問某個 HUDI 版本。為了支持按照事件時間方案,在元數據里維護 Processing time 到 Event time 的映射關系。收到按照事件時間的快照查詢請求,先做一下映射得到 processing time,再基于time travel能力查詢對應的版本。
數據就緒后盡快發布對應版本:如果完全依賴周期性的 checkpoint 來做分區發布會導致數據就緒后不能立刻發布對應的版本。這里修改了 Flink 引擎的邏輯,除了周期性的 checkpoint 以外,又增加一種非周期性的checkpoint 用于監聽到整點數據就緒以后立刻發布分區。
兼容當前 HIVE 表的使用方式:1. Mysql to HUDI 鏈路里的HUDI 表是沒有日期分區,如何能按照日期分區查詢。2.長生命周期管理,用戶可能需要訪問很久以前的數據。為了支持這兩個需求,Mysql to HUDI 的鏈路會輸出兩個表,一個是無時間分區的 HUDI 表,一個是HIVE 表。在發布分區時,會在HIVE 表里添加一個新分區,這個時候分區 location下是沒有數據,分區元數據里維護了它對應哪個 HUDI 表的哪個版本。無時間分區的HUDI 表是沒有辦法直接做長生命周期的,所以定期把HUDI 數據同步到Hive 表中去。歸檔后的 HIVE 表分區就是一個普通的 HIVE 分區,它的 location 下有對應的分區數據。因此,這個HIVE 表是一個異構的HIVE 表。異構性體現在兩個方面,第一個元數據是異構的,第二個是數據是異構的。這個異構設計對用戶是透明的。當用戶查詢HIVE分區的時候,引擎通過 Hive 元數據判斷這個日期是否被歸檔,如果還沒有被歸檔,會通過分區元數據里的HUDI 表和版本把請求路有到HUDI 表上。如果是歸檔后的分區,直接走正常的HIVE查詢流程把分區數據返回給用戶。
圖片
Mysql to HUDI的整個鏈路如上圖。分為左右兩部分。左邊是必選的,做CDC 入湖;右邊是可選的,為了支持兼容HIVE 的需求。
3.數據更新
數據更新的第一個業務場景是人群包圈選。每次活動DAU 是一個非常重要的指標,人群圈選業務是根據用戶的歷史行為來圈選出一些潛在的目標用戶。歷史方案是基于天級離線數據和小時級離線數據組合計算生成。這種方式存在的最大痛點就是時效性問題,某些場景下的小時級產出的數據延遲在3-4 小時左右,對于除夕活動來說,這種延遲是不能忍受的。基于 HUDI 改造后的鏈路是用一個實時的 Flink 任務,在入湖過程中完成更新。這使得整條鏈得到簡化,不僅時效性從3h ~ 4h左右縮短到15min左右,而且資源也有節約。
第二個業務場景是基于HUDI 自定義的payload能力的N天留存標簽更新。歷史的留存鏈路加工流程需要大規模Join 并且需要與行為數據進行整合,并且需要大規模數據回刷。具體過程是用當天的日活數據和歷史N天的日活數據算出當天日活用戶在過去 180 天的留存標簽,存一個中間表。然后分別用過去N天的行為數據關聯這個中間表得到最新的標簽覆蓋回對應的分區。這個方案的缺點是時效低,重復計算和重復存儲。
基于HUDI 改造后的鏈路從剛才的多層關聯升級為單表生產,時效性也是有了很大的提升,從2.5h縮短到1.5h。資源開銷也是有收益的。這里最重要的就是基于 HUDI 的 MOR 表能力和自定義payload 的特點。寫入流程非常輕量,將當天的日活數據產生的增量數據寫到歷史N 天的分區里。合并流程做在分區內部做局部關聯只更新對應的留存標簽。
圖片
圖片
4.寬表拼接
第三個方向是寬表拼接,也介紹兩個典型的業務場景,一個是離線寬表模型,一個是準實時的多流拼接。
寬表模型是指把業務主題相關的指標、維度、屬性關聯在一起的一張大寬表。寬表模型因為結構簡單,模型可復用度高,數據訪問效率等優勢,廣泛地使用在 BI 和 AI 場景。
圖片
基于 HUDI 的寬表拼接之前有很多公司也有分享,我們內部的寬表拼接有一些差異化的需求。
支持多個寫入任務并行:允許多個寫入任務并行加工一張寬表,每個寫入任務加工這個寬表中的部分列。
支持 Schema Evolution:在業務演進過程中可能隨時需要有更多的列加進來。用戶希望在創建表的時候,只需要定義必要的列,比如主鍵列、分區列、排序列。后續可以很靈活地添加新的列。
支持 Implicit Schema Evolution:顯式的 Schema Evolution 是指通過類似于 Alter table add column 這種DDL 語句來修改表。Implicit Schema Evolution,是指在寫入任務的 Schema里包含了表里不存在的列,會在寫入任務提交時追加到這個表的最后。
支持 Partial Insert:寫入任務不需要指定表里的所有列,允許只插入表里的部分。
支持不同分區設置不同的桶個數:有一些業務分區存在非常大的數據量差異,所以需要能支持不同子分區設置不同的桶個數。
支持快照隔離:讀取任務和寫入任務之間支持快照隔離,上游加工好部分列以后,下游就可以先讀這些加工好的部分列。
圖片
上圖是一個簡單的寬表拼接的例子。兩個寫入任務加工一個寬表,第一個寫入任務加工 id, ts 和name。第二個寫入任務加工 id, ts 和 price。每個寫入任務只需要寫入部分列,這個是 partial insert 的能力。最后合并流程做拼接。另外,這個圖也可以說明 schema evolution。建表時,只定義了主鍵、排序鍵和分區鍵。第一個寫入任務提交的時候追加了name 列,第二個寫入任務提交的時候追加 price 列。
圖片
寫入階段分為兩個階段,第一個階段寫入數據,第二個階段提交數據。第一個階段是無鎖方案的設計,第二個階段是有鎖的設計。第一個階段,寫入任務是在加工同一個文件組的同一個數據版本下不同的增量文件來避免多個任務把一個文件寫花。在提交階段引入一種特殊的沖突檢查機制,允許在不同分區或者是相同分區的不同列上的并發寫入,另外這個階段按需更新 schema,發現有新增的列需要更新schema 。
這個方案也可以用在實時寬表拼接場景,這里因為時間關系,不再做贅述。最后說一下在目前的寬表拼接實現里有一個限制,即寫入任務正在進行時不可以生成合并計劃,可能存在丟數據的風險。在用戶角度這個限制有三點影響:第一個是離線寬表拼接場景需要依賴任務以來關系來避免寫入任務和 schedule compaction 的并行。第二個是對實時寬表拼接場景,只能在同一個 Flink 作業的多個 pipeline 里共同加工一個寬表,不能多個 Flink 作業同時加工一個寬表。第三個是不能滿足實時和離線任務共同加工一張寬表的需求。
四、未來規劃
圖片
近期的工作有四點:
(1)Schedule Compaction 和 Writer 的并發。
(2)可擴展的 Bucket index,實現根據數據量自動適配 bucket number 個數。
(3)加速寫入流程:這里涉及到多個優化點,一個是優化寫入鏈路,一個是減少序列化和反序列化開銷
(4)服務化建設。包括 MetaStore Service 和 Table Service。
圖片
中長期的工作圍繞兩個方向,第一個是建設實時數據湖。對于實時數據湖也會有很多挑戰,需要把它補充齊才可以把實時化做起來,這塊會引入流計算領域領域通用的概念,比如事件時間和watermark。第二個是基于HUDI的分析查詢場景。我們會參與到社區的建設中,通過構建物化視圖減少重復計算加速查詢,后續也會引入緩存加速分析查詢的場景。這兩個方向都有很多地方需要探索和完善。