成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

萬字長文講透 RocketMQ 的消費(fèi)邏輯

開發(fā) 項(xiàng)目管理
名字服務(wù)是是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無任何信息同步。它是一個(gè)非常簡單的 Topic 路由注冊(cè)中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。

RocketMQ 是筆者非常喜歡的消息隊(duì)列,4.9.X 版本是目前使用最廣泛的版本,但它的消費(fèi)邏輯相對(duì)較重,很多同學(xué)學(xué)習(xí)起來沒有頭緒。

這篇文章,筆者梳理了 RocketMQ 的消費(fèi)邏輯,希望對(duì)大家有所啟發(fā)。

圖片

一、架構(gòu)概覽

在展開集群消費(fèi)邏輯細(xì)節(jié)前,我們先對(duì) RocketMQ 4.9.X 架構(gòu)做一個(gè)概覽。

圖片

整體架構(gòu)中包含四種角色 :

1、NameServer

名字服務(wù)是是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無任何信息同步。它是一個(gè)非常簡單的 Topic 路由注冊(cè)中心,其角色類似 Dubbo 中的 zookeeper ,支持 Broker 的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。

2、BrokerServer

Broker 主要負(fù)責(zé)消息的存儲(chǔ)、投遞和查詢以及服務(wù)高可用保證 。

3、Producer

消息發(fā)布的角色,Producer 通過 MQ 的負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊(duì)列進(jìn)行消息投遞,投遞的過程支持快速失敗并且低延遲。

4、Consumer

消息消費(fèi)的角色,支持以 push 推,pull 拉兩種模式對(duì)消息進(jìn)行消費(fèi)。

RocketMQ 集群工作流程:

1、啟動(dòng) NameServer,NameServer 起來后監(jiān)聽端口,等待 Broker、Producer 、Consumer 連上來,相當(dāng)于一個(gè)路由控制中心。

2、Broker 啟動(dòng),跟所有的 NameServer 保持長連接,定時(shí)發(fā)送心跳包。心跳包中包含當(dāng)前 Broker信息( IP+端口等 )以及存儲(chǔ)所有 Topic 信息。注冊(cè)成功后,NameServer 集群中就有 Topic 跟 Broker 的映射關(guān)系。

3、收發(fā)消息前,先創(chuàng)建 Topic,創(chuàng)建 Topic 時(shí)需要指定該 Topic 要存儲(chǔ)在哪些 Broker 上,也可以在發(fā)送消息時(shí)自動(dòng)創(chuàng)建 Topic。

4、Producer 發(fā)送消息,啟動(dòng)時(shí)先跟 NameServer 集群中的其中一臺(tái)建立長連接,并從 NameServer 中獲取當(dāng)前發(fā)送的 Topic 存在哪些 Broker 上,輪詢從隊(duì)列列表中選擇一個(gè)隊(duì)列,然后與隊(duì)列所在的 Broker 建立長連接從而向 Broker 發(fā)消息。

5、Consumer 跟 Producer 類似,跟其中一臺(tái) NameServer 建立長連接,獲取當(dāng)前訂閱 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立連接通道,開始消費(fèi)消息。

二、發(fā)布訂閱

RocketMQ 的傳輸模型是:發(fā)布訂閱模型 。

發(fā)布訂閱模型具有如下特點(diǎn):

  • 消費(fèi)獨(dú)立相比隊(duì)列模型的匿名消費(fèi)方式,發(fā)布訂閱模型中消費(fèi)方都會(huì)具備的身份,一般叫做訂閱組(訂閱關(guān)系),不同訂閱組之間相互獨(dú)立不會(huì)相互影響。
  • 一對(duì)多通信基于獨(dú)立身份的設(shè)計(jì),同一個(gè)主題內(nèi)的消息可以被多個(gè)訂閱組處理,每個(gè)訂閱組都可以拿到全量消息。因此發(fā)布訂閱模型可以實(shí)現(xiàn)一對(duì)多通信。

RocketMQ 支持兩種消息模式:集群消費(fèi)( Clustering )和廣播消費(fèi)( Broadcasting )。

集群消費(fèi):同一 Topic 下的一條消息只會(huì)被同一消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)。也就是說,消息被負(fù)載均衡到了同一個(gè)消費(fèi)組的多個(gè)消費(fèi)者實(shí)例上。

圖片

廣播消費(fèi):當(dāng)使用廣播消費(fèi)模式時(shí),每條消息推送給集群內(nèi)所有的消費(fèi)者,保證消息至少被每個(gè)消費(fèi)者消費(fèi)一次。

圖片

為了實(shí)現(xiàn)這種發(fā)布訂閱模型 , RocketMQ 精心設(shè)計(jì)了它的存儲(chǔ)模型。先進(jìn)入 Broker 的文件存儲(chǔ)目錄。

圖片

RocketMQ 采用的是混合型的存儲(chǔ)結(jié)構(gòu)。

1、Broker 單個(gè)實(shí)例下所有的隊(duì)列共用一個(gè)數(shù)據(jù)文件(commitlog)來存儲(chǔ)

生產(chǎn)者發(fā)送消息至 Broker 端,然后 Broker 端使用同步或者異步的方式對(duì)消息刷盤持久化,保存至 commitlog 文件中。只要消息被刷盤持久化至磁盤文件 commitlog 中,那么生產(chǎn)者發(fā)送的消息就不會(huì)丟失。

單個(gè)文件大小默認(rèn) 1G , 文件名長度為 20 位,左邊補(bǔ)零,剩余為起始偏移量,比如 00000000000000000000 代表了第一個(gè)文件,起始偏移量為 0 ,文件大小為1 G = 1073741824 。

圖片

commitlog 目錄

這種設(shè)計(jì)有兩個(gè)優(yōu)點(diǎn):

  • 充分利用順序?qū)懀蟠筇嵘龑懭霐?shù)據(jù)的吞吐量;
  • 快讀定位消息。因?yàn)橄⑹且粭l一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據(jù)消息的物理偏移量通過二分查找,定位消息位于那個(gè)文件中,并獲取到消息實(shí)體數(shù)據(jù)。

