RocketMQ 消息集成:多類型業務消息 - 普通消息
引言
Apache RocketMQ 誕生至今,歷經十余年大規模業務穩定性打磨,服務了 100% 阿里集團內部業務以及阿里云數以萬計的企業客戶。作為金融級可靠的業務消息方案,RocketMQ 從創建之初就一直專注于業務集成領域的異步通信能力構建。本篇將從業務集成場景的訴求開始,介紹 RocketMQ 作為業務消息集成方案的核心能力和優勢,通過功能場景、應用案例以及最佳實踐等角度介紹 RocketMQ 普通消息類型的使用。
說起業務集成場景,RocketMQ 最初的使用場景就是典型代表。RocketMQ 誕生于阿里的電商系統,電商系統經常需要做各種大促活動,在這類復雜需求場景下對消息系統的吞吐性能、端到端延遲、削峰填谷等能力有著極高的要求。
一句話概括今天的核心問題,跑在核心交易業務鏈路的消息有什么特點,有什么要求,和跑在離線分析等場景的消息有什么不同。下面和大家一起來探討~
業務集成 vs 數據集成
集成目標不同
做業務核心架構設計時,很多時候需要面向上層需求去完成業務邏輯的設計。以電商交易場景為例,通過微服務的拆分,可能在整個鏈路中會拆成很多個環節,不同應用之間通過消息去集成時,更多的是關注用戶訂單的流轉過程,關注這個業務邏輯是否會正常的處理,這個就是業務集成。
對比一下,數據集成是以數據為中心,更多的是關注業務集成產生的數據,去分析這些業務數據的價值。數據集成并不關心這個數據是從哪里來,只關心數據本身的屬性和數據之間的關系。
關注重點不同
在業務集成里隨著企業業務邏輯的拓寬和復雜度的提升,調用和被調用方之間的耦合性會逐步增加,鏈路的拓撲也會變得越來越復雜。經常會出現一條消息的上游是另一條消息的下游,一個服務可能既是發送方也是消費方,等等。
而在數據集成的場景里面,并不關注上述鏈路,更多是關注數據的多樣性。也就是說,在做數據集成分析時,更多的是從各種異構的數據源里去提取、匯聚這些數據,然后把這些異構系統的數據聚合在一起做清洗,最終匯聚成結構化的數據或報表去做分析。數據集成更多是關注數據的異構性和多樣性。
實時性不同
業務集成簡單理解就是一種在線的邏輯,或者是一種強實時的邏輯。在這個業務集成領域,無論同步調用還是異步調用,都對調用和被調用之間的響應協同機制有一定的要求。舉個例子,一個訂單的處理必須是要在毫秒級完成,否則用戶的體驗會非常的差。
但是在數據集成領域,更多的可能是近實時甚至是離線非實時的場景,也就是說通過批、實時流或近實時流的 場景去爬取數據之后做分析,具體鏈路對于用戶來說并不是可見的,這也是數據集成和業務集成側重點的差異。
業務集成對消息系統的核心訴求
消息隊列是企業業務集成的主要模式之一,它是一種異步通信模式。異步模式提供了低耦合、高可靠、可觀測的異步通信能力。那么業務集成鏈路里使用消息之后會帶來什么效果呢?這里稍微羅列一下。
上圖就是一個比較典型的上層的應用鏈路,從應用 A 到下層的應用 B 的一個單鏈路,通過發送初始化或者結構化一個消息,作為調用事件發送到事件通道,這個通道就是消息系統,比如 RocketMQ、RabbitMQ 等。在時間通道里存儲后通過過濾路由的分發組件匹配到下游,然后推送處理。與此同時,還會有可觀測、運維、監控的一些體系去支撐這個鏈路的可靠運行。
完整的功能需求非常多,這里提煉業務集成對消息系統的四個核心訴求:
1)多類型消息傳輸:支持多樣業務場景集成訴求,主要包括普通消息、定時消息、事務消息、順序消息等;
2)豐富路由分發能力:支持多種分發路由條件,包括 Tag 過濾、消息屬性過濾,一對多、一對一分發等;
3)多樣交互模式:支持收發消息多樣交互方式,支持同步、異步發送,支持主動消費、被動推送消費,支持流式應答、單條應答;
4)可觀測體系:支持 Metrics、Trace、Events 分析,支持單鏈路、全鏈路軌跡追蹤,支持 Metrics 分析和監控告警,支持系統運行事件、業務事件透出處理。
RocketMQ 作為非常典型的業務消息方案,正是對應上述業務集成的訴求,提供了完善的消息功能、豐富的客戶端接口以及完善的可觀測體系和穩定性保障機制。
接下來就開始逐步拆解 RocketMQ 的多類型消息,本篇主要介紹普通消息。
普通消息原理介紹
功能簡介
在多種消息類型中,普通消息是最簡單也最為重要。普通消息是 RocketMQ 的基本消息類型,提供高吞吐、擴展、低延遲、異步的通信能力。其他高級消息類型基本都是在這種普通消息類型的基礎上疊加了獨有的控制特性,或者是特定的使用的方式。
下面這張圖就是普通消息的一個典型的拓撲,和消息隊列典型場景一樣,生產者發送消息,發送普通消息到服務端去存儲,存儲完之后,會把消息按照訂閱關系的匹配,最后推送給下游的消費方去做消費。
普通消息的特點
1)原子性:消息之間沒有關聯關系,收發處理邏輯原子;
2)擴展性:普通消息容量、能力可擴展,支持多隊列存儲、水平拆分、并發消費;
3)低延遲:普通消息鏈路短,交互簡單,狀態簡單,鏈路極簡,毫秒級低延遲通信。
消息的生命周期
普通消息從初始化發送開始到最終被處理的過程中會經歷多個狀態和過程,而了解消息的生命周期,可以幫助我們去判斷線上出現問題后如何快速定位和解決。
簡單來說消息的生命周期可以抽象成五個狀態:
- 初始化:普通消息被生產者構建初始化完成,待發送到服務端的狀態;
- 待消費:消息被傳輸到服務端,對下游可見,等待消費者獲取處理的狀態;
- 消費中:消息被消費者獲取,并按照業務邏輯處理過程,此時服務端會等待消費完成,如果一定時間后沒有收到消費提交的事件,消息還會重試處理;
- 消費提交:消費者完成消息處理,并提交應答事件到服務端,服務端標記當前消息已經被處理(包括消費成功和失敗)。RocketMQ默認支持所有消息保留,此時消息數據并不會立即被刪除,只是邏輯標記完成,在消息被物理刪除之前,消費者仍然可以回溯重新處理消息;
- 消息刪除:RocketMQ 按照消息保存時間機制滾動清理最早的消息數據,將消息從物理文件中刪除。
普通消息應用場景和案例
簡單的了解原理和基本介紹之后,那普通消息主要用在哪里呢?普通消息是RocketMQ應用最廣泛,使用規模最大的一種消息類型,它主要集中在服務間的解耦調用,同時還有一些批量數據的采集傳輸等場景。
使用場景
1)微服務調用解耦
- 異步化解耦:普通消息實現微服務異步調用,縮短業務流和響應時間。
- 流量削峰填谷:普通消息海量堆積能力,解決流量峰值下游處理能力不足的穩定性風險。
2)實時數據傳輸
- 高吞吐傳輸:普通消息可以實現無限水平擴展,數據傳輸吞吐高,解決采集上報問題。
- 實時傳輸:普通消息實時傳輸投遞,下游可以及時消費實現計算和分析。
案例介紹
1)場景簡介
交易平臺是買賣家在線上根據約定的契約完成錢貨交換的過程涉及的系統。交易平臺涉及到和支付、物流、下單、運營等多個子系統的交互大多使用 RocketMQ 普通消息做異步解耦,消息的可靠處理是電商大促保障的核心。
2)核心痛點
訂單狀態機復雜,需要縮短鏈路時間:訂單生命周期長,涉及下游多個子系統流轉,同步調用耗時長,用戶體驗差。
大促場景海量訂單處理,下游壓力大:大促場景訂單流量大,各子系統處理能力不足導致系統崩潰。
分布式場景訂單變化持久化和下游調用事務性:訂單狀態流轉需要確保數據庫狀態變更和下游調用同時成功或者失敗,即事務性。
快速上手收發消息
說了這么多場景和案例,直接看一下代碼怎么用。
發送普通消息
發送消息的流程非常簡單,但這其中需要注意以下幾點:
- 消息初始化應盡可能完整:普通消息初始化包括主題、Tag 標簽、索引 Key 和負載。可以按實際情況設置完成。
- 消息發送需要捕獲結果和異常:消息發送完成需要獲取響應結果,如果失敗需要捕獲異常并做重試處理。
消費普通消息
RocketMQ 支持的消費方式有多種,有主動獲取的方式,也有被動消費監聽器推送的方式。
被動消費方式只需要注冊消費監聽器,然后監聽器內部去處理這個邏輯,最終返回消費結果。如果消費失敗,希望 RocketMQ 再做重投,就要返回一個失敗的結果;拋異常也是返回失敗。類似于這樣的結果,返回服務端就完成了整個消費的過程。
對于主動獲取的方式,會更加靈活,由業務方主動調用獲取消息,可以按照自己的速率和并發取消息,處理完成后,再回復 RocketMQ 服務端消費結果。