RocketMQ如何應對每天1500億條的數(shù)據(jù)處理?
同程藝龍的機票、火車票、汽車票、酒店相關業(yè)務已經接入了 RocketMQ,用于流量高峰時候的削峰,以減少后端的壓力。
同時,對常規(guī)的系統(tǒng)進行解耦,將一些同步處理改成異步處理,每天處理的數(shù)據(jù)達 1500 億條。
在近期的 Apache RocketMQ Meetup 上,同程藝龍機票事業(yè)部架構師查江,分享了同程藝龍的消息系統(tǒng)如何應對每天 1500 億條的數(shù)據(jù)處理。
通過此文,您將了解到:
- 同程藝龍消息系統(tǒng)的使用情況
- 同程藝龍消息系統(tǒng)的應用場景
- 技術上踩過的坑
- 基于 RocketMQ 的改進
同程藝龍消息系統(tǒng)的使用情況
RocketMQ 集群分為 Name Server 和 Broker 兩部分,Name Server 用的是雙主模式,一個是考慮性能,另一個考慮安全性。在純數(shù)據(jù)的 Broker 分成很多組,每個組里面分為 Master 和 Slave。
目前,我們的機票、火車票、汽車票、酒店相關業(yè)務已經接入了 RocketMQ,用于流量高峰時候的削峰,以減少后端的壓力。
同時,對常規(guī)的系統(tǒng)進行解耦,將一些同步處理改成異步處理,每天處理的數(shù)據(jù)達 1500 億條。
選擇 RocketMQ 的原因是:
- 接入簡單,引入的 Java 包比較少
- 純 Java 開發(fā),設計邏輯比較清晰
- 整體性能比較穩(wěn)定的,Topic 數(shù)量大的情況下,可以保持性能
同程藝龍消息系統(tǒng)的應用場景
退訂系統(tǒng)
這個是我們退訂系統(tǒng)中的一個應用場景。用戶點擊前端的退訂按鈕,系統(tǒng)調用退訂接口,再去調用供應商的退訂接口,從而完成一個退訂功能。
如果供應商的系統(tǒng)接口不可靠,就會導致用戶退訂失敗,如果系統(tǒng)設置為同步操作,會導致用戶需要再去點一次。
所以,我們引入 RocketMQ 將同步改為異步,當前端用戶發(fā)出退訂需求,退訂系統(tǒng)接收到請求后就會記錄到退訂系統(tǒng)的數(shù)據(jù)庫里面,表示這個用戶正在退訂。
同時通過消息引擎把這條退訂消息發(fā)送到和供應商對接的系統(tǒng),去調用供應商的接口。
如果調用成功,就會把數(shù)據(jù)庫進行標識,表示已經退訂成功。同時,加了一個補償?shù)哪_本,去數(shù)據(jù)庫撈那些未退訂成功的消息,重新退訂,避免消息丟失引起的退訂失敗的情況。
房倉系統(tǒng)
第二個應用場景是我們的房倉系統(tǒng)。這是一個比較常規(guī)的消息使用場景,我們從供應商處采集一些酒店的基本信息數(shù)據(jù)和詳情數(shù)據(jù),然后接入到消息系統(tǒng),由后端的分銷系統(tǒng)、最小價系統(tǒng)和庫存系統(tǒng)來進行計算。
同時當供應商有變價的時候,變價事件也會通過消息系統(tǒng)傳遞給我們的后端業(yè)務系統(tǒng),來保證數(shù)據(jù)的實時性和準確性。
供應庫的訂閱系統(tǒng)
數(shù)據(jù)庫的訂閱系統(tǒng)也用到了消息的應用。一般情況下做數(shù)據(jù)庫同步,都是通過 binlog 去讀里面的數(shù)據(jù),然后搬運到數(shù)據(jù)庫。
搬運過程中,我們最關注的是數(shù)據(jù)的順序性,因此在數(shù)據(jù)庫 row 模式的基礎上,新增了一個功能,以確保每一個 Queue 里面的順序是唯一的。
雖然 Queue 里面的順序天然都是唯一的,但我們在使用上有一個特點,就是把相同 ID 的消息都是放在同一個 Queue 里面的。
例如,圖中右上角 id1 的消息,數(shù)據(jù)庫主字段是 id1,就統(tǒng)一放在 Queue1 里面,而且是順序的。
在 Queue2 里,兩個 id3 之間被兩個順序的 id2 間隔開來了,但實際消費讀出來的時候,也會是順序的,由此,可以用多隊列的順序來提高整體的并發(fā)度。
技術上踩過的坑
供應商系統(tǒng)的場景
上圖中,一個 MQ 對應有兩個消費者,他們是在同一個 Group1 中,起初大家都只有 Topic1,這時候是正常消費的。
但如果在***個消費者里面加入一個 Topic2,這時候是無法消費或消費不正常了。
這是 RocketMQ 本身的機制引起的問題,需要在第二個消費者里面加入 Topic2 才能正常消費。
支付交易系統(tǒng)的場景
另外一個是支付交易系統(tǒng),這個場景下也是有兩個應用,他們都是在同一 Group 和同一 Topic 下,一個是消費 Tag1 的數(shù)據(jù),另一個是消費 Tag2 的數(shù)據(jù)。
正常情況下,啟動應該是沒問題的,但是有一天我們發(fā)現(xiàn)一個應用起不來了,另外一個應用,他只消費 Tag2 的數(shù)據(jù),但是因為 RocketMQ 的機制會把 Tag1 的數(shù)據(jù)拿過來,拿過來后會把 Tag1 的數(shù)據(jù)丟棄。
這會導致用戶在支付過程中出現(xiàn)支付失敗的情況。對此,我們把 Tag2 放到 Group2 里面,兩個 Group 就不會消費相同的消息了。
個人建議 RocketMQ 能夠實現(xiàn)一個機制,即只接受自己的 Tag 消息,非相關的 Tag 不去接收。
大量老數(shù)據(jù)讀取的場景
在火車票消費的場景中,我們發(fā)現(xiàn)有 200 億條老數(shù)據(jù)沒有被消費。當我們消費啟動的時候,RocketMQ 會默認從第 0 個數(shù)據(jù)開始讀,這時候磁盤 IO 飆升到 100%,從而影響到其他消費端數(shù)據(jù)的讀取,但這些老數(shù)據(jù)被加載后,并沒有實際作用。
因此,對于大量老數(shù)據(jù)讀取的改進方式是:
- 對于新消費組,默認從 LAST_OFFSET 消費。
- Broker 中單 Topic 堆積超過 1000 萬時,禁止消費,需聯(lián)系管理員開啟消費。
- 監(jiān)控要到位,磁盤 IO 飆升時,能立刻聯(lián)系到消費方處理。
服務端的場景
①CentOS 6.6 中 Futex Kernel bug, 導致 Name Server, Broker 進程經常掛起,無法正常工作
解決方法:升級到 6.7
②服務端 2 個線程會創(chuàng)建相同 CommitLog 放入 List,導致計算消息 offset 錯誤,解析消息失敗,無法消費,重啟沒法解決問題。
解決方法:線程安全問題,改為單線程
③Pull 模式下重置消費進度,導致服務端填充大量數(shù)據(jù)到 Map 中,Broker CPU 使用率飆升 100%。
解決方法:Map 局部變量場景用不到,刪除
④Master 建議客戶端到 Slave 消費時,若數(shù)據(jù)還沒同步到 Slave, 會重置 pullOffset,導致大量重復消費。
解決方法:不重置 offset
⑥同步沒有 MagicCode,安全組掃描同步端口時,Master 解析錯誤,導致一些問題。
解決方法:同步時添加 magicCode 校驗
基于 RocketMQ 的改進
新增客戶端
新增 .net 客戶端,基于 Java 源代碼原生開發(fā);新增 HTTP 客戶端,實現(xiàn)部分功能,并通過 Netty Server 連接 RocketMQ。
新增消息限流功能
如果客戶端代碼寫錯產生死循環(huán),就會產生大量的重復數(shù)據(jù),這時候會把生產線程打滿,導致隊列溢出,嚴重影響我們 MQ 集群的穩(wěn)定性,從而影響其他業(yè)務。
上圖是限流的模型圖,我們把限流功能加在 Topic 之前。通過限流功能可以設置 rate limit 和 size limit 等。
其中 rate limit 是通過令牌桶算法來實現(xiàn)的,即每秒往桶里放多少個令牌,每秒就消費多少速度,或者是往 Topic 里寫多少數(shù)據(jù)。以上的兩個配置是支持動態(tài)修改的。
后臺監(jiān)控
我們還做了一個監(jiān)控后臺,用于監(jiān)控消息的全鏈路過程,包括:
- 消息全鏈路追蹤,覆蓋消息產生、消費、過期整個生命周期
- 消息生產、消費曲線
- 消息生產異常報警
- 消息堆積報警,通知哪個 IP 消費過慢
其他功能:
- HTTP 方式生產,消費消息
- Topic 消費權限設置,Topic 只能被指定 Group 消費,防止線上錯亂訂閱
- 支持新消費組從***位置消費 (默認是從***條開始消費)
- 廣播模式消費進度同步 (服務端顯示進度)
以上便是同程藝龍在消息系統(tǒng)建設方面的實踐。