2、Broker 端的后臺(tái)服務(wù)線程會(huì)不停地分發(fā)請(qǐng)求并異步構(gòu)建 consumequeue(消費(fèi)文件)和 indexfile(索引文件)

進(jìn)入索引文件存儲(chǔ)目錄 :

圖片

1、消費(fèi)文件按照主題存儲(chǔ),每個(gè)主題下有不同的隊(duì)列,圖中主題 my-mac-topic 有 16 個(gè)隊(duì)列 (0 到 15) ;

2、每個(gè)隊(duì)列目錄下 ,存儲(chǔ) consumequeue 文件,每個(gè) consumequeue 文件也是順序?qū)懭耄瑪?shù)據(jù)格式見下圖。

圖片

每個(gè) consumequeue 文件包含 30 萬個(gè)條目,每個(gè)條目大小是 20 個(gè)字節(jié),每個(gè)文件的大小是 30 萬 * 20 = 60萬字節(jié),每個(gè)文件大小約 5.72M 。

和 commitlog 文件類似,consumequeue 文件的名稱也是以偏移量來命名的,可以通過消息的邏輯偏移量定位消息位于哪一個(gè)文件里。

消費(fèi)文件按照主題-隊(duì)列來保存 ,這種方式特別適配發(fā)布訂閱模型。

消費(fèi)者從 Broker 獲取訂閱消息數(shù)據(jù)時(shí),不用遍歷整個(gè) commitlog 文件,只需要根據(jù)邏輯偏移量從 consumequeue 文件查詢消息偏移量 ,  最后通過定位到 commitlog 文件, 獲取真正的消息數(shù)據(jù)。

要實(shí)現(xiàn)發(fā)布訂閱模型,還需要一個(gè)重要文件:消費(fèi)進(jìn)度文件。原因有兩點(diǎn):

  • 不同消費(fèi)組之間相互獨(dú)立,不會(huì)相互影響 ;
  • 消費(fèi)者下次拉取數(shù)據(jù)時(shí),需要知道從哪個(gè)進(jìn)度開始拉取 ,就像我們小時(shí)候玩單機(jī)游戲存盤一樣。

因此消費(fèi)進(jìn)度文件需要保存消費(fèi)組所訂閱主題的消費(fèi)進(jìn)度。

我們?yōu)g覽下集群消費(fèi)場景下的 Broker 端的消費(fèi)進(jìn)度文件 consumerOffset.json 。

圖片

圖片

在進(jìn)度文件 consumerOffset.json 里,數(shù)據(jù)以 key-value 的結(jié)構(gòu)存儲(chǔ),key 表示:主題@消費(fèi)者組 , value 是 consumequeue 中每個(gè)隊(duì)列對(duì)應(yīng)的邏輯偏移量 。

寫到這里,我們粗糙模擬下 RocketMQ 存儲(chǔ)模型如何滿足發(fā)布訂閱模型(集群模式) 。

圖片

1、發(fā)送消息:生產(chǎn)者發(fā)送消息到 Broker ;

2、保存消息:Broker 將消息存儲(chǔ)到 commitlog 文件 ,異步線程會(huì)構(gòu)建消費(fèi)文件 consumequeue ;

3、消費(fèi)流程:消費(fèi)者啟動(dòng)后,會(huì)通過負(fù)載均衡分配對(duì)應(yīng)的隊(duì)列,然后向 Broker 發(fā)送拉取消息請(qǐng)求。Broker 收到消費(fèi)者拉取請(qǐng)求之后,根據(jù)訂閱組,消費(fèi)者編號(hào),主題,隊(duì)列名,邏輯偏移量等參數(shù) ,從該主題下的 consumequeue 文件查詢消息消費(fèi)條目,然后從 commitlog 文件中獲取消息實(shí)體。消費(fèi)者在收到消息數(shù)據(jù)之后,執(zhí)行消費(fèi)監(jiān)聽器,消費(fèi)完消息;

4、保存進(jìn)度:消費(fèi)者將消費(fèi)進(jìn)度提交到 Broker ,Broker 會(huì)將該消費(fèi)組的消費(fèi)進(jìn)度存儲(chǔ)在進(jìn)度文件里。

三、消費(fèi)流程

我們重點(diǎn)講解下集群消費(fèi)的消費(fèi)流程 ,因?yàn)榧合M(fèi)是使用最普遍的消費(fèi)模式,理解了集群消費(fèi),廣播消費(fèi)也就能順理成章的掌握了。

圖片

集群消費(fèi)示例代碼里,啟動(dòng)消費(fèi)者,我們需要配置三個(gè)核心屬性:消費(fèi)組名、訂閱主題、消息監(jiān)聽器,最后調(diào)用 start 方法啟動(dòng)。

消費(fèi)者啟動(dòng)后,我們可以將整個(gè)流程簡化成:

圖片

四、負(fù)載均衡

消費(fèi)端的負(fù)載均衡是指將 Broker 端中多個(gè)隊(duì)列按照某種算法分配給同一個(gè)消費(fèi)組中的不同消費(fèi)者,負(fù)載均衡是客戶端開始消費(fèi)的起點(diǎn)。

RocketMQ 負(fù)載均衡的核心設(shè)計(jì)理念是

  • 消費(fèi)隊(duì)列在同一時(shí)間只允許被同一消費(fèi)組內(nèi)的一個(gè)消費(fèi)者消費(fèi)
  • 一個(gè)消費(fèi)者能同時(shí)消費(fèi)多個(gè)消息隊(duì)列

負(fù)載均衡是每個(gè)客戶端獨(dú)立進(jìn)行計(jì)算,那么何時(shí)觸發(fā)呢 ?

  • 消費(fèi)端啟動(dòng)時(shí),立即進(jìn)行負(fù)載均衡;
  • 消費(fèi)端定時(shí)任務(wù)每隔 20 秒觸發(fā)負(fù)載均衡;
  • 消費(fèi)者上下線,Broker 端通知消費(fèi)者觸發(fā)負(fù)載均衡。

負(fù)載均衡流程如下:

1、發(fā)送心跳

