Flink CDC 在大健云倉的實踐
摘要:本文整理自大健云倉基礎(chǔ)架構(gòu)負(fù)責(zé)人、Flink CDC Maintainer 龔中強在 5 月 21 日 Flink CDC Meetup 的演講。主要內(nèi)容包括:
- 引入 Flink CDC 的背景
- 現(xiàn)今內(nèi)部落地的業(yè)務(wù)場景
- 未來內(nèi)部推廣及平臺化建設(shè)
- 社區(qū)合作
一、引入 Flink CDC 的背景
公司引入 CDC 技術(shù),主要基于以下四個角色的需求:
- 物流科學(xué)家:需要庫存、銷售訂單、物流賬單等數(shù)據(jù)用于做分析。
- 開發(fā):需要同步其他業(yè)務(wù)系統(tǒng)的基本信息。
- 財務(wù):希望財務(wù)數(shù)據(jù)能夠?qū)崟r傳送到財務(wù)系統(tǒng),而不是月結(jié)前才能看到。
- 老板:需要數(shù)據(jù)大屏,通過大屏查看公司的業(yè)務(wù)和運營情況。
CDC 是數(shù)據(jù)捕獲變更的技術(shù)。廣義上來說,但凡能夠捕獲數(shù)據(jù)變更的技術(shù),都能被稱為 CDC。但通常我們說的 CDC 技術(shù)主要面向數(shù)據(jù)庫的變更。
CDC 的實現(xiàn)方式主要有兩種,分別是基于查詢和基于日志:
- 基于查詢:查詢后插入、更新到數(shù)據(jù)庫即可,無須數(shù)據(jù)庫的特殊配置以及賬號權(quán)限。它的實時性基于查詢頻率決定,只能通過提高查詢頻率來保證實時性,而這必然會對 DB 造成巨大壓力。此外,因為是基于查詢,所以它無法捕獲兩次查詢之間數(shù)據(jù)的變更記錄,也就無法保證數(shù)據(jù)的一致性。
- 基于日志:通過實時消費數(shù)據(jù)的變更日志實現(xiàn),因此實時性很高。而且不會對 DB 造成很大的影響,也能夠保證數(shù)據(jù)的一致性,因為數(shù)據(jù)庫會將所有數(shù)據(jù)的變動記錄在變更日志中。通過對日志的消費,即可明確知道數(shù)據(jù)的變化過程。它的缺點是實現(xiàn)相對復(fù)雜,因為不同數(shù)據(jù)庫的變動日志實現(xiàn)不一樣,格式、開啟方式以及特殊權(quán)限都不一樣,需要針對每一種數(shù)據(jù)庫做相應(yīng)的適配開發(fā)。
正如 Flink 的宣言 “實時即未來”,在如今的大背景下,實時性是亟待解決的重要問題。因此,我們將主流 CDC 基于日志的技術(shù)做了對比,如上圖所示:
- 數(shù)據(jù)源:Flink CDC 除了對傳統(tǒng)的關(guān)系型數(shù)據(jù)庫做到了很好的支持外,對文檔型、NewSQL(TiDB、OceanBase) 等當(dāng)下流行的數(shù)據(jù)庫都能夠支持;Debezium 對數(shù)據(jù)庫的支持相對沒有那么廣泛,但是對主流的關(guān)系型數(shù)據(jù)庫都做到了很好的支撐;Canal 和 OGG 只支持單一的數(shù)據(jù)源。
- 斷點續(xù)傳:四種技術(shù)都能夠支持。
- 同步模式:除了 Canal 只支持增量,其他技術(shù)均支持全量 + 增量的方式。而全量 + 增量的方式意味著第一次上線時全量到增量的切換過程全部可以通過 CDC 技術(shù)實現(xiàn),無須人為地通過全量的任務(wù)加上增量的 job 去實現(xiàn)全量 + 增量數(shù)據(jù)的讀取。
- 活躍度:Flink CDC 擁有非常活躍的社區(qū),資料豐富,官方也提供了詳盡的教程以及快速上手教程;Debezium 社區(qū)也相當(dāng)活躍,但資料大多是英文的;Canal 的用戶基數(shù)特別大,資料也相對較多,但社區(qū)活躍度一般;OGG 是 Oracle 的大數(shù)據(jù)套件,需要付費,只有官方資料。
- 開發(fā)難度:Flink CDC 依靠 Flink SQL 和 Flink DataStream 兩種開發(fā)模式,尤其是 Flink SQL,通過非常簡單的 SQL 即可完成數(shù)據(jù)同步任務(wù)的開發(fā),開發(fā)上手尤為簡單;Debezium 需要自己解析采集到的數(shù)據(jù)變更日志進(jìn)行單獨處理,Canal 亦是如此。
- 運行環(huán)境依賴:Flink CDC 是以 Flink 作為引擎,Debezium通常是將 Kafka connector 作為運行容器;而 Canal 和 OGG 都是單獨運行。
- 下游豐富程度:Flink CDC 依靠 Flink 非常活躍的周邊以及豐富的生態(tài),能夠打通豐富的下游,對普通的關(guān)系型數(shù)據(jù)庫以及大數(shù)據(jù)存儲引擎 Iceberg、ClickHouse、Hudi 等都做了很好的支持;Debezium 有 Kafka JDBC connector, 支持 MySQL 、Oracle 、SqlServer;Canal 只能直接消費數(shù)據(jù)或?qū)⑵漭敵龅?MQ 中進(jìn)行下游的消費;OGG 因為是官方套件,下游豐富程度不佳。
二、現(xiàn)今內(nèi)部落地的業(yè)務(wù)場景
- 2018 年之前,大健云倉數(shù)據(jù)同步的方式為:通過多數(shù)據(jù)應(yīng)用定時同步系統(tǒng)之間的數(shù)據(jù)。
- 2020 年之后,隨著跨境業(yè)務(wù)的飛速發(fā)展,多數(shù)據(jù)源應(yīng)用經(jīng)常打滿 DB 影響在線應(yīng)用,同時定時任務(wù)的執(zhí)行順序管理混亂。
- 因此, 2021 年我們開始調(diào)研選型 CDC 技術(shù),搭建了小型試驗場景,進(jìn)行小規(guī)模的試驗。
- 2022 年,上線了基于 Flink CDC 實現(xiàn)的 LDSS 系統(tǒng)庫存場景同步功能。
- 未來,我們希望依托 Flink CDC 打造數(shù)據(jù)同步平臺,通過界面的開發(fā)和配置完成同步任務(wù)的開發(fā)、測試和上線,能夠全程在線管理同步任務(wù)的整個生命周期。
LDSS 庫存管理的業(yè)務(wù)場景主要有以下四種:
- 倉儲部門:要求倉庫的庫存容量和商品品類分布合理,庫存容量方面,需要留一些 buffer 以防突如其來的入庫單導(dǎo)致爆倉;商品品類方面,季節(jié)性的商品庫存分配不合理導(dǎo)致熱點問題,這必將給倉庫的管理帶來巨大挑戰(zhàn)。
- 平臺客戶:希望訂單處理及時,貨物能夠快速、精準(zhǔn)地交到客戶手上。
- 物流部門:希望能夠提升物流效率,降低物流成本,高效利用有限的運力。
- 決策部門:希望 LDSS 系統(tǒng)能夠?qū)υ诤螘r何地新建倉庫提供科學(xué)的建議。
上圖為 LDSS 庫存管理分單場景架構(gòu)圖。
首先,通過多數(shù)據(jù)源同步的應(yīng)用向下拉取倉儲系統(tǒng)、平臺系統(tǒng)以及內(nèi)部 ERP 系統(tǒng)數(shù)據(jù),將所需數(shù)據(jù)抽取到 LDSS 系統(tǒng)的數(shù)據(jù)庫中,以支撐 LDSS 系統(tǒng)訂單、庫存、物流三大模塊的業(yè)務(wù)功能。
其次,需要產(chǎn)品信息、訂單信息以及倉庫信息才能進(jìn)行有效的分單決策。多數(shù)據(jù)源定時同步任務(wù)基于 JDBC 查詢,通過時間做篩選,同步變更的數(shù)據(jù)到 LDSS 系統(tǒng)中。LDSS 系統(tǒng)基于這些數(shù)據(jù)做分單決策,以獲得最優(yōu)解。
定時任務(wù)同步的代碼,首先需要定義定時任務(wù)、定義定時任務(wù)的類、執(zhí)行方法以及執(zhí)行間隔。
上圖左側(cè)為定時任務(wù)的定義,右側(cè)是定時任務(wù)的邏輯開發(fā)。首先,打開 Oracle 數(shù)據(jù)庫進(jìn)行查詢,然后 upsert 到 MySQL 數(shù)據(jù)庫,即完成了定時任務(wù)的開發(fā)。此處以接近原生 JDBC 的查詢方式,將數(shù)據(jù)依次塞到對應(yīng)的數(shù)據(jù)庫表中,開發(fā)邏輯十分繁瑣,也容易出現(xiàn) bug。
因此,我們基于 Flink CDC 對其進(jìn)行了改造。
上圖為基于 Flink CDC 實現(xiàn)的實時同步場景,唯一的變化是將此前的多數(shù)據(jù)源同步應(yīng)用程序換成了 Flink CDC 。
首先,通過 SqlServer CDC、MySQL CDC、Oracle CDC 分別連接抽取對應(yīng)倉儲平臺、 ERP 系統(tǒng)數(shù)據(jù)庫的表數(shù)據(jù),然后通過 Flink 提供的 JDBC connector 寫入到 LDSS 系統(tǒng)的 MySQL 數(shù)據(jù)庫中。能夠通過 SqlServer CDC、MySQL CDC、Oracle CDC 將異構(gòu)數(shù)據(jù)源轉(zhuǎn)化為統(tǒng)一的 Flink 內(nèi)部類型,再往下游寫。
此架構(gòu)相比于之前的架構(gòu),對業(yè)務(wù)系統(tǒng)沒有侵入性,而且實現(xiàn)較為簡單。
我們引入了 MySQL CDC 和 SqlServer CDC 分別連接 B2B 平臺的 MySQL 數(shù)據(jù)庫以及倉儲系統(tǒng)的 SqlServer 數(shù)據(jù)庫,然后將抽取到的數(shù)據(jù)通過 JDBC Connector 寫入到 LDSS 系統(tǒng)的 MySQL 數(shù)據(jù)庫。
通過以上改造,得益于 Flink CDC 賦予其實時的能力,不需要管理繁雜的定時任務(wù)。
基于 Flink CDC 同步代碼的實現(xiàn)分為以下三步:
- 第一步,定義源表 —— 需要同步的表;
- 第二步,定義目標(biāo)表 —— 需要寫入數(shù)據(jù)的目標(biāo)表;
- 第三步,通過 insert select 語句,即可完成 CDC 同步任務(wù)的開發(fā)。
上述開發(fā)模式非常簡單,邏輯清晰。此外,依托 Flink CDC 的同步任務(wù)和 Flink 架構(gòu),還獲得了失敗重試、分布式、高可用、全量增量一致性切換等特性。
三、未來內(nèi)部推廣及平臺化建設(shè)
上圖為平臺架構(gòu)圖。
左側(cè) source 是由 Flink CDC + Flink 提供的源端,能夠通過豐富的源端抽取數(shù)據(jù),通過數(shù)據(jù)平臺上的開發(fā)寫入到目標(biāo)端。目標(biāo)端又依托于 Flink 的強大生態(tài),能夠很好地支撐數(shù)據(jù)湖、關(guān)系型數(shù)據(jù)庫、MQ 等。
Flink 目前有兩種運行方式,一種是國內(nèi)比較流行的 Flink on Yarn,另一種是 Flink on Kubernets。中間部分的數(shù)據(jù)平臺向下管理 Flink 集群,以向上支撐 SQL 在線開發(fā)、任務(wù)開發(fā)、血緣管理、任務(wù)提交、在線 Notebook 開發(fā)、權(quán)限和配置以及對任務(wù)性能的監(jiān)控和告警,同時也能夠?qū)?shù)據(jù)源做到很好的管理。
數(shù)據(jù)同步的需求在公司內(nèi)部特別旺盛,需要通過平臺來提高開發(fā)效率,加快交付速度。而且平臺化之后,可以統(tǒng)一公司內(nèi)部的數(shù)據(jù)同步技術(shù),收攏同步技術(shù)棧,減少維護(hù)成本。
平臺化的目標(biāo)如下:
- 能夠很好地管理數(shù)據(jù)源、表等元信息;
- 任務(wù)的整個生命周期都可以在平臺上完成;
- 實現(xiàn)任務(wù)的性能觀測以及告警;
- 簡化開發(fā),快速上手,業(yè)務(wù)開發(fā)人員經(jīng)過簡單培訓(xùn)即可上手開發(fā)同步任務(wù)。
平臺化能帶來以下三個方面的收益:
- 收攏數(shù)據(jù)同步任務(wù),統(tǒng)一來管理;
- 平臺管理維護(hù)同步任務(wù)的全生命周期;
- 專門的團(tuán)隊負(fù)責(zé),團(tuán)隊能夠?qū)W⑶把氐臄?shù)據(jù)集成技術(shù)。
有了平臺之后,即可快速落地應(yīng)用更多的業(yè)務(wù)場景。
- 實時數(shù)倉:希望通過 Flink CDC 以支持更多實時數(shù)倉的業(yè)務(wù)場景,借助 Flink 強大的計算能力做一些數(shù)據(jù)庫的物化視圖。將計算從 DB 里解脫出來,通過 Flink 的外部計算再重新寫回數(shù)據(jù)庫,以加速平臺應(yīng)用的報表、統(tǒng)計、分析等實時應(yīng)用場景。
- 實時應(yīng)用:Flink CDC 能夠從 DB 層捕獲變更,因此可以通過 Flink CDC 實時更新搜索引擎中的內(nèi)容,實時向財務(wù)系統(tǒng)推送財務(wù)和核算數(shù)據(jù)。因為大部分財務(wù)系統(tǒng)的數(shù)據(jù)都需要業(yè)務(wù)系統(tǒng)通過跑定時任務(wù)以及經(jīng)過大量關(guān)聯(lián)、聚合、分組等操作才能計算出來,再推送到財務(wù)系統(tǒng)中。而借助 Flink CDC 強大的數(shù)據(jù)捕獲能力,再加上 Flink 的計算能力,將這些數(shù)據(jù)實時地推送到核算系統(tǒng)和財務(wù)系統(tǒng),就能夠及時發(fā)現(xiàn)業(yè)務(wù)的問題,減少公司的損失。
- 緩存:通過 Flink CDC,能夠構(gòu)建一個脫離于傳統(tǒng)的應(yīng)用之外的實時緩存,對于在線應(yīng)用的性能有極大的提升。
有了平臺的助力,相信 Flink CDC 能夠在公司內(nèi)部更好地釋放它的能力。
上圖展示了 SqlServer CDC 的原理。
社區(qū)同學(xué)使用了當(dāng)前版本的 SqlServer CDC 后,主要反饋的問題有以下三個:
- 快照過程中鎖表:鎖表操作對于 DBA 和在線應(yīng)用都是不可忍受的, DBA 無法接受數(shù)據(jù)庫被夯住,同時也會影響在線應(yīng)用。
- 快照過程中不能 checkpoint:不能 checkpoint 就意味著快照過程中一旦失敗,只能重新開始跑快照過程,這對于大表非常不友好。
- 快照過程只支持單并發(fā):千萬級、上億級的大表,在單并發(fā)的情況下需要同步十幾甚至幾十個小時,極大束縛了 SqlServer CDC 的應(yīng)用場景。
我們針對上述問題做了實踐和改進(jìn),參考社區(qū) 2.0 版本 MySQL CDC 并發(fā)無鎖算法的思想,對 SqlServer CDC 進(jìn)行了優(yōu)化,最終實現(xiàn)了快照過程中無鎖,實現(xiàn)一致性快照;快照過程中支持 checkpoint ;快照過程中支持并發(fā),加速快照過程。在大表同步的情況下,并發(fā)優(yōu)勢尤為明顯。
但是由于 2.2 版本社區(qū)將 MySQL 的并發(fā)無鎖思想抽象成了統(tǒng)一公共的框架,SqlServer CDC 需要重新適配這套通用框架后才能貢獻(xiàn)給社區(qū)。
提問&解答
Q1需要開啟 SqlServer 自己的 CDC 嗎?
是的,SqlServer CDC 的功能就是基于 SqlServer 數(shù)據(jù)庫自己的 CDC 特性實現(xiàn)的。
Q2物化視圖通過什么方式去刷新定時任務(wù)觸發(fā)器?
通過 Flink CDC 將需要生成物化視圖的 SQL 放在 Flink 里運行,通過原表的變動觸發(fā)計算,然后同步到物化視圖表里。
Q3平臺化是怎么做的?
平臺化參考了社區(qū)眾多的開源項目以及優(yōu)秀的開源平臺,比如 StreamX、DLink 等優(yōu)秀的開源項目。
Q4SqlServer CDC 在消費 transaction log 時有瓶頸嗎?
SqlServer 并沒有直接消費 log,其原理是 SqlServer capture process 去匹配 log 內(nèi)哪些表開啟了 CDC ,然后將這些表從日志里撈到開啟 CDC 表的變更數(shù)據(jù),再轉(zhuǎn)插到 change table 里,最后通過開啟 CDC 之后數(shù)據(jù)庫生成的 CDC query function 獲取到數(shù)據(jù)的變更。
Q5Flink CDC 高可用如何保障同步任務(wù)過多或密集處理方案?
Flink 的高可用依賴于 Flink 特性比如 checkpoint 等來保證。同步任務(wù)過多或處理方案密集的情況,建議使用多套 Flink 下游集群,然后根據(jù)同步的實時性區(qū)分對待,將任務(wù)發(fā)布到相應(yīng)的集群中。
Q6中間需要 Kafka 嗎?
取決于同步任務(wù)或數(shù)倉架構(gòu)是否需要將中間數(shù)據(jù)做 Kafka 落地。
Q7一個數(shù)據(jù)庫中有多張表,可以放到一個任務(wù)里運行嗎?
取決于開發(fā)方式。如果是 SQL 的開發(fā)方式,要實現(xiàn)一次性寫多表只能通過多個任務(wù)。但 Flink CDC 提供了另外一種比較高階的開發(fā)方式 DataStream ,可以將多表放到一個任務(wù)里運行。
Q8Flink CDC 支持讀取 Oracle 從庫的日志嗎?
目前還無法實現(xiàn)。
Q9通過 CDC 同步后兩個端的數(shù)據(jù)質(zhì)量如何監(jiān)控,如何比對?
目前只能通過定時抽樣來做數(shù)據(jù)質(zhì)量的檢查,數(shù)據(jù)質(zhì)量問題一直是業(yè)內(nèi)比較棘手的問題。
Q10大健云倉用的什么調(diào)度系統(tǒng)?系統(tǒng)如何與 Flink CDC 集合?
使用 XXL Job 作為分布式的任務(wù)調(diào)度,CDC 沒有用到定時任務(wù)。
Q11如果采集增刪表,SqlServer CDC 需要重啟嗎?
SqlServer CDC 目前不支持動態(tài)加表的功能。
Q12同步任務(wù)會影響系統(tǒng)性能嗎?
基于 CDC 做同步任務(wù)肯定會影響系統(tǒng)性能,尤其是快照過程對數(shù)據(jù)庫會有影響,進(jìn)而影響應(yīng)用系統(tǒng)。社區(qū)將來會做限流、對所有 connector 做并發(fā)無鎖的實現(xiàn),都是為了擴(kuò)大 CDC 的應(yīng)用場景以及易用性。
Q13全量和增量的 savepoint 怎么處理?
(未通過并發(fā)無鎖框架實現(xiàn)的連接器)全量過程中不可以觸發(fā) savepoint,增量過程中如果需要停機發(fā)布,可通過 savepoint 恢復(fù)任務(wù)。
Q14CDC 同步數(shù)據(jù)到 Kafka ,而 Kafka 里面存的是 Binlog ,如何保存歷史數(shù)據(jù)和實時數(shù)據(jù)?
將 CDC 同步的數(shù)據(jù)全部 Sync 到 Kafka,保留的數(shù)據(jù)取決于 Kafka log 的清理策略,可以全部保留。
Q15CDC 會對 Binlog 的日志操作類型進(jìn)行過濾嗎?會影響效率嗎?
即使有過濾操作,對性能影響也不大。
Q16CDC 讀 MySQL 初始化快照階段,多個程序讀不同的表會有程序報錯無法獲取鎖表的權(quán)限,這是什么原因?
建議先查看 MySQL CDC 是不是使用老的方式實現(xiàn),可以嘗試新版本的并發(fā)無鎖實現(xiàn)。
Q17MySQL 上億大表全量和增量如何銜接?
建議閱讀雪盡老師在 2.0 的相關(guān)博客,非常簡單清晰地介紹了并發(fā)無鎖如何實現(xiàn)一致性快照,完成全量和增量的切換。