Flink CDC MongoDB Connector 的實現原理和使用實踐
摘要:本文整理自 XTransfer 資深 Java 開發工程師、Flink CDC Maintainer 孫家寶在 Flink CDC Meetup 的演講。主要內容包括:
- MongoDB Change Stream 技術簡介
- MongoDB CDC Connector 業務實踐
- MongoDB CDC Connector 生產調優
- MongoDB CDC Connector 并行化 Snapshot 改進
- 后續規劃
01MongoDB Change Stream 技術簡介
MongoDB 是一種面向文檔的非關系型數據庫,支持半結構化數據存儲;也是一種分布式的數據庫,提供副本集和分片集兩種集群部署模式,具有高可用和水平擴展的能力,比較適合大規模的數據存儲。另外, MongoDB 4.0 版本還提供了多文檔事務的支持,對于一些比較復雜的業務場景更加友好。
MongoDB 使用了弱結構化的存儲模式,支持靈活的數據結構和豐富的數據類型,適合 Json 文檔、標簽、快照、地理位置、內容存儲等業務場景。它天然的分布式架構提供了開箱即用的分片機制和自動 rebalance 能力,適合大規模數據存儲。另外, MongoDB 還提供了分布式網格文件存儲的功能,即 GridFS,適合圖片、音頻、視頻等大文件存儲。
MongoDB 提供了副本集和分片集兩種集群模部署模式。
副本集:高可用的部署模式,次要節點通過拷貝主要節點的操作日志來進行數據的復制。當主要節點發生故障時,次要節點和仲裁節點會重新發起投票來選出新的主要節點,實現故障轉移。另外,次要節點還能分擔查詢請求,減輕主要節點的查詢壓力。
分片集:水平擴展的部署模式,將數據均勻分散在不同 Shard 上,每個 Shard 可以部署為一個副本集,Shard 中主要節點承載讀寫請求,次要節點會復制主要節點的操作日志,能夠根據指定的分片索引和分片策略將數據切分成多個 16MB 的數據塊,并將這些數據塊交給不同 Shard 進行存儲。Config Servers 中會記錄 Shard 和數據塊的對應關系。
MongoDB 的 Oplog 與 MySQL 的 Binlog 類似,記錄了數據在 MongoDB 中所有的操作日志。Oplog 是一個有容量的集合,如果超出預設的容量范圍,則會丟棄先前的信息。
與 MySQL 的 Binlog 不同, Oplog 并不會記錄變更前/后的完整信息。遍歷 Oplog 的確可以捕獲 MongoDB 的數據變更,但是想要轉換成 Flink 支持的 Changelog 依然存在一些限制。
首先,訂閱 Oplog 難度較大。每個副本集會維護自己的 Oplog, 對于分片集群來說,每個 Shard 可能是一個獨立的副本集,需要遍歷每個 Shard 的 Oplog 并按照操作時間進行排序。另外, Oplog 沒有包含變更文檔前和變更后的完整狀態,因此既不能轉換成 Flink 標準的 Changelog ,也不能轉換成 Upsert 類型的 Changelog 。這亦是我們在實現 MongoDB CDC Connector 的時候沒有采用直接訂閱 Oplog 方案的主要原因。
最終我們選擇使用 MongoDB Change Streams 方案來實現 MongoDB CDC Connector。
Change Streams 是 MongoDB 3.6 版本提供的新特性,它提供了更簡單的變更數據捕獲接口,屏蔽了直接遍歷 Oplog 的復雜度。Change Streams 還提供了變更后文檔完整狀態的提取功能,可以輕松轉換成 Flink Upsert 類型的 Changelog。它還提供了比較完整的故障恢復能力,每一條變更記錄數據都會包含一個 resume token 來記錄當前變更流的位置。故障發生后,可以通過 resume token 從當前消費點進行恢復。
另外, Change Streams 支持變更事件的篩選和定制化的功能。比如可以將數據庫和集合名稱的正則過濾器下推到 MongoDB 來完成,可以明顯減少網絡開銷。它還提供了對集合庫以及整個集群級別的變更訂閱,能夠支持相應的權限控制。
使用 MongoDB Change Streams 特性實現的 CDC Connector 如上圖所示。首先通過 Change Streams 訂閱 MongoDB 的變更。比如有 insert、update、delete、replace 四種變更類型,先將其轉換成 Flink 支持的 upsert Changelog,便可以在其之上定義成一張動態表,使用 Flink SQL 進行處理。
目前 MongoDB CDC Connector 支持 Exactly-Once 語義,支持全量加增量的訂閱,支持從檢查點、保存點恢復,支持 Snapshot 數據的過濾,支持數據庫的 Database、Collection 等元數據的提取,也支持庫集合的正則篩選功能。
02MongoDB CDC Connector 業務實踐
XTransfer 成立于 2017 年,聚焦于 B2B 跨境支付業務,為從事跨境電商出口的中小微企業提供外貿收款以及風控服務。跨境 B 類業務結算場景涉及的業務鏈路很長,從詢盤到最終的成交,過程中涉及物流條款、支付條款等,需要在每個環節上做好風險管控,以符合跨境資金交易的監管要求。
以上種種因素對 XTransfer 的數據處理安全性和準確性都提出了更高的要求。在此基礎上,XTransfer 基于 Flink 搭建了自己的大數據平臺,能夠有效保障在跨境 B2B 全鏈路上的數據能夠被有效地采集、加工和計算,并滿足了高安全、低延遲、高精度的需求。
變更數據采集 CDC 是數據集成的關鍵環節。在沒有使用 Flink CDC 之前,一般使用 Debezium、Canal 等傳統 CDC 工具來抽取數據庫的變更日志,并將其轉發到 Kafka 中,下游讀取 Kafka 中的變更日志進行消費。這種架構存在以下痛點:
- 部署組件多,運維成本較高;
- 下游數據消費邏輯需要根據寫入端進行適配,存在一定的開發成本;
- 數據訂閱配置較復雜,無法像 Flink CDC 一樣僅通過 SQL 語句便定義出一個完整的數據同步邏輯;
- 難以全部滿足全量 + 增量采集,可能需要引入 DataX 等全量采集組件;
- 比較偏向于對變更數據的采集,對數據的處理過濾能力較為薄弱;
- 難以滿足異構數據源打寬的場景。
目前我們的大數據平臺主要使用 Flink CDC 來進行變更數據捕獲,它具有如下優勢:
1. 實時數據集成
- 無須額外部署 Debezium、Canal、Datax 等組件,運維成本大幅降低;
- 支持豐富的數據源,也可復用 Flink 既有的 connectors 進行數據采集寫入,可以覆蓋大多數業務場景;
- 降低了開發難度,僅通過 Flink SQL 就可以定義出完整的數據集成工作流程;
- 數據處理能力較強,依托于 Flink 平臺強大的計算能力可以實現流式 ETL 甚至異構數據源的 join、group by 等。
2. 構建實時數倉
- 大幅簡化實時數倉的部署難度,通過 Flink CDC 實時采集數據庫的變更,并寫入 Kafka、Iceberg、Hudi、TiDB 等數據庫中,即可使用 Flink 進行深度的數據挖掘和數據處理。
- Flink 的計算引擎可以支持流批一體的計算模式,不用再維護多套計算引擎,可以大幅降低數據的開發成本。
3. 實時風控
- 實時風控以往一般采取往 Kafka 中發業務事件的方式實現,而使用 Flink CDC 之后,可以直接從業務庫中捕獲風控事件,然后通過 Flink CDC 來進行復雜的事件處理。
- 可以運行模型,以通過 Flink ML、Alink 來豐富機器學習的能力。最后將這些實時風控的處置結果回落進 Kafka,下達風控指令。
03MongoDB CDC Connector 生產調優
MongoDB CDC Connector 的使用有如下幾點要求:
- 鑒于使用了 Change Streams 的特性來實現 MongoDB CDC Connector, 因此要求 MongoDB 的最小可用版本是 3.6,比較推薦 4.0.8 及以上版本。
- 必須使用集群部署模式。由于訂閱 MongoDB 的 Change Streams 要求節點之間能夠進行相互復制數據,單機 MongoDB 無法進行數據的互相拷貝,也沒有 Oplog,只有副本集或分片集的情況下才有數據復制機制。
- 需要使用 WireTiger 存儲引擎,使用 pv1 復制協議。
- 需要擁有 ChangeStream 和 find 用戶權限。
使用 MongoDB CDC Connector 時要注意設置 Oplog 的容量和過期時間。MongoDB oplog 是一個特殊的有容量集合,容量達到最大值后,會丟棄歷史數據。而 Change Streams 通過 resume token 來進行恢復,太小的 oplog 容量可能會導致 resume token 對應的 oplog 記錄不再存在,即 resume token 過期,進而導致 Change Streams 無法被恢復。
可以使用 replSetResizeOplog 設置 oplog 容量和最短保留時間,MongoDB 4.4 版本之后也支持設置最小時間。一般而言,生產環境中建議 oplog 保留不小于 7 天。
對一些變更較慢的表,建議在配置中開啟心跳事件。變更事件和心跳事件可以同時向前推進 resume token,對于變更較慢的表,可以通過心跳事件來刷新 resume token 避免其過期。
可以通過 heartbeat.interval.ms 設置心跳的間隔。
由于只能將 MongoDB 的 Change Streams 轉換成 Flink 的 Upsert changelog,它類似于 Upsert Kafka 形式,為了補齊 –U 前置鏡像值,會增加一個算子 ChangelogNormalize,而這會帶來額外的狀態開銷。因此在生產環境中比較推薦使用 RocksDB State Backend。
當默認連接的參數無法滿足使用需求時,可以通過設置 connection.options 配置項來傳遞 MongoDB 支持的連接參數。
比如連接 MongoDB 的用戶創建的數據庫不在 admin 中,可以設置參數來指定需要使用哪個數據庫來認證當前用戶,也可以設置連接池的最大連接參數等,MongoDB 的連接字符串默認支持這些參數。
正則匹配多庫、多表是 MongoDB CDC Connector 在 2.0 版本之后提供的新功能。需要注意,如果數據庫名稱使用了正則參數,則需要擁有 readAnyDatabase 角色。因為 MongoDB 的 Change Streams 只能在整個集群、數據庫以及 collection 粒度上開啟。如果需要對整個數據庫進行過濾,那么數據庫進行正則匹配時只能在整個集群上開啟 Change Streams ,然后通過 Pipeline 過濾數據庫的變更。可以通過在 Ddatabase 和 Collection 兩個參數中寫入正則表達式進行多庫、多表的訂閱。
04MongoDB CDC Connector并行化 Snapshot 改進
為了加速 Snapshot 的速度,可以使用 Flip-27 引入的 source 來進行并行化改造。首先使用一個 split 枚舉器,根據一定的切分策略,將一個完整的 Snapshot 任務拆分成若干個子任務,然后分配給多個 split reader 并行做 Snapshot ,以此提升整體任務的運行速度。
但是在 MongoDB 里,大多情況下組件是 ObjectID,其中前面四個字節是 UNIX 描述,中間五個字節是一個隨機值,后面三個字節是一個自增量。在相同描述里插入的文檔并不是嚴格遞增的,中間的隨機值可能會影響局部的嚴格遞增,但從總體來看,依然能夠滿足遞增趨勢。
因此,不同于 MySQL 的遞增組件,MongoDB 并不適合采用 offset + limit 的切分策略對其集合進行簡單拆分,需要針對 ObjectID 采用針對性的切分策略。
最終,我們采取了以下三種 MongoDB 切分策略:
- Sample 采樣分桶:原理是利用 $sample 命令對 collection 進行隨機采樣,通過平均文檔大小和每個 chunk 的大小來預估需要的分桶數。要求相應集合的查詢權限,其優點是速度較快,適用于數據量大但是沒有分片的集合;缺點是由于使用了抽樣預估模式,分桶的結果不能做到絕對均勻。
- SplitVector 索引切分:SplitVector 是 MongoDB 計算 chunk 分裂點的內部命令,通過訪問指定的索引計算出每個 chunk 的邊界。要求擁有 SplitVector 權限,其優點是速度快,chunk 結果均勻;缺點是對于數據量大且已經分片的集合,不如直接讀取 config 庫中已經分好的 chunks 元數據。
- Chunks 元數據讀取:因為 MongoDB 在 config 數據庫會存儲分片集合的實際分片結果,因此可以直接從 config 中讀取分片集合的實際分片結果。要求擁有 config 庫讀取權限,僅限于分片集合使用。其優點是速度快,無須重新計算 chunk 分裂點,chunk 結果均勻,默認情況下為 64MB;缺點是不能滿足所有場景,僅限分片場景。
上圖為 sample 采樣分桶示例。左側是一個完整的集合,從完整的集合中設定樣本數量,然后將整個樣本縮小,并根據采樣以后的樣本進行分桶,最終結果就是我們希望的 chunks 邊界。
sample 命令是 MongoDB 采樣的一個內置命令。在樣本值小于 5% 的情況下,使用偽隨機算法進行采樣;樣本值大于 5% 的情況下,先使用隨機排序,然后選擇前 N 個文檔。它的均勻度和耗時主要取決于隨機算法和樣本的數量,是一種均勻程度和切分速度的折中策略,適合于要求切分速度快,但可以容忍切分結果不太均勻的場景。
在實際測試中,sample 采樣的均勻程度有著不錯的表現。
上圖為 SplitVector 索引切分示例。左側是原始集合,通過 SplitVector 命令指定需要訪問的索引,為 ID 索引。可以設置每個 chunk 的大小,單位為 MB,然后使用 SplitVector 命令訪問索引,并通過索引計算每個塊的邊界。
它速度快,chunk 結果也很均勻,適用于大部分場景。
上圖為 config.chuncks 讀取示例,即直接讀取 MongoDB 已經分好的 chunks 元數據。在 Config Server 中會存儲每個 Shard、其所在機器以及每個 Shard 的邊界。對于分片集合,可以直接在 chunks 中讀取它的邊界信息,無須重復計算這些分裂點,也可以保證每一個 chunk 的讀取在單臺機器上就能完成,速度極快,在大規模的分片集合場景下有著很好的表現。
05后續規劃
Flink CDC 的后續規劃主要分為以下五個方面:
- 第一,協助完善 Flink CDC 增量 Snapshot 框架;
- 第二,使用 MongoDB CDC 對接 Flink CDC 增量 Snapshot 框架,使其能夠支持并行 Snapshot 改進;
- 第三,MongoDB CDC 支持 Flink RawType。對于一些比較靈活的存儲結構提供 RawType 轉換,用戶可以通過 UDF 的形式對其進行自定義解析;
- 第四,MongoDB CDC 支持從指定位置進行變更數據的采集;
- 第五,MongoDB CDC 穩定性的優化。
提問&解答
Q1?:MongoDB CDC 延遲高嗎?是否需要通過犧牲性能來降低延遲?
MongoDB CDC 延遲不高,在全量采集的時候經過 changelog normalize 可能會對于 CDC 的增量采集造成一些背壓,但是這種情況可以通過 MongoDB 并行化改造、增加資源的方式來避免。
Q2?:默認連接什么時候無法滿足要求?
MongoDB 的用戶可以在任何數據庫、任何子庫中進行創建。如果不是在 admin 的數據庫中創建用戶,認證的時候需要顯示地指定要在哪個數據庫中認證用戶,也可以設置最大的連接大小等參數。
Q3?:MongoDB 目前的 DBlog 支持無鎖并發讀取嗎?
DBlog 的無鎖并發擁有增量快照的能力,但是因為 MongoDB 難以獲取當前 changelog 的位點,所以增量快照無法立刻實現,但無鎖并發的 Snapshot 即將支持。