消費(fèi)者啟動(dòng)后,它就會(huì)通過定時(shí)任務(wù)不斷地向 RocketMQ 集群中的所有 Broker 實(shí)例發(fā)送心跳包(消息消費(fèi)分組名稱、訂閱關(guān)系集合、消息通信模式和客戶端實(shí)例編號(hào)等信息)。

Broker 端在收到消費(fèi)者的心跳消息后,會(huì)將它維護(hù)在 ConsumerManager 的本地緩存變量 consumerTable,同時(shí)并將封裝后的客戶端網(wǎng)絡(luò)通道信息保存在本地緩存變量 channelInfoTable 中,為之后做消費(fèi)端的負(fù)載均衡提供可以依據(jù)的元數(shù)據(jù)信息。

2、啟動(dòng)負(fù)載均衡服務(wù)

負(fù)載均衡服務(wù)會(huì)根據(jù)消費(fèi)模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:

(1) 獲取該主題下的消息消費(fèi)隊(duì)列集合;

(2) 查詢 Broker 端獲取該消費(fèi)組下消費(fèi)者 Id 列表;

(3) 先對(duì) Topic 下的消息消費(fèi)隊(duì)列、消費(fèi)者 Id 排序,然后用消息隊(duì)列分配策略算法(默認(rèn)為:消息隊(duì)列的平均分配算法),計(jì)算出待拉取的消息隊(duì)列;

平均分配算法

這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費(fèi)端排好序類似頁數(shù),并求出每一頁需要包含的平均 size 和每個(gè)頁面記錄的范圍 range ,最后遍歷整個(gè) range 而計(jì)算出當(dāng)前消費(fèi)端應(yīng)該分配到的記錄。

(4) 分配到的消息隊(duì)列集合與 processQueueTable 做一個(gè)過濾比對(duì)操作。

消費(fèi)者實(shí)例內(nèi) ,processQueueTable 對(duì)象存儲(chǔ)著當(dāng)前負(fù)載均衡的隊(duì)列 ,以及該隊(duì)列的處理隊(duì)列 processQueue (消費(fèi)快照)。

  1. 標(biāo)紅的 Entry 部分表示與分配到的消息隊(duì)列集合互不包含,則需要將這些紅色隊(duì)列 Dropped 屬性為 true , 然后從 processQueueTable 對(duì)象中移除。
  2. 綠色的 Entry 部分表示與分配到的消息隊(duì)列集合的交集,processQueueTable 對(duì)象中已經(jīng)存在該隊(duì)列。
  3. 黃色的 Entry 部分表示這些隊(duì)列需要添加到 processQueueTable 對(duì)象中,為每個(gè)分配的新隊(duì)列創(chuàng)建一個(gè)消息拉取請(qǐng)求  pullRequest  ,  在消息拉取請(qǐng)求中保存一個(gè)處理隊(duì)列 processQueue (隊(duì)列消費(fèi)快照),內(nèi)部是紅黑樹(TreeMap),用來保存拉取到的消息。

最后創(chuàng)建拉取消息請(qǐng)求列表,并將請(qǐng)求分發(fā)到消息拉取服務(wù),進(jìn)入拉取消息環(huán)節(jié)。

五、長輪詢

在負(fù)載均衡這一小節(jié),我們已經(jīng)知道負(fù)載均衡觸發(fā)了拉取消息的流程。

消費(fèi)者啟動(dòng)的時(shí)候,會(huì)創(chuàng)建一個(gè)拉取消息服務(wù) PullMessageService ,它是一個(gè)單線程的服務(wù)。

核心流程如下:

1、負(fù)載均衡服務(wù)將消息拉取請(qǐng)求放入到拉取請(qǐng)求隊(duì)列 pullRequestQueue , 拉取消息服務(wù)從隊(duì)列中獲取拉取消息請(qǐng)求 ;

2、拉取消息服務(wù)向 Brorker 服務(wù)發(fā)送拉取請(qǐng)求 ,拉取請(qǐng)求的通訊模式是異步回調(diào)模式 ;

消費(fèi)者的拉取消息服務(wù)本身就是一個(gè)單線程,使用異步回調(diào)模式,發(fā)送拉取消息請(qǐng)求到 Broker 后,拉取消息線程并不會(huì)阻塞 ,可以繼續(xù)處理隊(duì)列 pullRequestQueue 中的其他拉取任務(wù)。

3、Broker 收到消費(fèi)者拉取消息請(qǐng)求后,從存儲(chǔ)中查詢出消息數(shù)據(jù),然后返回給消費(fèi)者;

4、消費(fèi)者的網(wǎng)絡(luò)通訊層會(huì)執(zhí)行拉取回調(diào)函數(shù)相關(guān)邏輯,首先會(huì)將消息數(shù)據(jù)存儲(chǔ)在隊(duì)列消費(fèi)快照 processQueue 里;

消費(fèi)快照使用紅黑樹 msgTreeMap 存儲(chǔ)拉取服務(wù)拉取到的消息 。

5、回調(diào)函數(shù)將消費(fèi)請(qǐng)求提交到消息消費(fèi)服務(wù) ,而消息消費(fèi)服務(wù)會(huì)異步的消費(fèi)這些消息;

6、回調(diào)函數(shù)會(huì)將處理中隊(duì)列的拉取請(qǐng)放入到定時(shí)任務(wù)中;

7、定時(shí)任務(wù)再次將消息拉取請(qǐng)求放入到隊(duì)列 pullRequestQueue 中,形成了閉環(huán):負(fù)載均衡后的隊(duì)列總會(huì)有任務(wù)執(zhí)行拉取消息請(qǐng)求,不會(huì)中斷。

細(xì)心的同學(xué)肯定有疑問:既然消費(fèi)端是拉取消息,為什么是長輪詢呢 ?

雖然拉模式的主動(dòng)權(quán)在消費(fèi)者這一側(cè),但是缺點(diǎn)很明顯。

因?yàn)橄M(fèi)者并不知曉 Broker 端什么時(shí)候有新的消息 ,所以會(huì)不停地去 Broker 端拉取消息,但拉取頻率過高, Broker 端壓力就會(huì)很大,頻率過低則會(huì)導(dǎo)致消息延遲。

