用RocketMQ實現(xiàn)可靠消息最終一致性方案,yyds
寫在前面
對于常見的微服務系統(tǒng),大部分接口調(diào)用是同步的,也就是一個服務直接調(diào)用另外一個服務的接口。
這個時候,用TCC分布式事務方案來保證各個接口的調(diào)用,要么一起成功,要么一起回滾,是比較合適的。
但是在實際系統(tǒng)的開發(fā)過程中,可能服務間的調(diào)用是異步的。
也就是說,一個服務發(fā)送一個消息給MQ,即消息中間件,比如RocketMQ、RabbitMQ、Kafka、ActiveMQ等等。
然后,另外一個服務從MQ消費到一條消息后進行處理。這就成了基于MQ的異步調(diào)用了。
那么針對這種基于MQ的異步調(diào)用,如何保證各個服務間的分布式事務呢?
?也就是說,我希望的是基于MQ實現(xiàn)異步調(diào)用的多個服務的業(yè)務邏輯,要么一起成功,要么一起失敗。
這個時候,就要用上可靠消息最終一致性方案,來實現(xiàn)分布式事務。?
大家看看上面那個圖,其實如果不考慮各種高并發(fā)、高可用等技術挑戰(zhàn)的話,單從“可靠消息”以及“最終一致性”兩個角度來考慮,這種分布式事務方案還是比較簡單的。
可靠消息最終一致性方案的核心流程
1、上游服務投遞消息
如果要實現(xiàn)可靠消息最終一致性方案,一般你可以自己寫一個可靠消息服務,實現(xiàn)一些業(yè)務邏輯。
首先,上游服務需要發(fā)送一條消息給可靠消息服務。
?這條消息說白了,你可以認為是對下游服務一個接口的調(diào)用,里面包含了對應的一些請求參數(shù)。
然后,可靠消息服務就得把這條消息存儲到自己的數(shù)據(jù)庫里去,狀態(tài)為“待確認”。
接著,上游服務就可以執(zhí)行自己本地的數(shù)據(jù)庫操作,根據(jù)自己的執(zhí)行結果,再次調(diào)用可靠消息服務的接口。
如果本地數(shù)據(jù)庫操作執(zhí)行成功了,那么就找可靠消息服務確認那條消息。如果本地數(shù)據(jù)庫操作失敗了,那么就找可靠消息服務刪除那條消息。
此時如果是確認消息,那么可靠消息服務就把數(shù)據(jù)庫里的消息狀態(tài)更新為“已發(fā)送”,同時將消息發(fā)送給MQ。
這里有一個很關鍵的點,就是更新數(shù)?據(jù)庫里的消息狀態(tài)和投遞消息到MQ。這倆操作,你得放在一個方法里,而且得開啟本地事務。
啥意思呢?
- 如果數(shù)據(jù)庫里更新消息的狀態(tài)失敗了,那么就拋異常退出了,就別投遞到MQ;
- 如果投遞MQ失敗報錯了,那么就要拋異常讓本地數(shù)據(jù)庫事務回滾。
- 這倆操作必須得一起成功,或者一起失敗。
如果上游服務是通知刪除消息,那么可靠消息服務就得刪除這條消息。
2、下游服務接收消息
下游服務就一直等著從MQ消費消息好了,如果消費到了消息,那么就操作自己本地數(shù)據(jù)庫。
如果操作成功了,就反過來通知可靠消息服務,說自己處理成功了,然后可靠消息服務就會把消息的狀態(tài)設置為“已完成”。
3、如何上游服務對消息的100%可靠投遞?
上面的核心流程大家都看完:一個很大的問題就是,如果在上述投遞消息的過程中各個環(huán)節(jié)出現(xiàn)了問題該怎么辦?
我們?nèi)绾伪WC消息100%的可靠投遞,一定會從上游服務投遞到下游服務?別著急,下面我們來逐一分析。
如果上游服務給可靠消息服務發(fā)送待確認消息的過程出錯了,那沒關系,上游服務可以感知到調(diào)用異常的,就不用執(zhí)行下面的流程了,這是沒問題的。
如果上游服務操作完本地數(shù)據(jù)庫之后,通知可靠消息服務確認消息或者刪除消息的時候,出現(xiàn)了問題。
比如:沒通知成功,或者沒執(zhí)行成功,或者是可靠消息服務沒成功的投遞消息到MQ。這一系列步驟出了問題怎么辦?
其實也沒關系,因為在這些情況下,那條消息在可靠消息服務的數(shù)據(jù)庫里的狀態(tài)會一直是“待確認”。
此時,我們在可靠消息服務里開發(fā)一個后臺定時運行的線程,不停的檢查各個消息的狀態(tài)。
如果一直是“待確認”狀態(tài),就認為這個消息出了點什么問題。
此時的話,就可以回調(diào)上游服務提供的一個接口,問問說,兄弟,這個消息對應的數(shù)據(jù)庫操作,你執(zhí)行成功了沒?。?/span>
如果上游服務答復說,我執(zhí)行成功了,那么可靠消息服務將消息狀態(tài)修改為“已發(fā)送”,同時投遞消息到MQ。
如果上游服務答復說,沒執(zhí)行成功,那么可靠消息服務將數(shù)據(jù)庫中的消息刪除即可。
通過這套機制,就可以保證,可靠消息服務一定會嘗試完成消息到MQ的投遞。
4、如何保證下游服務對消息的100%可靠接收?
?那如果下游服務消費消息出了問題,沒消費到?或者是下游服務對消息的處理失敗了,怎么辦?
其實也沒關系,在可靠消息服務里開發(fā)一個后臺線程,不斷的檢查消息狀態(tài)。
如果消息狀態(tài)一直是“已發(fā)送”,始終沒有變成“已完成”,那么就說明下游服務始終沒有處理成功。
此時可靠消息服務就可以再次嘗試重新投遞消息到MQ,讓下游服務來再次處理。
只要下游服務的接口邏輯實現(xiàn)冪等性,保證多次處理一個消息,不會插入重復數(shù)據(jù)即可。?
5、如何基于RocketMQ來實現(xiàn)可靠消息最終一致性方案?
在上面的通用方案設計里,完全依賴可靠消息服務的各種自檢機制來確保:
- 如果上游服務的數(shù)據(jù)庫操作沒成功,下游服務是不會收到任何通知
- 如果上游服務的數(shù)據(jù)庫操作成功了,可靠消息服務死活都會確保將一個調(diào)用消息投遞給下游服務,而且一定會確保下游服務務必成功處理這條消息。
通過這套機制,保證了基于MQ的異步調(diào)用/通知的服務間的分布式事務保障。
其實阿里開源的RocketMQ,就實現(xiàn)了可靠消息服務的所有功能,核心思想跟上面類似。
只不過RocketMQ為了保證高并發(fā)、高可用、高性能,做了較為復雜的架構實現(xiàn),非常的優(yōu)秀。
有興趣的同學,自己可以去查閱RocketMQ對分布式事務的支持。
可靠消息最終一致性方案的高可用保障生產(chǎn)實踐
1、背景引入
?其實上面那套方案和思想,很多同學應該都知道是怎么回事兒,我們也主要就是鋪墊一下這套理論思想。
在實際落地生產(chǎn)的時候,如果沒有高并發(fā)場景的,完全可以參照上面的思路自己基于某個MQ中間件開發(fā)一個可靠消息服務。
如果有高并發(fā)場景的,可以用RocketMQ的分布式事務支持,上面的那套流程都可以實現(xiàn)。
今天給大家分享的一個核心主題,就是這套方案如何保證99.99%的高可用。
其實大家應該發(fā)現(xiàn)了這套方案里保障高可用性最大的一個依賴點,就是MQ的高可用性。
任何一種MQ中間件都有一整套的高可用保障機制,無論是RabbitMQ、RocketMQ還是Kafka。
所以在大公司里使用可靠消息最終一致性方案的時候,我們通常對可用性的保障都是依賴于公司基礎架構團隊對MQ的高可用保障。
也就是說,大家應該相信兄弟團隊,99.99%可以保障MQ的高可用,絕對不會因為MQ集群整體宕機,而導致公司業(yè)務系統(tǒng)的分布式事務全部無法運行。?
但是現(xiàn)實是很殘酷的,很多中小型的公司,甚至是一些中大型公司,或多或少都遇到過MQ集群整體故障的場景。
MQ一旦完全不可用,就會導致業(yè)務系統(tǒng)的各個服務之間無法通過MQ來投遞消息,導致業(yè)務流程中斷。
比如最近就有一個朋友的公司,也是做電商業(yè)務的,就遇到了MQ中間件在自己公司機器上部署的集群整體故障不可用,導致依賴MQ的分布式事務全部無法跑通,業(yè)務流程大量中斷的情況。
這種情況,就需要針對這套分布式事務方案實現(xiàn)一套高可用保障機制。
2、基于KV存儲的隊列支持的高可用降級方案
大家來看看下面這張圖,這是我曾經(jīng)指導過朋友的一個公司針對可靠消息最終一致性方案設計的一套高可用保障降級機制。
這套機制不算太復雜,可以非常簡單有效的保證那位朋友公司的高可用保障場景,一旦MQ中間件出現(xiàn)故障,立馬自動降級為備用方案。
(1)自行封裝MQ客戶端組件與故障感知
首先第一點,你要做到自動感知MQ的故障接著自動完成降級,那么必須動手對MQ客戶端進行封裝,發(fā)布到公司Nexus私服上去。
然后公司需要支持MQ降級的業(yè)務服務都使用這個自己封裝的組件來發(fā)送消息到MQ,以及從MQ消費消息。
在你自己封裝的MQ客戶端組件里,你可以根據(jù)寫入MQ的情況來判斷MQ是否故障。
比如說,如果連續(xù)10次重試嘗試投遞消息到MQ都發(fā)現(xiàn)異常報錯,網(wǎng)絡無法聯(lián)通等問題,說明MQ故障,此時就可以自動感知以及自動觸發(fā)降級開關。
(2)基于kv存儲中隊列的降級方案
如果MQ掛掉之后,要是希望繼續(xù)投遞消息,那么就必須得找一個MQ的替代品。
舉個例子,比如我那位朋友的公司是沒有高并發(fā)場景的,消息的量很少,只不過可用性要求高。此時就可以類似redis的kv存儲中的隊列來進行替代。
由于redis本身就支持隊列的功能,還有類似隊列的各種數(shù)據(jù)結構,所以你可以將消息寫入kv存儲格式的隊列數(shù)據(jù)結構中去。
ps:關于redis的數(shù)據(jù)存儲格式、支持的數(shù)據(jù)結構等基礎知識,請大家自行查閱了,網(wǎng)上一大堆。
但是,這里有幾個大坑,一定要注意一下。
第一個,任何kv存儲的集合類數(shù)據(jù)結構,建議不要往里面寫入數(shù)據(jù)量過大,否則會導致大value的情況發(fā)生,引發(fā)嚴重的后果。
因此絕不能在redis里搞一個key,就拼命往這個數(shù)據(jù)結構中一直寫入消息,這是肯定不行的。
第二個,絕對不能往少數(shù)key對應的數(shù)據(jù)結構中持續(xù)寫入數(shù)據(jù),那樣會導致熱key的產(chǎn)生,也就是某幾個key特別熱。
大家要知道,一般kv集群,都是根據(jù)key來hash分配到各個機器上的,你要是老寫少數(shù)幾個key,會導致kv集群中的某臺機器訪問過高,負載過大。
基于以上考慮,下面是筆者當時設計的方案:
- 根據(jù)他們每天的消息量,在kv存儲中固定劃分上百個隊列,有上百個key對應。
- 這樣保證每個key對應的數(shù)據(jù)結構中不會寫入過多的消息,而且不會頻繁的寫少數(shù)幾個key。
- 一旦發(fā)生了MQ故障,可靠消息服務可以對每個消息通過hash算法,均勻的寫入固定好的上百個key對應的kv存儲的隊列中。
同時此時需要通過zk觸發(fā)一個降級開關,整個系統(tǒng)在MQ這塊的讀和寫全部立馬降級。
3、下游服務消費MQ的降級感知
下游服務消費MQ也是通過自行封裝的組件來做的,此時那個組件如果從zk感知到降級開關打開了,首先會判斷自己是否還能繼續(xù)從MQ消費到數(shù)據(jù)?
如果不能了,就開啟多個線程,并發(fā)的從kv存儲的各個預設好的上百個隊列中不斷的獲取數(shù)據(jù)。
每次獲取到一條數(shù)據(jù),就交給下游服務的業(yè)務邏輯來執(zhí)行。
通過這套機制,就實現(xiàn)了MQ故障時候的自動故障感知,以及自動降級。如果系統(tǒng)的負載和并發(fā)不是很高的話,用這套方案大致是沒沒問題的。
因為在生產(chǎn)落地的過程中,包括大量的容災演練以及生產(chǎn)實際故障發(fā)生時的表現(xiàn)來看,都是可以有效的保證MQ故障時,業(yè)務流程繼續(xù)自動運行的。
4、故障的自動恢復
如果降級開關打開之后,自行封裝的組件需要開啟一個線程,每隔一段時間嘗試給MQ投遞一個消息看看是否恢復了。
如果MQ已經(jīng)恢復可以正常投遞消息了,此時就可以通過zk關閉降級開關,然后可靠消息服務繼續(xù)投遞消息到MQ,下游服務在確認kv存儲的各個隊列中已經(jīng)沒有數(shù)據(jù)之后,就可以重新切換為從MQ消費消息。
5、更多的業(yè)務細節(jié)
其實上面說的那套方案主要是一套通用的降級方案,但是具體的落地是要結合各個公司不同的業(yè)務細節(jié)來決定的,很多細節(jié)多沒法在文章里體現(xiàn)。
比如說你們要不要保證消息的順序性?是不是涉及到需要根據(jù)業(yè)務動態(tài),生成大量的key?等等。
此外,這套方案實現(xiàn)起來還是有一定的成本的,所以建議大家盡可能還是push公司的基礎架構團隊,保證MQ的99.99%可用性,不要宕機。
其次就是根據(jù)大家公司的實際對高可用需求來決定,如果感覺MQ偶爾宕機也沒事,可以容忍的話,那么也不用實現(xiàn)這種降級方案。
但是如果公司領導認為MQ中間件宕機后,一定要保證業(yè)務系統(tǒng)流程繼續(xù)運行,那么還是要考慮一些高可用的降級方案,比如本文提到的這種。
最后再說一句,真要是一些公司涉及到每秒幾萬幾十萬的高并發(fā)請求,那么對MQ的降級方案會設計的更加的復雜,那就遠遠不是這么簡單可以做到的。