得物供應鏈復雜業務實時數倉建設之路
1、背景
得物供應鏈業務是紛繁復雜的,我們既有JIT的現貨模式中間夾著這大量的倉庫作業環節,又有到倉的寄售,品牌業務,有非常復雜的逆向鏈路。在這么復雜的業務背后,我們需要精細化關注人貨場車的效率和成本,每一單的及時履約情況,要做到這一點我們需要各粒度和維度的數據來支撐我們的精細化管理。
1.1 業務早期
業務早期,業務反饋我們后臺管理系統某些報表查詢慢。查詢代碼可知,如下圖:
這種現象一般表現為:
- 大表JOIN,rdbms不擅長做數據聚合,查詢響應慢,調優困難;
- 多表關聯,索引優化,子查詢優化,加劇了復雜度,大量索引,讀庫磁盤空間膨脹過快;
- 數據量大,多維分析困難,跨域取數,自助拉到實時數據困難等。
一方面原因是系統設計之初,我們主要關注業務流程功能設計,事務型業務流程數據建模,對于未來核心指標的落地,特別是關鍵實時指標落地在業務快速增長的情況下如何做到非常好的支撐。mysql在此方面越來越捉襟見肘。
另外一方面原因是mysql這種oltp數據庫是無法滿足實時數據分析需求的,我們需要探索一套實時數據架構,拉通我們的履約,倉儲,運配等各域的數據,做有效串聯,因此我們開始了我們的實時數據架構探索,下圖是我們一些思考。
附:數據視角的架構設計也是系統架構設計的重要組成部分。
2、架構演變
2.1 原始階段
2.1.1 通過Adb(AnalyticDB for MySQL)完成實時join
通過阿里云DTS同步直接將業務庫單表實時同步到Adb,通過Adb強大的join能力和完全兼容mysql語法,可以執行任意sql,對于單表大數據量場景或者單表和一些簡單維表的join場景表現還是不錯的,但是在業務復雜,復雜的sql rt很難滿足要求,即使rt滿足要求,單個sql所消耗的內存,cpu也不盡人意,能支撐的并發量很有限。
2.1.2 通過Otter完成大寬表的建設
基于Canal開源產品,獲取數據庫增量日志數據并下發,下游消費增量數據直接生成大寬表,但是寬表還是寫入mysql數據庫,實現單表查詢,單表查詢速度顯著提升,無olap數據庫的常見做法,通過寬表減少join帶來的性能消耗。
但是存在以下幾個問題:
- 雖然otter有不錯的封裝,通過數據路由能做一些簡單的數據拼接,但在調試上線復雜度上依然有不小的復雜度;
- otter偽裝mysql從庫同時要去做etl邏輯,把cdc干的活和實時ETL的活同時干了,耦合度較高
。
2.2 實時架構1.0
2.2.1 flink+kafka+ClickHouse
在上述調研嘗試后都沒有解決根本的問題,我們開始把目標建立標準的實時數倉的思路上來,在20年olap沒有太多的可選項,我們把目標放在clickhouse上。
- 為了保證順序append每次寫入都會生成一個part文件,滿足一定條件后臺定時合并。
- 非常弱的update delete,不能保證原子性和實時性。
- clickhouse只適合數據量大,業務模型簡單,更新場景少的場景。
- 存算不分離,復雜查詢影響clickhouse寫入。
因為clickhouse的這些特性,尤其是不支持upsert的情況下,我們通常需要提前把大寬表的數據提前在flink聚合好,并且供應鏈數據生命周期長,作業流程也長如:
- 貨物的生命周期較短時長為一周,長周期時長超過1個月;
- 庫內環節異常的多,從賣家發貨到收貨、分揀、質檢、拍照、鑒別、防偽、復查、打包、出庫、買家簽收等十幾個甚至更多的環節,一張以商品實物id為主鍵的大寬表,需要join幾十張業務表;
- 供應鏈系統早期設計沒有每張表都會冗余唯一單號(入庫單,作業單,履約單)這樣的關鍵字段,導致沒辦法直接簡單的join數據。
在這樣一個架構下,們的flink在成本上,在穩定性維護上,調優上做的非常吃力。
附:
clickhouse不支持標準的upsert模式,可以通過使用AggregatingMergeTree 引擎字段類型使用SimpleAggregateFunction(anyLast, Nullable(UInt64)) 合并規則取最后一條非null數據可以實現upsert相似的功能,但讀時合并性能有影響。
2.3 實時架構2.0
2.3.1 flink+kafka+hologres
因此我們迫切的希望有支持upsert能力的olap數據庫,同時能搞定供應鏈寫多少的場景,也能搞定我們復雜查詢的場景,我們希望的olap數據至少能做到如下幾點:
- 有upsert能力,能對flink大任務做有效拆分;
- 存算分離,復雜業務計算,不影響業務寫入,同時能平滑擴縮容;
- 有一定的join能力帶來一些靈活度;
- 有完善的分區機制,熱數據查詢性能不受整體數據增長影響;
- 完善的數據備份機制。
這樣一個行列混合的olap數據庫,支持upsert,支持存算分離,還是比較符合我們的預期。
目前這樣一套架構支持了供應鏈每天數千人的報表取數需求,以及每天10億數據量的導出,訪問量在得物所有to B系統中排名靠前。
2.3.2 我們遇到的一些問題
多時間問題
如何設置segment_key,選擇哪個業務字段作為segment_key供應鏈幾十個環節都有操作時間,在不帶segment_key的情況下性能如何保障,困擾了我們一段時間。
設置合理的segment_key如有序的時間字段,可以做到完全順序寫。每個segment文件都有個min,max值,所有的時間字段過來只需要去比較下在不在這個最小值最大值之間(這個動作開銷很低),不在范圍內直接跳過,在不帶segment_key查詢的條件下,也能極大的降低所需要過濾的文件數量。
批流融合
背景:業務快速發展過程中,持續迭代實時任務成為常態。供應鏈業務復雜,環節多,流程往往長達一個月周期之久,這就導致state ttl設置周期長。job的operator變化(sql修改),checkpoint無法自動恢復,savepoint恢復機制無法滿足,比如增加group by和join。重新消費歷史數據依賴上游kafka存儲時效,kafka在公司平臺一般默認都是存儲7天,不能滿足一個月數據回刷需求場景。
方案:通過批流融合在source端實現離線 + 實時數據進行數據讀取、補齊。
(1)離線按key去重,每個key只保留一條,減少消息量下發。
(2)離線和實時數據合并,使用last_value取相同主鍵最新事件時間戳的一條數據。
(3)使用union all + group by方式是可作為代替join的一個選擇。
(4)實時數據取當日數據,離線數據取歷史數據,防止數據漂移,實時數據需前置一小時。
Join算子亂序
- 問題分析
由于join算子是對join鍵做hash后走不同的分片處理數據,開啟了2個并發后,再因為header_id字段的值變化,detail表2次數據流走到了2個不同的taskmanage,而不同的線程是無法保證輸出有序性的,所以數據有一定的概率會亂序輸出,導致期望的結果不正確,現象是數據丟失。
- 解決辦法
通過header inner join detail表后,拿到detail_id,這樣再次通過detail_id join就不會出現(join鍵)的值會從null變成非null的情況發生了,也就不會亂序了。
2.3.3 Hologres or starrocks
這里也聊聊大家比較關注的hologres和starrocks,starrocks從開源開始也和我們保持了密切聯系,也做了多次的深入交流,我們也大致列了兩者之間的一些各自優勢和對于我們看來一些不足的地方。
3、其他做的一些事情
3.1 開發提效工具——flink代碼生成器
參考MyBatis gennerator一些思想,利用模板引擎技術,定制化模板來生成flink sql。可以解決代碼規范,和提升開發效率?;究梢酝ㄟ^代碼配置來生成flink sql。
3.2 開發提效工具——可視化平臺
直接通過配置的方式,在線寫sql,直接生成頁面和接口,一鍵發布,同時引入緩存,鎖排隊機制解決高峰訪問性能問題。
動態配置接口,一鍵生成rpc服務:
動態配置報表:
4、未來規劃
當前架構依然存在某種程度的不可能三角,我們需要探索更多的架構可能性:
(1)利用寫在holo,計算在mc避免holo這種內存數據庫,在極端查詢內存被打爆的問題,利用mc的計算能力可以搞定一些事實表join的問題提升一些靈活度。
(2) 借助apache hudi推進湖倉一體,hudi做批流存儲統一,flink做批流計算統一,一套代碼,提供5-10分鐘級的準實時架構,緩解部分場景只需要準時降低實時計算成本。