所以要想消費(fèi)消息的延遲低,服務(wù)端的推送必不可少。

下圖展示了 RocketMQ 如何通過長輪詢減小拉取消息的延遲。

核心流程如下:

1、Broker 端接收到消費(fèi)者的拉取消息請(qǐng)求后,拉取消息處理器開始處理請(qǐng)求,根據(jù)拉取請(qǐng)求查詢消息存儲(chǔ) ;

2、從消息存儲(chǔ)中獲取消息數(shù)據(jù) ,若存在新消息 ,則將消息數(shù)據(jù)通過網(wǎng)絡(luò)返回給消費(fèi)者。若無新消息,則將拉取請(qǐng)求放入到拉取請(qǐng)求表 pullRequestTable 。

3、長輪詢請(qǐng)求管理服務(wù) pullRequestHoldService 每隔 5 秒從拉取請(qǐng)求表中判斷拉取消息請(qǐng)求的隊(duì)列是否有新的消息。

判定標(biāo)準(zhǔn)是:拉取消息請(qǐng)求的偏移量是否小于當(dāng)前消費(fèi)隊(duì)列最大偏移量,如果條件成立則說明有新消息了。

若存在新的消息 ,  長輪詢請(qǐng)求管理服務(wù)會(huì)觸發(fā)拉取消息處理器重新處理該拉取消息請(qǐng)求。

4、當(dāng) commitlog 中新增了新的消息,消息分發(fā)服務(wù)會(huì)構(gòu)建消費(fèi)文件和索引文件,并且會(huì)通知長輪詢請(qǐng)求管理服務(wù),觸發(fā)拉取消息處理器重新處理該拉取消息請(qǐng)求。

六、消費(fèi)消息

在拉取消息的流程里, Broker 端返回消息數(shù)據(jù),消費(fèi)者的通訊框架層會(huì)執(zhí)行回調(diào)函數(shù)。

回調(diào)線程會(huì)將數(shù)據(jù)存儲(chǔ)在隊(duì)列消費(fèi)快照 processQueue(內(nèi)部使用紅黑樹 msgTreeMap)里,然后將消息提交到消費(fèi)消息服務(wù),消費(fèi)消息服務(wù)會(huì)異步消費(fèi)這些消息。

消息消費(fèi)服務(wù)有兩種類型:并發(fā)消費(fèi)服務(wù)和順序消費(fèi)服務(wù) 。

6.1 并發(fā)消費(fèi)

并發(fā)消費(fèi)是指消費(fèi)者將并發(fā)消費(fèi)消息,消費(fèi)的時(shí)候可能是無序的。

消費(fèi)消息并發(fā)服務(wù)啟動(dòng)后,會(huì)初始化三個(gè)組件:消費(fèi)線程池、清理過期消息定時(shí)任務(wù)、處理失敗消息定時(shí)任務(wù)。

核心流程如下:

0、通訊框架回調(diào)線程會(huì)將數(shù)據(jù)存儲(chǔ)在消費(fèi)快照里,然后將消息列表 msgList 提交到消費(fèi)消息服務(wù)

1、 消息列表 msgList 組裝成消費(fèi)對(duì)象

2、將消費(fèi)對(duì)象提交到消費(fèi)線程池

我們看到10 條消息被組裝成三個(gè)消費(fèi)請(qǐng)求對(duì)象,不同的消費(fèi)線程會(huì)執(zhí)行不同的消費(fèi)請(qǐng)求對(duì)象。

3、消費(fèi)線程執(zhí)行消息監(jiān)聽器

執(zhí)行完消費(fèi)監(jiān)聽器,會(huì)返回消費(fèi)結(jié)果。

4、處理異常消息

當(dāng)消費(fèi)異常時(shí),異常消息將重新發(fā)回 Broker 端的重試隊(duì)列( RocketMQ 會(huì)為每個(gè) topic 創(chuàng)建一個(gè)重試隊(duì)列,以 %RETRY% 開頭),達(dá)到重試時(shí)間后將消息投遞到重試隊(duì)列中進(jìn)行消費(fèi)重試。

我們將在重試機(jī)制這一節(jié)重點(diǎn)講解 RocketMQ 如何實(shí)現(xiàn)延遲消費(fèi)功能 。

假如異常的消息發(fā)送到 Broker 端失敗,則重新將這些失敗消息通過處理失敗消息定時(shí)任務(wù)重新提交到消息消費(fèi)服務(wù)。

5、更新本地消費(fèi)進(jìn)度

消費(fèi)者消費(fèi)一批消息完成之后,需要保存消費(fèi)進(jìn)度到進(jìn)度管理器的本地內(nèi)存。

首先我們會(huì)從隊(duì)列消費(fèi)快照 processQueue 中移除消息,返回消費(fèi)快照 msgTreeMap 第一個(gè)偏移量 ,然后調(diào)用消費(fèi)消息進(jìn)度管理器 offsetStore 更新消費(fèi)進(jìn)度。

待更新的偏移量是如何計(jì)算的呢?

  • 場景1:快照中1001(消息1)到1010(消息10)消費(fèi)了,快照中沒有了消息,返回已消費(fèi)的消息最大偏移量 + 1 也就是1011。
  • 場景2:快照中1001(消息1)到1008(消息8)消費(fèi)了,快照中只剩下兩條消息了,返回最小的偏移量 1009。
  • 場景3:1001(消息1)在消費(fèi)對(duì)象中因?yàn)槟撤N原因一直沒有被消費(fèi),即使后面的消息1005-1010都消費(fèi)完成了,返回的最小偏移量是1001。

在場景3,RocketMQ 為了保證消息肯定被消費(fèi)成功,消費(fèi)進(jìn)度只能維持在1001(消息1),直到1001也被消費(fèi)完,本地的消費(fèi)進(jìn)度才會(huì)一下子更新到1011。

假設(shè)1001(消息1)還沒有消費(fèi)完成,消費(fèi)者實(shí)例突然退出(機(jī)器斷電,或者被 kill ),就存在重復(fù)消費(fèi)的風(fēng)險(xiǎn)。

