揭露數(shù)據(jù)不一致的利器 —— 實(shí)時核對系統(tǒng)
隨著企業(yè)業(yè)務(wù)發(fā)展,以及微服務(wù)化大趨勢下單體服務(wù)的拆分,服務(wù)間的通信交互越來越多。與單體服務(wù)不同,微服務(wù)間的數(shù)據(jù)往往需要通過額外的手段來保障一致性,例如事務(wù)消息、異步任務(wù)補(bǔ)償?shù)?。除了從機(jī)制上最大程度保障以外,如何觀測并及時發(fā)現(xiàn)數(shù)據(jù)不一致也非常重要。
本文介紹 Shopee Financial Products 團(tuán)隊(duì)設(shè)計(jì)和開發(fā)的 實(shí)時核對系統(tǒng)(Real-time Checking System) ,它接入簡單,只需根據(jù)核對需求配置對應(yīng)的核對規(guī)則,實(shí)現(xiàn)了規(guī)則熱加載,并能在不侵入業(yè)務(wù)的前提下對系統(tǒng)數(shù)據(jù)進(jìn)行實(shí)時監(jiān)測對比,及時發(fā)現(xiàn)數(shù)據(jù)的不一致。系統(tǒng)落地至今,已在 Shopee 多個產(chǎn)品線推廣使用,幫助不同團(tuán)隊(duì)快速發(fā)現(xiàn)線上數(shù)據(jù)不一致問題,為數(shù)據(jù)保駕護(hù)航。
1. 背景
1.1 系統(tǒng)數(shù)據(jù)的不一致性
在日常的開發(fā)迭代中我們能發(fā)現(xiàn),系統(tǒng)的數(shù)據(jù)有時并不按照我們設(shè)想的那樣進(jìn)行變更。常見的場景如:用戶進(jìn)行了還款(Repay),系統(tǒng) A 收到了還款請求后調(diào)用系統(tǒng) B,將已凍結(jié)的賬戶進(jìn)行解凍,但因?yàn)槟承┰颍ㄈ缦到y(tǒng)故障、網(wǎng)絡(luò)分區(qū)等),解凍的請求沒有抵達(dá) B,或者解凍成功的響應(yīng)沒有返回給 A,此時會出現(xiàn)已經(jīng)確定收款但未解凍,或未確認(rèn)收款卻已解凍的情況,從而引起用戶投訴或資金損失。
Fig1. Data Inconsistency
造成這類問題的原因通常有:代碼邏輯 Bug、并發(fā)場景處理不當(dāng)、基礎(chǔ)組件(網(wǎng)絡(luò)、數(shù)據(jù)庫、中間件)故障、跨系統(tǒng)間缺乏原生的一致性保障等等。隨著業(yè)務(wù)擴(kuò)展,企業(yè)內(nèi)的應(yīng)用越來越多,且有許多 單體應(yīng)用 (Monolithic Application)向 微服務(wù) (Microservices)拆分轉(zhuǎn)型,分布式的場景下丟失了數(shù)據(jù)庫事務(wù)的支持,需要解決數(shù)據(jù)一致性的問題。
保障數(shù)據(jù)一致的方案有很多種,在單體服務(wù)且缺少不同組件間(例如跨 Database、不同存儲中間件)事務(wù)支持的場景下,可以使用本地事務(wù)表 + 補(bǔ)償任務(wù)的組合,將主表數(shù)據(jù)與檢查任務(wù)通過事務(wù)寫入,再通過異步任務(wù)不斷檢查目標(biāo)數(shù)據(jù)是否一致并進(jìn)行補(bǔ)償,可實(shí)現(xiàn)最終一致性;在跨服務(wù)場景下,Saga 模式通過可靠消息及服務(wù)提供回滾事務(wù)的能力,來實(shí)現(xiàn)分布式事務(wù)。
但是,對于重要的業(yè)務(wù),不管使用何種一致性方案, 提供額外的檢查、核對、兜底手段都是必要的 ,由此衍生出了很多的業(yè)務(wù)核對、對賬的需求。服務(wù)間通過特定手段保障數(shù)據(jù)一致性,并設(shè)計(jì)無侵入的旁路系統(tǒng)進(jìn)行數(shù)據(jù)核對和校驗(yàn),是微服務(wù)架構(gòu)下的典型搭配。
Fig2. Data Consistency Insurance
1.2 離線核對的缺陷
常見的離線數(shù)據(jù)核對可以通過定時任務(wù), 按照一定的篩選條件,從不同數(shù)據(jù)源中獲取特定數(shù)據(jù),再進(jìn)行比較 。這種方案的偽代碼如:
func Check() {
// 獲取上游 update_time 落在 [a, b) 的數(shù)據(jù)行
upstreamRows := QueryUpstreamDB(a, b)
for uniqueKey, sourceData := range upstreamRows {
// 為每個上游數(shù)據(jù)查找對應(yīng)的下游數(shù)據(jù)
targetData := QueryDownstreamDB(uniqueKey)
// 對比上下游數(shù)據(jù)
Compare(sourceData, targetData)
}
}
時效性低是這類查表方案的通病。核對操作通常放在異步任務(wù)中定時執(zhí)行,執(zhí)行時間和離數(shù)據(jù)變更時間有一定延遲,且定時任務(wù)的查詢條件也會對核對目標(biāo)造成影響。當(dāng)出現(xiàn)異常數(shù)據(jù)時,不能及時發(fā)現(xiàn)問題,只能等待下次定時任務(wù)執(zhí)行后才能發(fā)現(xiàn)。
引入了 額外的掃表開銷 同樣是個不容忽視的問題。在數(shù)據(jù)量較大,尤其是存在大量 ??INSERT?
? 操作的場景下,想要核對就需要 ??SELECT?
? 出上下游的目標(biāo)數(shù)據(jù)。為了在不影響正常業(yè)務(wù)的情況下及時處理完核對任務(wù),開發(fā)者可通過將查詢轉(zhuǎn)移到從庫,甚至引入核對任務(wù)獨(dú)占的從庫,但此類查表核對方案在資源使用和實(shí)現(xiàn)復(fù)雜度方面都不夠理想。
同時,由于查表得到的結(jié)果只是當(dāng)前的數(shù)據(jù)版本,在兩次檢查之間,數(shù)據(jù)可能發(fā)生了多次變更, 定時任務(wù)無法感知和觀測到每個狀態(tài)變更 ,在數(shù)據(jù)被頻繁 ??UPDATE?
? 的場景下也存在一定的核對和檢測難度。
因此,要實(shí)現(xiàn)更好的數(shù)據(jù)核對,我們需要考慮以下幾點(diǎn)目標(biāo):
- 實(shí)現(xiàn)秒級核對。
- 盡量減少數(shù)據(jù)庫查詢。
- 核對數(shù)據(jù)變更,而非核對數(shù)據(jù)快照。
- 簡單靈活的接入方式。
2. 實(shí)時數(shù)據(jù)核對
為了更好地發(fā)現(xiàn)數(shù)據(jù)不一致的情況,Shopee Financial Products 團(tuán)隊(duì)在 2021 年中設(shè)計(jì)并實(shí)現(xiàn)了 Real-time Checking System (實(shí)時核對系統(tǒng),RCS)。RCS 具有以下核心優(yōu)勢:
- 秒級數(shù)據(jù)核對。
- 對業(yè)務(wù)邏輯無侵入。
- 可配置化接入。
從上線至今,RCS 幫助團(tuán)隊(duì)及時檢測到了多次數(shù)據(jù)問題,可以將原因歸納為以下幾個方面:
- 代碼邏輯 Bug:包括冪等處理、并發(fā)問題、業(yè)務(wù)邏輯錯誤等。
- 系統(tǒng)運(yùn)行環(huán)境:DB 異常、網(wǎng)絡(luò)抖動、MQ 異常等。
Fig3. Types of spotted bugs
本節(jié)主要介紹 RCS 的實(shí)現(xiàn),包括系統(tǒng)架構(gòu)和核對流程、核對性能優(yōu)化、消息通知機(jī)制等。
2.1 系統(tǒng)架構(gòu)與核對流程
在系統(tǒng)設(shè)計(jì)上,我們將 RCS 分為了三層:
- 變更數(shù)據(jù)獲?。―ata Fetching Layer)
- 數(shù)據(jù)核對(Data Checking Layer)
- 核對結(jié)果處理(Result Handling Layer)
Fig4. System Layers
2.1.1 變更數(shù)據(jù)獲取
實(shí)時核對,顧名思義需要著重關(guān)注“實(shí)時”和“核對”兩個要點(diǎn)。Data Fetching Layer 負(fù)責(zé)達(dá)成實(shí)時的目標(biāo),通過對不同 CDC(Change Data Capture,變更數(shù)據(jù)抓取)方案的調(diào)研,我們使用了 Log-Based 的方案來提供時效性保障。
擴(kuò)展閱讀
CDC 模式用于感知數(shù)據(jù)變更,主要可以分為以下 4 類:
- Timestamps,基于 update_time 或類似字段進(jìn)行查詢來獲取變更數(shù)據(jù)。
- Table Differencing,獲取完整數(shù)據(jù)快照進(jìn)行比對。
- Triggers,為 DDL、DML 設(shè)置 Trigger,將變更內(nèi)容用額外的操作記錄至數(shù)據(jù)庫。
- Log-Based,典型例子為利用 MySQL binlog 和 MongoDB oplog。
其中,Timestamps 方案和 Table Differencing 均由定時任務(wù)驅(qū)動,時效性較弱。Timestamps 方案無法感知被刪除的數(shù)據(jù),使用時需要由軟刪除代替;Table Differencing 方案彌補(bǔ)了這個缺點(diǎn),但是多次獲取完整數(shù)據(jù)會讓整套方案顯得非常笨重。
Triggers 方案和 Log-Based 方案獲取到的均為數(shù)據(jù)變更而非數(shù)據(jù)快照,但 Triggers 感知后以特定的語句將其記錄下來,本質(zhì)上是一次寫操作,仍給數(shù)據(jù)庫帶來了額外的負(fù)擔(dān)。
當(dāng) MySQL 產(chǎn)生數(shù)據(jù)變更時,高可用的 binlog 同步組件會獲取到對應(yīng) binlog,并將其投遞至 Kafka 中,以此獲取變更數(shù)據(jù)的數(shù)據(jù)值用于核對。
Fig5. Data Fetching Layer
在實(shí)際使用中,需要核對的數(shù)據(jù)可能并非都存在于 MySQL 中,例如我們也需要核對 MySQL 與 MongoDB 的數(shù)據(jù)、MySQL 與 Redis 的數(shù)據(jù)。為此,業(yè)務(wù)系統(tǒng)也可以通過自行投遞特定格式的 Kafka 消息來接入,從而保證接入的靈活性。
2.1.2 數(shù)據(jù)核對
Data Checking Layer 負(fù)責(zé)處理接收到的數(shù)據(jù)流,包括獲取特定的核對規(guī)則,接收到數(shù)據(jù)時進(jìn)行暫存或比對。RCS 對 binlog 數(shù)據(jù)進(jìn)行抽象,提煉了一套通用的可配置化的核對規(guī)則。用戶只需要填寫對應(yīng)的規(guī)則,即可實(shí)現(xiàn)自助接入。規(guī)則定義示例如下:
Fig6. Config Example
不難想象,不同系統(tǒng)間數(shù)據(jù)的變更是有先后的,且變更的消息被 RCS 接收到也會有先后順序。因此,先抵達(dá)的數(shù)據(jù)需要被存儲下來作為后續(xù)比對的目標(biāo),后抵達(dá)的數(shù)據(jù)則按照規(guī)則與已有數(shù)據(jù)進(jìn)行比對。
Fig7. Check Flow
為了便于描述,這里先定義幾個名稱:
- 數(shù)據(jù)上游:先到達(dá) RCS 的數(shù)據(jù)為上游。
- 數(shù)據(jù)下游:后到達(dá) RCS 的數(shù)據(jù)為下游。
- 核對項(xiàng):某個數(shù)據(jù)核對需求,包括上游數(shù)據(jù)和下游數(shù)據(jù)。例如:System A 與 System B 核對用戶資金狀態(tài)的需求。
Fig8. Kafka Data Check Flow
以下面這一次核對為例,它需要判斷數(shù)據(jù)是否在 10 秒達(dá)成一致,整體的核對流程可以簡要描述為:
比對數(shù)據(jù)到達(dá),進(jìn)行核對,并刪除 Redis key;
比對數(shù)據(jù)未到達(dá),判斷延遲隊(duì)列中的數(shù)據(jù)。
- (圖 8)核對項(xiàng)的上游數(shù)據(jù)到達(dá),暫存 Redis 和延遲隊(duì)列。
- (圖 8)RCS 等待核對項(xiàng)的下游數(shù)據(jù):
- (圖 9)延遲隊(duì)列到達(dá)時間后,再次查詢在 Redis 中是否有對應(yīng)數(shù)據(jù):
- 存在,則超過核對時間閾值,發(fā)送異常告警,刪除 Redis key;
- 不存在,則已核對。
Fig9. DelayQueue Check Flow
2.1.3 消息通知機(jī)制
RCS 的目標(biāo)是及時發(fā)現(xiàn)數(shù)據(jù)不一致的問題,因此,在 Result Handling Layer 中接入了 Shopee 企業(yè) IM(SeaTalk)的機(jī)器人進(jìn)行告警。未來告警接口也會進(jìn)行開放,便于擴(kuò)展和讓其它消息應(yīng)用進(jìn)行接入。
我們設(shè)計(jì)了四種消息通知機(jī)制:
- Mismatch Notice
- Aggregated Notice
- Recovery Notice
- Statistical Notice
Mismatch Notice 應(yīng)對一般場景下的核對失敗,及時通知到對應(yīng)的業(yè)務(wù)負(fù)責(zé)人,便于快速定位問題原因并修復(fù)數(shù)據(jù)。但當(dāng)大量數(shù)據(jù)出現(xiàn)不一致時,Aggregated Notice 會取而代之,將告警進(jìn)行聚合發(fā)送,避免影響到值班人員的正常閱讀。
RCS 也會將核對失敗的數(shù)據(jù)持久化,因而具備恢復(fù)感知的能力。當(dāng)異常數(shù)據(jù)恢復(fù)時,Recovery Notice 會發(fā)送消息告知使用者何種不一致已經(jīng)恢復(fù),間隔了多少時間。
最后,Statistical Notice 會向使用者報(bào)告常規(guī)的統(tǒng)計(jì)數(shù)據(jù),包括 DB 主從延遲、當(dāng)日核對成功率等。
2.2 核對功能演進(jìn)
系統(tǒng)上線至今,接入或自行部署使用 RCS 的團(tuán)隊(duì)越來越多,對應(yīng)的業(yè)務(wù)場景也各不相同,早期的核對規(guī)則難以滿足不同團(tuán)隊(duì)的核對需求。在 2021 年末,Shopee Financial Products 研發(fā)團(tuán)隊(duì)又對 Data Checking Layer 進(jìn)行了一系列的擴(kuò)展,目的是減少維護(hù)成本,以較為通用的方式支持不同團(tuán)隊(duì)的使用。
2.2.1 等值 / 映射核對
在最早上線的版本中,RCS 系統(tǒng)包含了等值和狀態(tài)映射核對的功能,是針對組內(nèi)實(shí)際面臨的場景設(shè)計(jì)的,滿足日常的使用需求。
核對系統(tǒng)主要處理的是上下游系統(tǒng)之間金額數(shù)值、狀態(tài)的變化,通常我們能獲取到的 binlog 核心字段示例和核對邏輯如下:
Fig10. Equivalence Check
假設(shè)先接收到 System A 的 binlog 消息,暫存 Redis,規(guī)定時間內(nèi)也接收到了 System B 的 binlog 消息:
??loan_amount?
? 為 200,需要找到一條對應(yīng)的 System A 的 binlog,且 ??order_amount?
? 需與之匹配;
??loan_status?
? 為 4,需要找到一條對應(yīng)的 System A 的 binlog,且 ??order_status?
? 需為 2。
- 根據(jù) System B 這條 binlog 的特征,發(fā)現(xiàn)配置有兩條核對規(guī)則:
對于不同系統(tǒng)間產(chǎn)生的單條記錄變更的核對,等值和映射檢查能覆蓋到大部分場景。但是因?yàn)檫@兩種核對的邏輯都是固定下來的,所以業(yè)務(wù)方如果有不同的核對需要,則需要新的代碼邏輯實(shí)現(xiàn)。為此,研發(fā)團(tuán)隊(duì)考慮 將核對邏輯交給使用方來描述 ,因而催生出了表達(dá)式核對的功能。
2.2.2 表達(dá)式核對
如果我們考慮以下的 binlog 示例,不同系統(tǒng)間的數(shù)據(jù)模型設(shè)計(jì)并不一致,字段非一一對應(yīng)。
Fig11. Expression Check
System A 記錄了 訂單的金額為 100 ,而 System B 記錄了訂單的 已支付金額為 30,借貸金額為 70 ,需要核對的是 System A ??order_amount?
? 是否等于 System B ??paid_amount + loan_amount?
? ,原有的設(shè)計(jì)無法支持。
為此,我們引入了表達(dá)式求值的方案,當(dāng) binlog 抵達(dá)時, 使用方通過一個返回值為布爾類型的表達(dá)式來描述自己的核對邏輯 ,如:
??a.order_amount == b.loan_amount?
?
??a.order_status == 2 && b.loan_status == 4?
?
- 判斷 2.2.2 中求和場景: ?
?a.order_amount == b.paid_amount + b.loan_amount?
? - 兼容判斷 2.2.1 中場景:
在表達(dá)式核對方案下,兩個系統(tǒng)間的幾乎所有的單條數(shù)據(jù)核對場景都能進(jìn)行覆蓋,且這種方案的好處在于研發(fā)團(tuán)隊(duì)不用再費(fèi)心思提供新的計(jì)算、映射、與或非邏輯實(shí)現(xiàn)的支持,大大減少了維護(hù)成本。
2.2.3 動態(tài)配置數(shù)據(jù)核對
在電商和金融的場景中,存在一些動態(tài)數(shù)據(jù),例如費(fèi)率、活動優(yōu)惠折扣等,會隨著業(yè)務(wù)和運(yùn)營計(jì)劃發(fā)生實(shí)時變化。這類數(shù)據(jù)通常存儲在配置表中,因此通過簡單的表達(dá)式無法進(jìn)行定義,而不同業(yè)務(wù)系統(tǒng)中的配置表結(jié)構(gòu)設(shè)計(jì)也不一樣,很難在核對系統(tǒng)代碼中進(jìn)行聲明。
為了滿足這種場景,RCS 引入了對業(yè)務(wù)系統(tǒng) SQL 查詢的支持,當(dāng)獲取到新的 binlog 時,檢查這條 binlog 滿足的核對規(guī)則,使用方在核對規(guī)則中會配置需要執(zhí)行的 SQL 語句,以及分庫分表規(guī)則,由核對系統(tǒng)執(zhí)行并得到比對的內(nèi)容,再進(jìn)行表達(dá)式核對:
- binlog 中獲取到當(dāng)前訂單的費(fèi)率 ?
?order_rate?
? 為 0.5。 - 根據(jù)配置信息執(zhí)行 ?
?SELECT?
? 語句查詢實(shí)時的費(fèi)率 ??rate?
? 。 - 執(zhí)行表達(dá)式核對 ?
?a.order_rate == rate?
? 。
除此之外,RCS 也能支持 JSON 串核對,譬如 System A 需要核對 ??order_rate?
? ,但是存儲 ??order_rate?
? 信息是一個 JSON 串, ??rate_info = {"decimal_base":"10000", "order_rate":"0.5"}?
? ??梢栽?RCS 的核對規(guī)則中,自定義 JSON 解析表達(dá)式,提取真實(shí)需要核對的字段。
3. 性能表現(xiàn)
RCS 系統(tǒng)的性能主要取決于 Data Fetching Layer 和 Data Checking Layer。
Data Fetching Layer 的性能代表實(shí)時獲取變更數(shù)據(jù)的能力,受 binlog 解析(CPU 密集型任務(wù))及 Kafka 的消息持久化(I/O 密集型任務(wù))影響。 業(yè)務(wù)團(tuán)隊(duì)可根據(jù)需要選擇對應(yīng)的硬件搭建 CDC 模塊 ,以我們使用場景為例,每秒可投遞的消息數(shù)量超過 20K 。
Data Checking Layer 則負(fù)責(zé)進(jìn)行數(shù)據(jù)核對,為了測試 RCS 的性能極限,Data Fetching 采用 Kafka 發(fā)送源數(shù)據(jù),核對系統(tǒng)采用單機(jī)部署。測試結(jié)果表明, RCS 每秒可完成核對 10K+ 次 ,詳細(xì)數(shù)據(jù)如下:
Component | Machine |
Kafka | 3 * 48 Core 128 GB |
Redis | 3 * 48 Core 128 GB |
Real-time Checking System | 1 * 48 Core 128 GB |
Number of check entry | TPS | CPU Cost |
1 entry | 14.3K | 454% |
2 entries | 12.0K | 687% |
3 entries | 10.4K | 913% |
從壓測結(jié)果分析,RCS 的性能瓶頸主要取決于 Redis 集群的性能,單次核對耗時約為 0.5ms。當(dāng)然,RCS 支持集群部署,做為 Kafka 的消費(fèi)者,可以利用 Kafka consumer group 的 Rebanancing 機(jī)制,從而實(shí)現(xiàn)動態(tài)擴(kuò)/縮容的機(jī)制。
4. 總結(jié)
Shopee Financial Products 團(tuán)隊(duì)在 2021 年落地的 RCS 目前在多個產(chǎn)品線推廣和使用,主要解決傳統(tǒng) T+1 式離線數(shù)據(jù)核對延遲高、業(yè)務(wù)耦合緊密,且隨新業(yè)務(wù)上線還帶來額外的開發(fā)負(fù)擔(dān)的問題。
RCS 通過靈活的核對規(guī)則配置化、表達(dá)式場景覆蓋以及 Log-Based 的 CDC 方案,提供近實(shí)時的數(shù)據(jù)核對解決方案,最大程度地降低數(shù)據(jù)不一致導(dǎo)致的資金、信息安全等風(fēng)險。我們也歡迎不同的用戶和團(tuán)隊(duì)接入或部署使用,在后續(xù)的更新迭代中,RCS 會進(jìn)一步提升核對的性能,以支撐業(yè)務(wù)量增長帶來的核對需求。
本文作者
Yizhong、Songtao,后端研發(fā)工程師。來自 Shopee Financial Products 團(tuán)隊(duì)。
Jiekun,后端研發(fā)工程師,熱衷于分布式系統(tǒng) & Kubernetes。來自 Shopee Off-Platform Ads 團(tuán)隊(duì)。