成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ事務消息解析!

開發 架構
事務消息是 RocketMQ 提供的一種消息類型,支持在分布式場景下保障消息生產和本地事務的最終一致性。RocketMQ?采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息。

單體架構下的事務

在單體系統的開發過程中,假如某個場景下需要對數據庫的多張表進行操作,為了保證數據的一致性,一般會使用事務,將所有的操作全部提交或者在出錯的時候全部回滾。

以創建訂單為例,假設下單后需要做兩個操作:

?在訂單表生成訂單。

在積分表增加本次訂單增加的積分記錄。

在單體架構下只需使用@Transactional開啟事務,就可以保證數據的一致性。

@Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 生成訂單
        orderService.createOrder(orderId);
        // 增加積分
        creditService.addCredits(orderId);
    }

?

但在分布式架構下,訂單系統和積分系統可能是兩個獨立的服務,此時就不能使用上述的方法開啟事務了,因為它們不處于同一個事務中。

  • 在出錯的情況下,無法進行全部回滾,只能對當前服務的事務進行回滾。

所以就有可能出現訂單生成成功但是積分服務增加積分失敗的情況(也可能相反),此時數據處于不一致的狀態。

分布式架構下的事務

以下單流程為例,在分布式架構下的處理流程如下:

?訂單服務生成訂單。

發送訂單生成的MQ消息,積分服務訂閱消息,有新的訂單生成之后消費消息,增加對應的積分記錄。

普通MQ消息存在的問題

?假如訂單創建成功,MQ消息發送成功,但是order方法在返回的前一刻,服務突然宕機。

由于開啟了事務,事務還未提交(方法結束后才會正常提交)。

所以訂單表并未生成記錄,但是MQ卻已經發送成功并且被積分服務消費,此時就會存在訂單未創建但是積分記錄增加的情況。

假如先發送MQ消息再創建訂單,如果MQ消息發送成功,創建訂單失敗,那么同樣處于不一致的狀態。

@Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 創建訂單
        Order order = orderService.createOrder(orderDTO.getOrderId());
        // 發送訂單創建的MQ消息
        sendOrderMessge(order);
        return;
    }

可以使用RocketMQ事務消息解決上述問題。

RocketMQ事務消息基礎流程

?Apache RocketMQ在4.3.0版中已經支持分布式事務消息。

事務消息是 RocketMQ 提供的一種消息類型,支持在分布式場景下保障消息生產和本地事務的最終一致性。

RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息。

基本流程

?第一階段

  • 發送 Message,Half Message,即半事務消息。
  • 此類型的 Message 是不會被 Consumer 消費。

第二階段:如果半事務消息投遞成功,則會開始執行本地事務。

分為如下三種 Case:

  • 本地事務執行成功:
  • 會向 Broker 發送 commit 消息,被 commit 過后的 Message 才能被 Consumer 消費到。
  • 本地事務執行失?。?/li>
  • 會向 Broker 發送 rollback 消息,Broker 則會將剛剛投遞的半事務消息刪除,從而保證上下游數據的一致性。

  • 如果 Producer 實例或者網絡出現了問題,Producer 沒能及時地將本地事務執行的結果通知 Broker。

  • Broker 會通過掃描發現某條 Message 長時間處于半事務消息狀態。

  • Broker 會主動地向 Producer 詢問此 Message 對應的事務狀態。

值得注意的是:

?

RocketMQ 并不會無休止的的信息事務狀態回查,默認回查 15 次。

如果 15 次回查還是無法得知事務狀態,RocketMQ 默認回滾該消息。

RocketMQ事務消息使用限制

?

事務消息不支持延時消息和批量消息。

事務性消息可能不止一次被檢查或消費,所以消費者端需要做好消費冪等。

事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。

  • 與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。

RocketMQ事務消息基本原理

采用2PC兩階段設計。

?

將 Message 原本真實的 Topic 和 MessageQueue 進行備份。

  • 放入到PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID中保存。

將消息投遞到一個內部Topic中RMQ_SYS_TRANS_HALF_TOPIC,該隊列專門存儲事務消息。

所有的 Half Message 全部都寫入到 queueId 為 0 的 MessageQueue。

因為一個 Topic 下只有 1 個 MessageQueue:

  • 這個 Topic 下的所有 Message 就是全局有序的,它們會按照先來后到的順序被消費。

如果本地事務執行成功進行Commit,則將RMQ_SYS_TRANS_HALF_TOPIC 隊列中的消息投遞到真實的Topic中,供后續流程執行。

  • 并刪除這條 Half Message ,但刪除也是假刪除,只是給 Message 打上一個刪除的 Tag。