因?yàn)殛?duì)列的消費(fèi)進(jìn)度還是維持在1001,當(dāng)隊(duì)列重新被分配給新的消費(fèi)者實(shí)例的時(shí)候,新的實(shí)例從 Broker 上拿到的消費(fèi)進(jìn)度還是維持在1001,這時(shí)候就會(huì)又從1001開始消費(fèi),1001-1010這批消息實(shí)際上已經(jīng)被消費(fèi)過還是會(huì)投遞一次。

所以業(yè)務(wù)必須要保證消息消費(fèi)的冪等性。

寫到這里,我們會(huì)有一個(gè)疑問:假設(shè)1001(消息1)因?yàn)榧渔i或者消費(fèi)監(jiān)聽器邏輯非常耗時(shí),導(dǎo)致極長時(shí)間沒有消費(fèi)完成,那么消費(fèi)進(jìn)度就會(huì)一直卡住 ,怎么解決呢 ?

RocketMQ 提供兩種方式一起配合解決:

  • 拉取服務(wù)根據(jù)并發(fā)消費(fèi)間隔配置限流拉取消息服務(wù)在拉取消息時(shí)候,會(huì)判斷當(dāng)前隊(duì)列的 processQueue 消費(fèi)快照里消息的最大偏移量 - 消息的最小偏移量大于消費(fèi)并發(fā)間隔(2000)的時(shí)候 , 就會(huì)觸發(fā)流控 ,  這樣就可以避免消費(fèi)者無限循環(huán)的拉取新的消息。
  • 清理過期消息消費(fèi)消息并發(fā)服務(wù)啟動(dòng)后,會(huì)定期掃描所有消費(fèi)的消息,若當(dāng)前時(shí)間減去開始消費(fèi)的時(shí)間大于消費(fèi)超時(shí)時(shí)間,首先會(huì)將過期消息發(fā)送 sendMessageBack 命令發(fā)送到 Broker ,然后從快照中刪除該消息。

6.2 順序消費(fèi)

順序消息是指對(duì)于一個(gè)指定的 Topic ,消息嚴(yán)格按照先進(jìn)先出(FIFO)的原則進(jìn)行消息發(fā)布和消費(fèi),即先發(fā)布的消息先消費(fèi),后發(fā)布的消息后消費(fèi)。

順序消息分為分區(qū)順序消息和全局順序消息。

1、分區(qū)順序消息

對(duì)于指定的一個(gè) Topic ,所有消息根據(jù) Sharding Key 進(jìn)行區(qū)塊分區(qū),同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的先進(jìn)先出(FIFO)原則進(jìn)行發(fā)布和消費(fèi)。同一分區(qū)內(nèi)的消息保證順序,不同分區(qū)之間的消息順序不做要求。

  • 適用場景:適用于性能要求高,以 Sharding Key 作為分區(qū)字段,在同一個(gè)區(qū)塊中嚴(yán)格地按照先進(jìn)先出(FIFO)原則進(jìn)行消息發(fā)布和消費(fèi)的場景。
  • 示例:電商的訂單創(chuàng)建,以訂單 ID 作為 Sharding Key ,那么同一個(gè)訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息都會(huì)按照發(fā)布的先后順序來消費(fèi)。

2、全局順序消息

對(duì)于指定的一個(gè) Topic ,所有消息按照嚴(yán)格的先入先出(FIFO)的順序來發(fā)布和消費(fèi)。

  • 適用場景:適用于性能要求不高,所有的消息嚴(yán)格按照 FIFO 原則來發(fā)布和消費(fèi)的場景。
  • 示例:在證券處理中,以人民幣兌換美元為 Topic,在價(jià)格相同的情況下,先出價(jià)者優(yōu)先處理,則可以按照 FIFO 的方式發(fā)布和消費(fèi)全局順序消息。

全局順序消息實(shí)際上是一種特殊的分區(qū)順序消息,即 Topic 中只有一個(gè)分區(qū),因此全局順序和分區(qū)順序的實(shí)現(xiàn)原理相同。

因?yàn)榉謪^(qū)順序消息有多個(gè)分區(qū),所以分區(qū)順序消息比全局順序消息的并發(fā)度和性能更高。

消息的順序需要由兩個(gè)階段保證:

  • 消息發(fā)送如上圖所示,A1、B1、A2、A3、B2、B3 是訂單 A 和訂單 B 的消息產(chǎn)生的順序,業(yè)務(wù)上要求同一訂單的消息保持順序,例如訂單 A 的消息發(fā)送和消費(fèi)都按照 A1、A2、A3 的順序。如果是普通消息,訂單A 的消息可能會(huì)被輪詢發(fā)送到不同的隊(duì)列中,不同隊(duì)列的消息將無法保持順序,而順序消息發(fā)送時(shí) RocketMQ 支持將 Sharding Key 相同(例如同一訂單號(hào))的消息序路由到同一個(gè)隊(duì)列中。下圖是生產(chǎn)者發(fā)送順序消息的封裝,原理是發(fā)送消息時(shí),實(shí)現(xiàn) MessageQueueSelector 接口, 根據(jù) Sharding Key 使用 Hash 取模法來選擇待發(fā)送的隊(duì)列。生產(chǎn)者順序發(fā)送消息封裝
  • 消息消費(fèi)消費(fèi)者消費(fèi)消息時(shí),需要保證單線程消費(fèi)每個(gè)隊(duì)列的消息數(shù)據(jù),從而實(shí)現(xiàn)消費(fèi)順序和發(fā)布順序的一致。

順序消費(fèi)服務(wù)的類是 ConsumeMessageOrderlyService ,在負(fù)載均衡階段,并發(fā)消費(fèi)和順序消費(fèi)并沒有什么大的差別。

最大的差別在于:順序消費(fèi)會(huì)向 Borker 申請(qǐng)鎖 。消費(fèi)者根據(jù)分配的隊(duì)列 messageQueue ,向 Borker 申請(qǐng)鎖 ,如果申請(qǐng)成功,則會(huì)拉取消息,如果失敗,則定時(shí)任務(wù)每隔20秒會(huì)重新嘗試。

