解決消息延遲和堆積問題
- 消息可靠性問題:如何確保發送的消息至少被消費一次?
- 延遲消息問題:如何實現消息的延遲投遞?
- 消息堆積問題:如何解決數百萬級以上消息堆積,無法及時消費問題?
我們在上篇已經說明了如何解決消息丟失的問題,也就是保證了消息的可靠性,那么其余兩個問題同樣重要,這篇我們將講述其余兩個問題的解決方式~!
一、延遲消息
延遲消息 字面意思就是讓延遲接收消息,那么如何能讓消息延遲到達?這就是我們要思考解決的問題,在了解延遲隊列之前我們需要先明白 RabbitMQ 中的兩個概念
- 死信交換機
- TTL
1)死信交換機
死信(dead letter),也就是廢棄已死亡的消息,那什么情況下一個普通的消息能夠成為死信?需要符合以下三個條件:
消費者使用 basic.reject 或 basic.nack 聲明消費失敗,并將消息的 requeue 參數設置為 false
消息是一個過期消息,超時后無人消費
要投遞的隊列消息堆積滿了,最早的消息就會成為死信
而 死信交換機 便是 死信 的歸屬。
如果一個隊列配置了 dead-letter-exchange 屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機就稱為 死信交換機 - DLX(Dead Letter Exchange )
步驟:當生產者正常投遞到隊列(simple.queue)中,如果消費者從隊列(simple.queue) 消費消息卻聲明了 reject,那并且隊列綁定了死信交換機(dl.queue),那么這個時候成為死信的消息就會投遞到這個死信隊列(dl.queue)中。
死信投遞過程
從正常隊列 --> 死信隊列 的過程,我們必須聲明兩個關鍵信息
- 死信交換機的名稱
- 死信交換機與死信隊列綁定的路由key
而這兩個信息也是我們投遞消息的基礎配置。
接下來我們簡單模擬一下 條件1 所產生的場景
1、首先聲明一個死信交換機和死信隊列
我們這邊是使用簡單的注解方式直接生成
生成死信交換機和死信隊列
通過 RabbitMQ 控制臺界面可以看出已經成功生成
2、聲明正常使用交換機與隊列
然后這個時候我們就可以創建一個正常使用的交換機與隊列,并指明死信交換機
同樣可以通過控制臺查看創建狀態
其中是否有聲明死信交換機我們可以通過隊列的 DLX 和 DLK 標志判斷
3、模擬拒收
然后我們現在通過代碼模擬客戶端拒絕消息的場景
1)消息發送
2)消息接收
查看控制臺,結果如下:
- 2021-11-06 23:56:52.095 INFO 2112 --- [ntContainer#0-1] c.l.m.c.listener.SpringRabbitListener : 正常業務交換機 | 接收到的消息 : [hello]
- 2021-11-06 23:56:52.118 INFO 2112 --- [ntContainer#1-1] c.l.m.c.listener.SpringRabbitListener : 死信交換機 | 接收到的消息 : hello
這說明我們死信交換機已經成功發揮作用
2)TTL
以上我們已經成功認識到了 死信交換機 的使用,但是這與我們一開始說的 延遲隊列 似乎并沒有太大關系,莫急~接下來說到的 TTL(Time-To-Live) 就是用來處理延遲消息的~!
在 TTL 的概念中,如果一個隊列中的消息 TTL 結束后仍未被消費,那么這個消息就會自動變為死信,而 TTL 超時情況分為兩種:
- 消息所在的隊列設置了存活時間
- 消息本身設置了存活時間
我們同樣進行上述 條件2 的模擬場景
- 1、聲明死信交換機與死信隊列(上述已完成)
- 2、聲明延遲隊列并指定死信交換機
同樣控制臺查看創建結果,并且我們發現不止有 DLX 和 DLK 標志,還多了個 TTL ,說明該隊列是延遲隊列
- 3、模擬消費超時情況
我們往延遲隊列中發送一條消息,并且沒有消費者進行消費,等待 1 分鐘后查看是否能進入 死信隊列 中
我們已經發送了一條消息到延遲隊列并且一分鐘后也成功在控制臺發現了這條信息已經進入到了死信交換機
- 2021-11-07 00:01:30.854 INFO 32752 --- [ntContainer#1-1] c.l.m.c.listener.SpringRabbitListener : 死信交換機 | 接收到的消息 : test ttl-message
以上是配置了隊列超時時間,消息本身自然也能配置超時時間,當 消息 和 隊列 都存在超時時間時,那么就以最短的 TTL 為準,消息的超時配置如下:
如上圖所示,我們可以利用 Message 這個類來傳遞消息信息,并設置上超時時間,我們設置的是 5000 ms,等待發送成功后,控制臺過5000 ms 也成功打印了死信交換機消費的消息:
- 2021-11-07 00:03:09.048 INFO 39996 --- [ntContainer#1-1] c.l.m.c.listener.SpringRabbitListener : 死信交換機 | 接收到的消息 : this is a ttl message
3)延遲隊列
我們上述是使用 死信交換機 來間接實現 延遲隊列 的效果,但實際在 RabbitMQ 不必如此麻煩,RabbitMQ 已經為我們封裝好了插件,我們只需要下載安裝即可~
RabbitMQ 插件下載地址
我們進入地址可以發現有許多插件,搜索 delay 關鍵字找到我們需要的插件進行下載
下載完后直接上傳到 RabbitMQ 的插件目錄 - plugins,小菜這邊是使用 docker 臨時安裝測試的,所以已經將該插件目錄掛載出來了:
- docker run -itd --name rabbitmq -v plugins:/plugins -p 15672:15672 -p 5672:5672 rabbitmq:management
因此我這邊直接將插件上傳到容器中的 plugins 目錄即可~
然后進入到容器中執行以下命令進行插件開啟
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
并且我們在控制臺創建交換機的時候可以看到 type 類型多了個選項
成功執行到這步就說明已經開啟了 RabbitMQ 的延遲隊列功能
那接下來我們就可以來使用 DelayExchange,首先我們需要了解代碼的方式創建延遲交換機:
方式1
方式2
當我們萬事具備之后就可以來發送消息了
在發送消息的時候,消息頭中一定要攜帶上 x-delay 參數,指定上延遲時間
通過這樣配置之后,我們可以在控制臺看到,經過10秒后 delay.queue 才收到對應消息,然后被對應消費者消費
3)總結
我們上面從 死信交換機 到 TTL 到 延遲隊列,一步步認識了如何實現延遲消息的功能,然后我們進行一個小小的總結:
問題1:什么樣的消息會成為死信?
消息被消費者 reject 或返回 nack
消息超時未及時消費
消息隊列滿了
問題2:消息超時的方式
給隊列設置 TTL 屬性
給消息設置 TTL 屬性
問題3:如何使用延遲隊列
下載并啟用 RabbitMQ 延遲隊列插件
聲明一個交換機,并將 delayed 屬性設置為 true
發送消息時,添加 x-delay 頭,值為超時時間
問題4:延遲隊列的使用場景
延遲發送短信通知
訂單自動取消
庫存自動回滾
二、惰性隊列
講完延遲隊列,我們繼續來認識惰性隊列
講惰性隊列之前,我們先拋出一個問題~
RabbitMQ 如何解決消息堆積問題
什么情況下會出現消息堆積問題?
- 當生產者生產速度遠遠消費者消費速度
- 當消費者宕機沒有及時重啟
那么如何解決這個問題?通常思路如下:
- 在消費者機器重啟后,增加更多的消費者進行處理
- 在消費者處理邏輯內部開辟線程池,利用多線程的方式提高處理速度
- 擴大隊列的容量,提高堆積上限
這幾個方式從理論上來說解決消息堆積問題也是沒有問題的,但是處理方式不夠優雅甚至不夠靈活~ 那么除了以上的幾種解決方式,我們可以利用 RabbitMQ 中自帶的一種隊列類型 -- 惰性隊列
什么是惰性隊列?我們認識一下惰性隊列的幾個特性:
- 接收到消息后直接存入磁盤而非內存
- 消費者要消費消息時才會從磁盤中讀取并加載到內存中
- 它支持百萬級消息的存儲
說到底,就是利用磁盤的緩沖機制,而這種機制的缺點就是消息的時效性會降低,性能受限于磁盤的IO,認識特性和缺點之后,我們便來看看如何創建惰性隊列
方式1
方式2
方式3
該方式是直接基于命令行修改將一個正在運行中的隊列修改為惰性隊列
- rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
其中幾個命令參數含義如下:
- rabbitmqctl:命令行工具
- set_policy:添加一個策略
- Lazy:策略名稱,可以自定義
- ^lazy-queue$:用正則表達式匹配隊列的名稱
- '{"queue-mode":"lazy"}':設置隊列為 lazy 模式
- --apply-to queues:策略的作用對象,是所有的隊列
這種惰性隊列的方式盡管缺點是消息時效性會降低,但是在某些場景下也不是不能接受,何況它的優點同樣明顯:
- 基于磁盤存儲,消息上限高
- 沒有間歇性的 page-out,性能穩定
到這里,我們就已經講述了 RabbitMQ 的常見問題,對于我們來說,普通的開發場景可能比較少遇到這些問題,但是沒遇到不等于沒有,所以我們還是需要多認識來防患于未然!