RocketMQ 事務消息初體驗
事務消息是 RocketMQ 的高級特性之一 。這篇文章,筆者會從應用場景、功能原理、實戰例子三個模塊慢慢為你揭開事務消息的神秘面紗。
1 應用場景
舉一個電商場景的例子:用戶購物車結算時,系統會創建支付訂單。
用戶支付成功后支付訂單的狀態會由未支付修改為支付成功,然后系統給用戶增加積分。
通常我們會使用普通消費方案,該方案能夠發揮 MQ 的優勢:異步和解耦 , 同時架構設計非常簡單。
圖片
- 用戶購物車結算時,系統創建支付訂單;
- 支付成功后,更新訂單的狀態從未支付修改為支付成功;
- 發送一條普通消息到消息隊列服務端;
- 積分服務消費消息,添加積分記錄。
但該方案有個非常直觀的缺點:容易出現不一致的現象。
- 假如先發送消息,后修改訂單狀態,消息發送成功,訂單沒有執行成功,需要回滾整個事務(訂單數據事務回滾,積分服務消費時,需要先反查事務狀態,若事務提交,才能插入積分記錄)。
- 假如先修改訂單狀態,后發送消息,訂單狀態修改成功,但消息發送失敗,需要補償操作才能保持最終一致。
- 假如先修改訂單,后發送消息,訂單狀態修改成功,但消息發送超時,此時無法判斷需要回滾訂單還是提交訂單變更。
我們看到,為了完善普通消費方案,業務層還需要做到兩點:補償機制和提供事務狀態查詢接口。
要做到這兩點,難不難呢?
不難,但是業務層代碼會比較混亂,更優的方案還是得從中間件層面解決。
2 功能原理
RocketMQ 事務消息是支持在分布式場景下保障消息生產和本地事務的最終一致性。交互流程如下圖所示:
圖片
1、生產者將消息發送至 Broker 。
2、Broker 將消息持久化成功之后,向生產者返回 Ack 確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。
3、生產者開始執行本地事務邏輯。
4、生產者根據本地事務執行結果向服務端提交二次確認結果( Commit 或是 Rollback ),Broker 收到確認結果后處理邏輯如下:
- 二次確認結果為 Commit :Broker 將半事務消息標記為可投遞,并投遞給消費者。
- 二次確認結果為 Rollback :Broker 將回滾事務,不會將半事務消息投遞給消費者。
5、在斷網或者是生產者應用重啟的特殊情況下,若 Broker 未收到發送者提交的二次確認結果,或 Broker 收到的二次確認結果為 Unknown 未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。
- 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
- 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
筆者認為事務消息的精髓在于:
- 本地事務執行成功,消費者才能消費事務消息;
- 消息回查本身就是補償機制的實現,事務生產者需提供了事務狀態查詢接口。
3 實戰例子
為了便于大家理解事務消息 ,筆者新建一個工程用于模擬支付訂單創建、支付成功、贈送積分的流程。
首先,我們創建一個真實的訂單主題:order-topic 。
圖片
然后在數據庫中創建三張表 訂單表、事務日志表、積分表。
圖片
最后我們創建一個 Demo 工程,生產者模塊用于創建支付訂單、修改支付訂單成功,消費者模塊用于新增積分記錄。
圖片
接下來,我們展示事務消息的實現流程。
1、創建支付訂單
調用訂單生產者服務創建訂單接口 ,在 t_order 表中插入一條支付訂單記錄。
圖片
2、調用生產者服務修改訂單狀態接口
接口的邏輯就是執行事務生產者的 sendMessageInTransaction 方法。
圖片
生產者端需要配置事務生產者和事務監聽器。
圖片
發送事務消息的方法內部包含三個步驟 :
圖片
事務生產者首先發送半事務消息,發送成功后,生產者才開始執行本地事務邏輯。
事務監聽器實現了兩個功能:執行本地事務和供 Broker 回查事務狀態 。
圖片
執行本地事務的邏輯內部就是執行 orderService.updateOrder 方法。
方法執行成功則返回 LocalTransactionState.COMMIT_MESSAGE , 若執行失敗則返回 LocalTransactionState.ROLLBACK_MESSAGE 。
圖片
需要注意的是: orderService.updateOrder 方法添加了事務注解,并將修改訂單狀態和插入事務日志表放進一個事務內,避免訂單狀態和事務日志表的數據不一致。
最后,生產者根據本地事務執行結果向 Broker 提交二次確認結果。
Broker 收到生產者確認結果后處理邏輯如下:
- 二次確認結果為 Commit :Broker 將半事務消息標記為可投遞,并投遞給消費者。
- 二次確認結果為 Rollback :Broker 將回滾事務,不會將半事務消息投遞給消費者。
3、積分消費者消費消息,添加積分記錄
當 Broker 將半事務消息標記為可投遞時,積分消費者就可以開始消費主題 order-topic 的消息了。
圖片
積分消費者服務,我們定義了消費者組名,以及訂閱主題和消費監聽器。
圖片
在消費監聽器邏輯里,冪等非常重要 。當收到訂單信息后,首先判斷該訂單是否有積分記錄,若沒有記錄,才插入積分記錄。
而且我們在創建積分表時,訂單編號也是唯一鍵,數據庫中也必然不會存在相同訂單的多條積分記錄。
4 總結
RocketMQ 事務消息是支持在分布式場景下保障消息生產和本地事務的最終一致性。
編寫一個實戰例子并不復雜,但使用事務消息時需要注意如下三點:
1、事務生產者和消費者共同協作才能保證業務數據的最終一致性;
2、事務生產者需要實現事務監聽器,并且保存事務的執行結果(比如事務日志表) ;
3、消費者要保證冪等。消費失敗時,通過重試、告警+人工介入等手段保證消費結果正確。
筆者會在后續的文章里,詳細解析事務消息的實現原理,敬請期待。
實戰代碼地址:
https://github.com/makemyownlife/rocketmq4-learning