順序消費(fèi)核心流程如下:

1、 組裝成消費(fèi)對(duì)象

2、 將請(qǐng)求對(duì)象提交到消費(fèi)線程池

和并發(fā)消費(fèi)不同的是,這里的消費(fèi)請(qǐng)求包含消費(fèi)快照 processQueue ,消息隊(duì)列 messageQueue 兩個(gè)對(duì)象,并不對(duì)消息列表做任何處理。

3、 消費(fèi)線程內(nèi),對(duì)消費(fèi)隊(duì)列加鎖

順序消費(fèi)也是通過線程池消費(fèi)的,synchronized 鎖用來保證同一時(shí)刻對(duì)于同一個(gè)隊(duì)列只有一個(gè)線程去消費(fèi)它

4、 從消費(fèi)快照中取得待消費(fèi)的消息列表

消費(fèi)快照 processQueue 對(duì)象里,創(chuàng)建了一個(gè)紅黑樹對(duì)象 consumingMsgOrderlyTreeMap 用于臨時(shí)存儲(chǔ)的待消費(fèi)的消息。

5、 執(zhí)行消息監(jiān)聽器

消費(fèi)快照的消費(fèi)鎖 consumeLock 的作用是:防止負(fù)載均衡線程把當(dāng)前消費(fèi)的 MessageQueue 對(duì)象移除掉。

6、 處理消費(fèi)結(jié)果

消費(fèi)成功時(shí),首先計(jì)算需要提交的偏移量,然后更新本地消費(fèi)進(jìn)度。

消費(fèi)失敗時(shí),分兩種場景:

  • 假如已消費(fèi)次數(shù)小于最大重試次數(shù),則將對(duì)象 consumingMsgOrderlyTreeMap 中臨時(shí)存儲(chǔ)待消費(fèi)的消息,重新加入到消費(fèi)快照紅黑樹 msgTreeMap 中,然后使用定時(shí)任務(wù)嘗試重新消費(fèi)。
  • 假如已消費(fèi)次數(shù)大于等于最大重試次數(shù),則將失敗消息發(fā)送到 Broker ,Broker 接收到消息后,會(huì)加入到死信隊(duì)列里 , 最后計(jì)算需要提交的偏移量,然后更新本地消費(fèi)進(jìn)度。

我們做一個(gè)關(guān)于順序消費(fèi)的總結(jié) :

  1. 順序消費(fèi)需要由兩個(gè)階段消息發(fā)送和消息消費(fèi)協(xié)同配合,底層支撐依靠的是 RocketMQ 的存儲(chǔ)模型;
  2. 順序消費(fèi)服務(wù)啟動(dòng)后,隊(duì)列的數(shù)據(jù)都會(huì)被消費(fèi)者實(shí)例單線程的執(zhí)行消費(fèi);
  3. 假如消費(fèi)者擴(kuò)容,消費(fèi)者重啟,或者 Broker 宕機(jī) ,順序消費(fèi)也會(huì)有一定幾率較短時(shí)間內(nèi)亂序,所以消費(fèi)者的業(yè)務(wù)邏輯還是要保障冪等。

七、保存進(jìn)度

RocketMQ 消費(fèi)者消費(fèi)完一批數(shù)據(jù)后, 會(huì)將隊(duì)列的進(jìn)度保存在本地內(nèi)存,但還需要將隊(duì)列的消費(fèi)進(jìn)度持久化。

1、 集群模式

圖片

集群模式下,分兩種場景:

  • 拉取消息服務(wù)會(huì)在拉取消息時(shí),攜帶該隊(duì)列的消費(fèi)進(jìn)度,提交給 Broker 的拉取消息處理器。
  • 消費(fèi)者定時(shí)任務(wù),每隔5秒將本地緩存中的消費(fèi)進(jìn)度提交到 Broker 的消費(fèi)者管理處理器。

Broker 的這兩個(gè)處理器都調(diào)用消費(fèi)者進(jìn)度管理器 consumerOffsetManager 的 commitOffset 方法,定時(shí)任務(wù)異步將消費(fèi)進(jìn)度持久化到消費(fèi)進(jìn)度文件 consumerOffset.json 中。

圖片

2、 廣播模式

廣播模式消費(fèi)進(jìn)度存儲(chǔ)在消費(fèi)者本地,定時(shí)任務(wù)每隔 5 秒通過 LocalFileOffsetStore 持久化到本地文件offsets.json ,數(shù)據(jù)格式為 MessageQueue:Offset 。

圖片

廣播模式下,消費(fèi)進(jìn)度和消費(fèi)組沒有關(guān)系,本地文件 offsets.json 存儲(chǔ)在配置的目錄,文件中包含訂閱主題中所有的隊(duì)列以及隊(duì)列的消費(fèi)進(jìn)度。

八、重試機(jī)制

集群消費(fèi)下,重試機(jī)制的本質(zhì)是 RocketMQ 的延遲消息功能。

消費(fèi)消息失敗后,消費(fèi)者實(shí)例會(huì)通過 CONSUMER_SEND_MSG_BACK 請(qǐng)求,將失敗消息發(fā)回到 Broker 端。

Broker 端會(huì)為每個(gè) topic 創(chuàng)建一個(gè)重試隊(duì)列 ,隊(duì)列名稱是:%RETRY% + 消費(fèi)者組名 ,達(dá)到重試時(shí)間后將消息投遞到重試隊(duì)列中進(jìn)行消費(fèi)重試(消費(fèi)者組會(huì)自動(dòng)訂閱重試 Topic)。最多重試消費(fèi) 16 次,重試的時(shí)間間隔逐漸變長,若達(dá)到最大重試次數(shù)后消息還沒有成功被消費(fèi),則消息將被投遞至死信隊(duì)列。

第幾次重試

與上次重試的間隔時(shí)間

第幾次重試

與上次重試的間隔時(shí)間

1

10 秒

9

7 分鐘

2

30 秒

10

8 分鐘

3

1 分鐘

