聊聊 RocketMQ 4.X 知識體系
本文將帶您深入了解 RocketMQ 4.X 的核心知識體系,從架構設計到關鍵機制,一探這款高可用消息中間件的底層邏輯。
一、整體架構
RocketMQ 4.X 架構中包含四種角色 :
1.NameServer
名字服務是是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。它是一個非常簡單的 Topic 路由注冊中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動態注冊與發現。
2.BrokerServer
Broker 主要負責消息的存儲、投遞和查詢以及服務高可用保證 。
3.Producer
消息發布的角色,Producer 通過 MQ 的負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞,投遞的過程支持快速失敗并且低延遲。
4.Consumer
消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。
圖片
RocketMQ 集群工作流程:
1、啟動 NameServer,NameServer 起來后監聽端口,等待 Broker、Producer 、Consumer 連上來,相當于一個路由控制中心。
2、Broker 啟動,跟所有的 NameServer 保持長連接,定時發送心跳包。心跳包中包含當前 Broker信息( IP+端口等 )以及存儲所有 Topic 信息。注冊成功后,NameServer 集群中就有 Topic 跟 Broker 的映射關系。
3、收發消息前,先創建 Topic,創建 Topic 時需要指定該 Topic 要存儲在哪些 Broker 上,也可以在發送消息時自動創建 Topic。
4、Producer 發送消息,啟動時先跟 NameServer 集群中的其中一臺建立長連接,并從 NameServer 中獲取當前發送的 Topic 存在哪些 Broker 上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的 Broker 建立長連接從而向 Broker 發消息。
5、Consumer 跟 Producer 類似,跟其中一臺 NameServer 建立長連接,獲取當前訂閱 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立連接通道,開始消費消息。
二、發布訂閱模型
傳統的消息隊列 ActiveMQ 是典型的點對點模式。
圖片來自公眾號武哥漫談IT
圖片來自公眾號武哥漫談IT
- 在點對點模式中,消息發送者(生產者)將消息發送到一個特定的隊列,而消息接收者(消費者)從該隊列中接收消息。
- 消息在隊列中存儲,一旦一個消息被消費者接收,它就從隊列中移除,這確保了每個消息只被一個消費者處理。
- 這種模式適用于一對一的通信,其中一個生產者向一個特定的消費者發送消息,確保消息的可靠傳遞和處理。
RocketMQ 和 Kafka 是發布訂閱模式。
圖片來自公眾號武哥漫談IT
圖片來自公眾號武哥漫談IT
- 在發布訂閱模式中,消息發送者將消息發布到一個主題(topic),而消息訂閱者則訂閱感興趣的主題。
- 每個主題可以有多個訂閱者,因此消息會被廣播到所有訂閱了相同主題的消費者。
- 這種模式適用于一對多或多對多的通信,允許多個消費者同時接收和處理相同主題的消息。
- 發布訂閱模式通常用于構建實時事件處理系統、日志處理、通知系統等,其中多個消費者需要訂閱相同類型的消息并進行處理。
三、通訊框架
圖片
1.通訊協議
傳輸內容分為以下四個部分:
1)消息長度:總長度,四個字節存儲,占用一個 int 類型;
2)序列化類型 & 消息頭長度:占用一個 int 類型,第一個字節表示序列化類型,后面三個字節表示消息頭長度;
3)消息頭數據:經過序列化后的消息頭數據;
4)消息主體數據:消息主體的二進制字節數據內容。
消息頭數據序列化默認是 JSON 格式 ,示例如下:
圖片
2.Reactor 模型
Reactor 線程模型抽象出三種組件:
- Reactor(反應器):Reactor 負責監聽和分發事件,它是整個 Reactor 模型的調度中心。
- Acceptor(接收器):用于處理 IO 連接請求。
- Handlers(處理器):Handlers 負責具體的事件處理邏輯,即執行與事件相關的業務操作
Remoting 通訊框架采用了典型的主從多線程模型 ,但還是有變化,即:獨立的業務線程池對應不同的請求業務類型。
圖片
一個 Reactor 主線程 ( eventLoopGroupBoss
)責監聽 TCP網絡連接請求,建立好連接,創建 SocketChannel , 并注冊到 selector 上。
RocketMQ 源碼會自動根據 OS 的類型選擇 NIO 和 Epoll ,也可以通過參數配置 ), 然后監聽真正的網絡數據。
拿到網絡數據后,再丟給 Worker 線程池(eventLoopGroupSelector ),再真正執行業務邏輯之前需要進行 SSL 驗證、編解碼、空閑檢查、網絡連接管理,這些工作都交給 defaultEventExecutorGroup 去做。
而業務操作由業務線程池中處理,根據 RemotingCommand 的業務請求編號 requestCode , 從處理器表 processorTable 這個本地緩存中找到對應的處理器 , 然后封裝成 task 任務后,提交到對應的業務處理器的線程池執行。
圖片
RocketMQ 的線程模型如下所示 :
線程數 | 線程名 | 線程具體說明 |
1 | NettyBoss_%d | Reactor 主線程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor 線程池 |
M1 | NettyServerCodecThread_%d | Worker線程池 |
M2 | RemotingExecutorThread_%d | 業務 processor 處理線程池 |
三、文件存儲機制
我們先進入 broker 的文件存儲目錄 。
圖片
消息存儲和下面三個文件關系非常緊密:
- 數據文件 commitlog
消息主體以及元數據的存儲主體 ; - 消費文件 consumequeue
消息消費隊列,引入的目的主要是提高消息消費的性能 ; - 索引文件 indexfile
索引文件,提供了一種可以通過 key 或時間區間來查詢消息。
圖片
RocketMQ 采用的是混合型的存儲結構,Broker 單個實例下所有的隊列共用一個數據文件(commitlog)來存儲。
生產者發送消息至 Broker 端,然后 Broker 端使用同步或者異步的方式對消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那么生產者發送的消息就不會丟失。
Broker 端的后臺服務線程會不停地分發請求并異步構建 consumequeue(消費文件)和 indexfile(索引文件)。
四、高性能讀寫
1.順序寫
首先消息是一條一條寫入到文件,每條消息的格式是固定的,這種設計對于文件讀寫來講有兩點優勢:
磁盤的存取速度相對內存來講并不快,一次磁盤 IO 的耗時主要取決于:尋道時間和盤片旋轉時間,提高磁盤 IO 性能最有效的方法就是:減少隨機 IO,增加順序 IO 。
圖片
《 The Pathologies of Big Data 》這篇文章指出:內存隨機讀寫的速度遠遠低于磁盤順序讀寫的速度。磁盤順序寫入速度可以達到幾百兆/s,而隨機寫入速度只有幾百 KB /s,相差上千倍。
因為消息是一條一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。
每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據消息的物理偏移量通過二分查找,定位消息位于那個文件中,并獲取到消息實體數據。
2.內存映射機制
mmap 是 Linux 提供的一種內存映射文件的機制,它實現了將內核中讀緩沖區地址與用戶空間緩沖區地址進行映射,從而實現內核緩沖區與用戶緩沖區的共享。
圖片
基于 mmap + write 系統調用的零拷貝方式,整個拷貝過程會發生 4 次上下文切換,1 次 CPU 拷貝和 2 次 DMA 拷貝。
圖片
用戶程序讀寫數據的流程如下:
- 用戶進程通過 mmap() 函數向內核發起系統調用,上下文從用戶態切換為內核態。
- 將用戶進程的內核空間的讀緩沖區與用戶空間的緩存區進行內存地址映射。
- CPU 利用 DMA 控制器將數據從主存或硬盤拷貝到內核空間的讀緩沖區。
- 上下文從內核態切換回用戶態,mmap 系統調用執行返回。
- 用戶進程通過 write() 函數向內核發起系統調用,上下文從用戶態切換為內核態。
- CPU 將讀緩沖區中的數據拷貝到的網絡緩沖區。
- CPU 利用 DMA 控制器將數據從網絡緩沖區(socket buffer)拷貝到網卡進行數據傳輸。
- 上下文從內核態切換回用戶態,write 系統調用執行返回。
拷貝方式 | CPU拷貝 | DMA拷貝 | 系統調用 | 上下文切換 |
傳統方式(read + write) | 2 | 2 | read / write | 4 |
內存映射(mmap + write) | 1 | 2 | mmap / write | 4 |
sendfile | 1 | 2 | sendfile | 2 |
sendfile + DMA gather copy | 0 | 2 | sendfile | 2 |
RocketMQ 選擇了 mmap + write 這種零拷貝方式,適用于業務級消息這種小塊文件的數據持久化和傳輸;
而 Kafka 采用的是 sendfile 這種零拷貝方式,適用于系統日志消息這種高吞吐量的大塊文件的數據持久化和傳輸。
五、消費流程
圖片
核心流程如下:
- 消費者啟動后,觸發負載均衡服務 ,負載均衡服務為消費者實例分配對應的隊列 ;
- 分配完隊列后,負載均衡服務會為每個分配的新隊列創建一個消息拉取請求
pullRequest
, 拉取請求保存一個處理隊列processQueue
,內部是紅黑樹(TreeMap
),用來保存拉取到的消息 ; - 拉取消息服務單線程從拉取請求隊列
pullRequestQueue
中彈出拉取消息,執行拉取任務 ,拉取請求是異步回調模式,將拉取到的消息放入到處理隊列; - 拉取請求在一次拉取消息完成之后會復用,重新被放入拉取請求隊列
pullRequestQueue
中 ; - 拉取完成后,調用消費消息服務
consumeMessageService
的submitConsumeRequest
方法 ,消費消息服務內部有一個消費線程池; - 消費線程池的消費線程從消費任務隊列中獲取消費請求,執行消費監聽器
listener.consumeMessage
; - 消費完成后,若消費成功,則更新偏移量
updateOffset
,先更新到內存offsetTable
,定時上報到 Broker ;若消費失敗,則將失敗消費發送到 Broker 。 - Broker 端接收到請求后, 調用消費進度管理器的
commitOffset
方法修改內存的消費進度,定時刷盤到consumerOffset.json
。
六、傳統部署模式
1.雙 Master 模式
所有節點都是 master 主節點(比如 2 個或 3 個主節點),沒有 slave 從節點的模式。
圖片
該模式的優缺點如下:
- 優點
配置簡單 , 性能極高。一個 master 節點的宕機或者重啟(維護)對應用程序沒有影響。
當磁盤配置為 RAID10 時,消息不會丟失,因為 RAID10 磁盤非??煽?,即使機器不可恢復(消息異步刷盤模式的情況下,會丟失少量消息;如果消息是同步刷盤模式,不會丟失任何消息)。
- 缺點
單臺機器宕機時,本機未消費的消息,直到機器恢復后才會訂閱,影響消息實時性。
2.多 Master 多 Slave(異步)
每個主節點配置多個從節點,多對主從。HA 采用異步復制,主節點和從節點之間有短消息延遲(毫秒)。
圖片
所謂異步復制,是指消息發送到的 master 后直接返回,不必等待主從復制,而是內部通過異步的方式進行復制。
圖片
這種模式的優缺點如下:
- 優點
即使磁盤損壞,也只會丟失極少的消息,不影響消息的實時性能。
同時,當主節點宕機時,消費者仍然可以消費從節點的消息,這個過程對應用本身是透明的,不需要人為干預。
性能幾乎與多 Master 模式一樣高。
- 缺點:
主節點宕機、磁盤損壞時,會丟失少量消息。
3.多 Master 多 Slave (同步)
每個 master 節點配置多個 slave 節點,有多對 Master-Slave 。
HA 采用同步雙寫,即只有消息成功寫入到主節點并復制到多個從節點,才會返回成功響應給應用程序。
圖片
異步復制指 producer 發送一條消息給 broker 的主節點,只有主節點將數據同步到從節點才會返回結果。
圖片
這種模式的優缺點如下:
- 優點
數據和服務都沒有單點故障。在 master 節點關閉的情況下,消息也沒有延遲。同時服務可用性和數據可用性非常高。
- 缺點:
這種模式下的性能略低于異步復制模式(大約低 10%)。發送單條消息的 RT 略高,目前版本,master 節點宕機后,slave 節點無法自動切換到 master 。
七、Deleger 集群部署
在 RocketMQ 4.5 版本之前,RocketMQ 只有一種 Master/Slave 的部署方式。在這種模式下,一組 broker 包含一個 Master 和零到多個 Slave,Slave 通過同步或異步復制的方式與 Master 保持數據一致。
但這種部署模式提供了一定程度的高可用性,但也存在一些缺陷。例如,在故障轉移方面,如果主節點發生故障,仍然需要手動重啟或切換,無法自動將一個從節點轉換為主節點。
因此,核心問題是:多副本架構需要解決自動故障轉移的問題,也就是自動選主。
這個問題的解決方案基本可以分為兩種:
1、第三方協調服務
我們利用第三方協調服務集群(如 Zookeeper 或 etcd)進行選主,但這樣會引入額外的外部組件,增加了部署、運維和故障診斷的成本,我們不僅需要維護 RocketMQ 集群,還需要維護 Zookeeper 集群。
所以,我們看到 Kafka 的新版本已經擯棄了 Zookeeper 而是選擇了第二種方案。
2、不需要引入外部組件,使用 Raft 協議進行自動選主
自動選主邏輯集成在各個節點的進程中,節點之間通過通信即可完成選主。
因此,最終選擇 Raft 協議來解決這個問題,而 DLedger 就是基于 Raft 協議的 commitlog 存儲庫,是 RocketMQ 實現新的高可用多副本架構的關鍵。
圖片
如圖,我們定義了兩個 DLedger Group ,分別是:RaftNode00 和 RaftNode01。
每個 DLedger Group 要求包含 至少 3 臺機器 部署,每臺機器部署 Broker 服務 , 機器數量為奇數。
通過 Raft 自動選舉出一個 Leader,其余節點作為 Follower,并在 Leader 和 Follower 之間復制數據以保證高可用。
RocketMQ 的 DLedger 模式能自動容災切換,并保證數據一致,同時支持水平擴展的,即:部署任意多個 RocketMQ Group 同時對外提供服務。
圖片
八、事務消息
RocketMQ 事務消息是支持在分布式場景下保障消息生產和本地事務的最終一致性。交互流程如下圖所示:
圖片
1、生產者將消息發送至 Broker 。
2、Broker 將消息持久化成功之后,向生產者返回 Ack 確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。
3、生產者開始執行本地事務邏輯。
4、生產者根據本地事務執行結果向服務端提交二次確認結果( Commit 或是 Rollback ),Broker 收到確認結果后處理邏輯如下:
- 二次確認結果為 Commit :Broker 將半事務消息標記為可投遞,并投遞給消費者。
- 二次確認結果為 Rollback :Broker 將回滾事務,不會將半事務消息投遞給消費者。
5、在斷網或者是生產者應用重啟的特殊情況下,若 Broker 未收到發送者提交的二次確認結果,或 Broker 收到的二次確認結果為 Unknown 未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。
- 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
- 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
九、廣播消息
當使用 RocketMQ 廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。
圖片
廣播消費主要用于兩種場景:消息推送和緩存同步。
1.消息推送
筆者第一次接觸廣播消費的業務場景是神州專車司機端的消息推送。
用戶下單之后,訂單系統生成專車訂單,派單系統會根據相關算法將訂單派給某司機,司機端就會收到派單推送。
圖片
推送服務是一個 TCP 服務(自定義協議),同時也是一個消費者服務,消息模式是廣播消費。
司機打開司機端 APP 后,APP 會通過負載均衡和推送服務創建長連接,推送服務會保存 TCP 連接引用 (比如司機編號和 TCP channel 的引用)。
派單服務是生產者,將派單數據發送到 MetaQ , 每個推送服務都會消費到該消息,推送服務判斷本地內存中是否存在該司機的 TCP channel , 若存在,則通過 TCP 連接將數據推送給司機端。
肯定有同學會問:假如網絡原因,推送失敗怎么處理 ?有兩個要點:
- 司機端 APP 定時主動拉取派單信息;
- 當推送服務沒有收到司機端的 ACK 時 ,也會一定時限內再次推送,達到閾值后,不再推送。
2.緩存同步
高并發場景下,很多應用使用本地緩存,提升系統性能 。
圖片
如上圖,應用A啟動后,作為一個 RocketMQ 消費者,消息模式設置為廣播消費。為了提升接口性能,每個應用節點都會將字典表加載到本地緩存里。
當字典表數據變更時,可以通過業務系統發送一條消息到 RocketMQ ,每個應用節點都會消費消息,刷新本地緩存。
十、順序消息
順序消息是指對于一個指定的 Topic ,消息嚴格按照先進先出(FIFO)的原則進行消息發布和消費,即先發布的消息先消費,后發布的消息后消費。
順序消息分為分區順序消息和全局順序消息。
1.分區順序消息
對于指定的一個 Topic ,所有消息根據 Sharding Key 進行區塊分區,同一個分區內的消息按照嚴格的先進先出(FIFO)原則進行發布和消費。同一分區內的消息保證順序,不同分區之間的消息順序不做要求。
- 適用場景:適用于性能要求高,以 Sharding Key 作為分區字段,在同一個區塊中嚴格地按照先進先出(FIFO)原則進行消息發布和消費的場景。
- 示例:電商的訂單創建,以訂單 ID 作為 Sharding Key ,那么同一個訂單相關的創建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會按照發布的先后順序來消費。
2.全局順序消息
對于指定的一個 Topic ,所有消息按照嚴格的先入先出(FIFO)的順序來發布和消費。
- 適用場景:適用于性能要求不高,所有的消息嚴格按照 FIFO 原則來發布和消費的場景。
- 示例:在證券處理中,以人民幣兌換美元為 Topic,在價格相同的情況下,先出價者優先處理,則可以按照 FIFO 的方式發布和消費全局順序消息。
全局順序消息實際上是一種特殊的分區順序消息,即 Topic 中只有一個分區,因此全局順序和分區順序的實現原理相同。
因為分區順序消息有多個分區,所以分區順序消息比全局順序消息的并發度和性能更高。
圖片
消息的順序需要由兩個階段保證:
- 消息發送如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產生的順序,業務上要求同一訂單的消息保持順序,例如訂單 A 的消息發送和消費都按照 A1、A2、A3 的順序。如果是普通消息,訂單A 的消息可能會被輪詢發送到不同的隊列中,不同隊列的消息將無法保持順序,而順序消息發送時 RocketMQ 支持將 Sharding Key 相同(例如同一訂單號)的消息序路由到同一個隊列中。下圖是生產者發送順序消息的封裝,原理是發送消息時,實現 MessageQueueSelector 接口, 根據 Sharding Key 使用 Hash 取模法來選擇待發送的隊列。
生產者順序發送消息封裝
- 生產者順序發送消息封裝
- 消息消費
消費者消費消息時,需要保證單線程消費每個隊列的消息數據,從而實現消費順序和發布順序的一致。
順序消費服務的類是 ConsumeMessageOrderlyService ,在負載均衡階段,并發消費和順序消費并沒有什么大的差別。
最大的差別在于:順序消費會向 Borker 申請鎖 。消費者根據分配的隊列 messageQueue ,向 Borker 申請鎖 ,如果申請成功,則會拉取消息,如果失敗,則定時任務每隔20秒會重新嘗試。
圖片
十一、架構缺點
RocketMQ 包含兩種部署架構: Master-Slave 架構 和 Deleger 架構 。
圖片
首先是 Master-Slave 架構,它的問題很明顯,由于組內沒有 failover 能力,所以
- Master 故障后,故障組的消息發送將會中斷。雖然客戶端可以向其他 Master 進行發送,但Topic整體可寫入分區數將減少并短時間內無法恢復,這會影響對分區敏感的業務,比如順序消息或者流計算應用。
- Master 故障后,一些僅限于在Master上進行的操作將無法進行,這里包括一些順序消息的上鎖,管控中searchOffset、maxOffset、minOffset等操作,會影響到順序消息的消費以及一些管控操作。
- Master故障后,故障Broker組上的二級消息消費將會中斷,二級消息特點是它可以分為兩個階段,第一階段是把消息發送到CommitLog上的特殊Topic,第二階段是將滿足要求的消息還原投放回CommitLog。比如延遲消息,第一階段是投放到名為SCHEDULE_TOPIC_XXXX的Topic上,等掃描線程發現消息到期后再還原成原來的Topic重新投遞,這樣它就能被下游消費到。
但如果Master Broker下線,掃描和重投放都會停止,因此會出現二級消息的消費延遲或丟失,具體會影響到延遲消息、事務消息等二級消息。
然后是 Deleger 架構 ,通過 Master 故障后短時間內重新選出新的 Master 來解決上述問題,但是由于 Raft 選主和復制能力在復制鏈路上,因此存在以下問題:
- Broker 組內的副本數必須是 3副本 及以上才有切換能力,因此成本是有上升的。
- Raft 多數派限制導致三副本副本必須兩副本響應才能返回,五副本需要三副本才能返回,因此ACK是不夠靈活的,這也導致發送延遲和副本冗余間沒有一個很好的可協商的方案。
- 由于存儲復制鏈路用的是 OpenMessaging DLedger庫,導致 RocketMQ 原生的一些存儲能力沒辦法利用,包括像 TransientPool、零拷貝的能力,如果要在Raft模式下使用的話,就需要移植一遍到DLedger庫,開發特性以及bug修復也需要做兩次,這樣的維護和開發成本是非常高的。
同時,我們提到了 RocketMQ 4.X 的消費流程,它的消費邏輯有兩個非常明顯的特點:
- 客戶端代碼邏輯較重。假如要支持一種新的編程語言,那么客戶端就必須實現完整的負載均衡邏輯,此外還需要實現拉消息、位點管理、消費失敗后將消息發回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
- 保證冪等非常重要。當客戶端升級或者下線時,或者 Broker 宕機,都要進行負載均衡操作,可能造成消息堆積,同時有一定幾率造成重復消費。
RocketMQ 5.0 引入了全新的彈性無狀態代理模式,將當前的Broker職責進行拆分,對于客戶端協議適配、權限管理、消費管理等計算邏輯進行抽離,獨立無狀態的代理角色提供服務,Broker則繼續專注于存儲能力的持續優化。這套模式可以更好地實現在云環境的資源彈性調度。 值得注意的是RocketMQ 5.0的全新模式是和4.0的極簡架構模式相容相通的,5.0的代理架構完全可以以Local模式運行,實現與4.0架構完全一致的效果。開發者可以根據自身的業務場景自由選擇架構部署。
圖片