如果本地事務執行失敗進行rollback,則直接刪除這條 Half Message ,但刪除也是假刪除。

如果本地事務遲遲沒有返回結果 (默認時間是6s),則會觸發事務回查機制

  • 執行回查之前需要校驗檢查次數是否到達了最大值(需要手動設置,沒有默認值)。
  • 或者是當前 Half Message 存在是否超過了 Message 保存的上限,即 3天。
  • 如果滿足上面條件中的一種Half Message 會被放進 TRANS_CHECK_MAX_TIME_TOPIC Topic 當中。
  • 一旦判定為需要執行事務回查邏輯,那么當前這條 Half Message 就算已經被消費了。
  • 在沒達到最大的校驗次數之前,都還需要將其投遞到事務隊列當中,以便下次重試時再次執行 Check 邏輯。
  • 如果回查成功則刪除投遞的 Half Message。

源碼解讀

發送事務消息調用的是TransactionMQProducer的sendMessageInTransaction方法:

主要有以下幾個步驟:

?獲取事務監聽器TransactionListener,如果獲取為空或者本地事務執行器LocalTransactionExecuter為空將拋出異常。

因為需要通過TransactionListener或者LocalTransactionExecuter來執行本地事務,所以不能為空。

在消息中設置prepared屬性,此時與普通消息(非事務消息)相比多了PROPERTY_TRANSACTION_PREPARED屬性。

調用send方法發送prepared消息也就是half消息,發送消息的流程與普通消息一致。

根據消息的發送結果判斷:

  • 如果發送成功執行本地事務,并返回本地事務執行結果狀態,如果返回的執行狀態結果為空,將本地事務狀態設置為UNKNOW。
  • 發送成功之外的其他情況,包括FLUSH_DISK_TIMEOUT刷盤超時、FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE從節點不可用三種情況。
  • 此時意味著half消息發送失敗,本地事務狀態置為ROLLBACK_MESSAGE回滾消息。

調用endTransaction方法結束事務。

參考

《RocketMQ技術內幕》

https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md。

https://github.com/apache/rocketmq/blob/master/docs/cn/design.md。

責任編輯:姜華 來源: 月伴飛魚
相關推薦

2023-07-17 08:34:03

RocketMQ消息初體驗

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2021-10-03 21:41:13

RocketMQKafkaPulsar

2021-04-15 09:17:01

SpringBootRocketMQ

2024-09-25 08:32:05

2024-08-22 18:49:23

2024-02-04 09:02:29

RocketMQ項目處理器

2023-09-04 08:00:53

提交事務消息

2024-10-11 09:15:33

2022-07-04 11:06:02

RocketMQ事務消息實現

2024-06-13 09:25:14

2024-10-22 08:01:15

2021-03-04 06:49:53

RocketMQ事務

2022-12-22 10:03:18

消息集成

2023-07-18 09:03:01

RocketMQ場景消息

2025-04-09 08:20:00

RocketMQ消息隊列開發

2022-06-02 08:21:07

RocketMQ消息中間件

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2021-07-14 17:18:14

RocketMQ消息分布式
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 99国产精品一区二区三区 | 蜜桃av人人夜夜澡人人爽 | 四虎精品在线 | 亚洲婷婷一区 | 华人黄网站大全 | 男女网站在线观看 | 亚洲国产成人精品女人 | 一级看片| 国产成人在线一区二区 | 亚洲一区在线日韩在线深爱 | 免费观看国产视频在线 | 欧美黑人国产人伦爽爽爽 | 色综合视频 | 午夜91| h视频在线播放 | 97精品国产97久久久久久免费 | 国产精品国产精品国产专区不卡 | 午夜a√ | 国产精品久久久久久久久久久久冷 | 午夜在线观看视频 | 天天综合国产 | 男人的天堂一级片 | 欧美一区免费 | 欧美精品在欧美一区二区少妇 | av大片 | 一区二区三区不卡视频 | 999久久久久久久久6666 | 97色在线视频 | 成人小视频在线观看 | 男人天堂999 | 91精品国产综合久久久动漫日韩 | 91色在线| 中文字幕亚洲精品 | 国产不卡视频 | 狠狠久久综合 | 免费国产网站 | 国产中文字幕在线观看 | 日韩中文字幕久久 | 99国内精品久久久久久久 | 91视频进入 | 成人区精品一区二区婷婷 |