11

9 分鐘

4

2 分鐘

12

10 分鐘

5

3 分鐘

13

20 分鐘

6

4 分鐘

14

30 分鐘

7

5 分鐘

15

1 小時(shí)

8

6 分鐘

16

2 小時(shí)

圖片

開源 RocketMQ 4.X 支持延遲消息,默認(rèn)支持18 個(gè) level 的延遲消息,這是通過 broker 端的 messageDelayLevel 配置項(xiàng)確定的,如下:

圖片

Broker 在啟動(dòng)時(shí),內(nèi)部會(huì)創(chuàng)建一個(gè)內(nèi)部主題:SCHEDULE_TOPIC_XXXX,根據(jù)延遲 level 的個(gè)數(shù),創(chuàng)建對(duì)應(yīng)數(shù)量的隊(duì)列,也就是說18個(gè) level 對(duì)應(yīng)了18個(gè)隊(duì)列。

我們先梳理下延遲消息的實(shí)現(xiàn)機(jī)制。

1、生產(chǎn)者發(fā)送延遲消息

Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設(shè)置延遲level為5,對(duì)應(yīng)延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);

2、Broker端存儲(chǔ)延遲消息

延遲消息在 RocketMQ Broker 端的流轉(zhuǎn)如下圖所示:

圖片

第一步:修改消息 Topic 名稱和隊(duì)列信息

Broker 端接收到生產(chǎn)者的寫入消息請(qǐng)求后,首先都會(huì)將消息寫到 commitlog 中。假如是正常非延遲消息,MessageStore 會(huì)根據(jù)消息中的 Topic 信息和隊(duì)列信息,將其轉(zhuǎn)發(fā)到目標(biāo) Topic 的指定隊(duì)列 consumequeue 中。

但由于消息一旦存儲(chǔ)到 consumequeue 中,消費(fèi)者就能消費(fèi)到,而延遲消息不能被立即消費(fèi),所以 RocketMQ 將 Topic 的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級(jí)別確定要投遞到哪個(gè)隊(duì)列下。

同時(shí),還會(huì)將消息原來要發(fā)送到的目標(biāo) Topic 和隊(duì)列信息存儲(chǔ)到消息的屬性中。

圖片

第二步:構(gòu)建 consumequeue 文件時(shí),計(jì)算并存儲(chǔ)投遞時(shí)間

圖片

圖片

上圖是 consumequeue 文件一條消息的格式,最后 8 個(gè)字節(jié)存儲(chǔ) Tag 的哈希值,此時(shí)存儲(chǔ)消息的投遞時(shí)間。

第三步:定時(shí)調(diào)度服務(wù)啟動(dòng)

ScheduleMessageService 類是一個(gè)定時(shí)調(diào)度服務(wù),讀取 SCHEDULE_TOPIC_XXXX 隊(duì)列的消息,并將消息投遞到目標(biāo) Topic 中。

定時(shí)調(diào)度服務(wù)啟動(dòng)時(shí),創(chuàng)建一個(gè)定時(shí)調(diào)度線程池 ,并根據(jù)延遲級(jí)別的個(gè)數(shù),啟動(dòng)對(duì)應(yīng)數(shù)量的 HandlePutResultTask ,每個(gè) HandlePutResultTask 負(fù)責(zé)一個(gè)延遲級(jí)別的消費(fèi)與投遞。

圖片

第四步:投遞時(shí)間到了,將消息數(shù)據(jù)重新寫入到 commitlog

消息到期后,需要投遞到目標(biāo) Topic 。第一步已經(jīng)記錄了原來的 Topic 和隊(duì)列信息,這里需要重新設(shè)置,再存儲(chǔ)到 commitlog 中。

第五步:將消息投遞到目標(biāo) Topic 中

Broker 端的后臺(tái)服務(wù)線程會(huì)不停地分發(fā)請(qǐng)求并異步構(gòu)建 consumequeue(消費(fèi)文件)和 indexfile(索引文件)。因此消息會(huì)直接投遞到目標(biāo) Topic 的 consumequeue 中,之后消費(fèi)者就可以消費(fèi)到這條消息。

回顧了延遲消息的機(jī)制,消費(fèi)消息失敗后,消費(fèi)者實(shí)例會(huì)通過 CONSUMER_SEND_MSG_BACK 請(qǐng)求,將失敗消息發(fā)回到 Broker 端。

Broker 端 SendMessageProcessor 處理器會(huì)調(diào)用 asyncConsumerSendMsgBack 方法。

圖片

首先判斷消息的當(dāng)前重試次數(shù)是否大于等于最大重試次數(shù),如果達(dá)到最大重試次數(shù),或者配置的重試級(jí)別小于0,則重新創(chuàng)建 Topic ,規(guī)則是 %DLQ% + consumerGroup,后續(xù)處理消息發(fā)送到死信隊(duì)列。

正常的消息會(huì)進(jìn)入 else 分支,對(duì)于首次重試的消息,默認(rèn)的 delayLevel 是 0 ,RocketMQ 會(huì)將 delayLevel + 3,也就是加到 3 ,這就是說,如果沒有顯示的配置延時(shí)級(jí)別,消息消費(fèi)重試首次,是延遲了第三個(gè)級(jí)別發(fā)起的重試,也就是距離首次發(fā)送 10s 后重試,其主題的默認(rèn)規(guī)則是 %RETRY% + consumerGroup。

當(dāng)延時(shí)級(jí)別設(shè)置完成,刷新消息的重試次數(shù)為當(dāng)前次數(shù)加 1 ,Broker 端將該消息刷盤,邏輯如下:

圖片

延遲消息寫入到 commitlog 里 ,這里其實(shí)和延遲消息機(jī)制的第一步類似,后面按照延遲消息機(jī)制的流程執(zhí)行即可(第二步到第六步)。

九、總結(jié)

下圖展示了集群模式下消費(fèi)者并發(fā)消費(fèi)流程 :

圖片

核心流程如下:

  1. 消費(fèi)者啟動(dòng)后,觸發(fā)負(fù)載均衡服務(wù) ,負(fù)載均衡服務(wù)為消費(fèi)者實(shí)例分配對(duì)應(yīng)的隊(duì)列 ;
  2. 分配完隊(duì)列后,負(fù)載均衡服務(wù)會(huì)為每個(gè)分配的新隊(duì)列創(chuàng)建一個(gè)消息拉取請(qǐng)求  pullRequest  ,  拉取請(qǐng)求保存一個(gè)處理隊(duì)列 processQueue,內(nèi)部是紅黑樹(TreeMap),用來保存拉取到的消息 ;
  3. 拉取消息服務(wù)單線程從拉取請(qǐng)求隊(duì)列  pullRequestQueue 中彈出拉取消息,執(zhí)行拉取任務(wù) ,拉取請(qǐng)求是異步回調(diào)模式,將拉取到的消息放入到處理隊(duì)列;
  4. 拉取請(qǐng)求在一次拉取消息完成之后會(huì)復(fù)用,重新被放入拉取請(qǐng)求隊(duì)列 pullRequestQueue 中 ;
  5. 拉取完成后,調(diào)用消費(fèi)消息服務(wù)  consumeMessageService 的  submitConsumeRequest 方法 ,消費(fèi)消息服務(wù)內(nèi)部有一個(gè)消費(fèi)線程池;
  6. 消費(fèi)線程池的消費(fèi)線程從消費(fèi)任務(wù)隊(duì)列中獲取消費(fèi)請(qǐng)求,執(zhí)行消費(fèi)監(jiān)聽器  listener.consumeMessage ;
  7. 消費(fèi)完成后,若消費(fèi)成功,則更新偏移量 updateOffset,先更新到內(nèi)存 offsetTable,定時(shí)上報(bào)到 Broker ;若消費(fèi)失敗,則將失敗消費(fèi)發(fā)送到 Broker 。
  8. Broker 端接收到請(qǐng)求后, 調(diào)用消費(fèi)進(jìn)度管理器的 commitOffset 方法修改內(nèi)存的消費(fèi)進(jìn)度,定時(shí)刷盤到  consumerOffset.json。

RocketMQ 4.X 的消費(fèi)邏輯有兩個(gè)非常明顯的特點(diǎn):

  1. 客戶端代碼邏輯較重。假如要支持一種新的編程語言,那么客戶端就必須實(shí)現(xiàn)完整的負(fù)載均衡邏輯,此外還需要實(shí)現(xiàn)拉消息、位點(diǎn)管理、消費(fèi)失敗后將消息發(fā)回 Broker 重試等邏輯。這給多語言客戶端的支持造成很大的阻礙。
  2. 保證冪等非常重要。當(dāng)客戶端升級(jí)或者下線時(shí),或者 Broker 宕機(jī),都要進(jìn)行負(fù)載均衡操作,可能造成消息堆積,同時(shí)有一定幾率造成重復(fù)消費(fèi)。

參考資料:

1、RocketMQ 4.9.4 Github 文檔

https://github.com/apache/rocketmq/tree/rocketmq-all-4.9.4/docs

2、RocketMQ 技術(shù)內(nèi)幕

3、消息隊(duì)列核心知識(shí)點(diǎn)

https://mp.weixin.qq.com/s/v7_ih9X5mG3X4E4ecfgYXA

4、消息ACK機(jī)制及消費(fèi)進(jìn)度管理

https://zhuanlan.zhihu.com/p/25265380

責(zé)任編輯:武曉燕 來源: 勇哥java實(shí)戰(zhàn)分享
相關(guān)推薦

2021-08-26 05:02:50

分布式設(shè)計(jì)

2021-10-18 11:58:56

負(fù)載均衡虛擬機(jī)

2022-09-06 08:02:40

死鎖順序鎖輪詢鎖

2021-01-19 05:49:44

DNS協(xié)議

2022-09-14 09:01:55

shell可視化

2024-03-07 18:11:39

Golang采集鏈接

2020-07-15 08:57:40

HTTPSTCP協(xié)議

2020-11-16 10:47:14

FreeRTOS應(yīng)用嵌入式

2020-07-09 07:54:35

ThreadPoolE線程池

2022-10-10 08:35:17

kafka工作機(jī)制消息發(fā)送

2022-07-19 16:03:14

KubernetesLinux

2024-05-10 12:59:58

PyTorch人工智能

2024-01-05 08:30:26

自動(dòng)駕駛算法

2024-01-11 09:53:31

面試C++

2022-07-15 16:31:49

Postman測試

2022-09-08 10:14:29

人臉識(shí)別算法

2024-09-09 05:00:00

RedisString數(shù)據(jù)庫

2022-04-25 10:56:33

前端優(yōu)化性能

2022-02-15 18:45:35

Linux進(jìn)程調(diào)度器

2021-06-04 07:27:24

sourcemap前端技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 日韩欧美国产一区二区三区 | 九九热精品视频 | 91xx在线观看| 亚洲二区在线 | 在线免费黄色小视频 | 波多野结衣在线观看一区二区三区 | 欧美日韩国产在线观看 | 日本亚洲一区 | 午夜精品久久久久久久久久久久久 | 国产草草视频 | av手机免费在线观看 | 成人av免费 | 久久久亚洲 | 亚州精品天堂中文字幕 | 免费一看一级毛片 | 国产精品精品久久久 | 成人伊人 | 国产日产欧产精品精品推荐蛮挑 | 国产精品久久一区二区三区 | 操人视频在线观看 | 日本手机看片 | 国产精品毛片一区二区在线看 | 亚洲免费视频一区 | 国产一区 | 69av片| 一道本不卡视频 | 成人精品一区亚洲午夜久久久 | 日韩亚洲视频 | 亚洲一区二区三区在线 | 在线看片网站 | 99精品国产一区二区三区 | 97色伦网 | 中文字幕在线播放第一页 | 久久午夜精品福利一区二区 | 欧美中文字幕一区 | av在线一区二区三区 | 欧美成人a∨高清免费观看 色999日韩 | 中文字幕一区在线 | 久久免费高清视频 | 成人午夜免费视频 | 日韩aⅴ在线观看 |