三分鐘白話RocketMQ系列—— 如何保證消息不丟失
回顧上一篇核心概念,我們知道RocketMQ的消息模型分為 生產、存儲(消息堆積)、消費 三大部分。
消息模型三大部分
因此,如何保證消息不丟失,也是從這三個環節來考慮。
關鍵字摘要
- 生產、存儲(消息堆積)、消費 三個環節保證消息不丟失
- 生產環節:消息類型,消息確認機制、失敗重試機制
- 存儲環節:同步/異步刷盤、同步/異步復制slave
- 消費環節:消息確認機制(至少消費成功一次)、失敗重試機制、死信隊列機制
Q1: 如何保證「消息生產」不丟失?
先想想什么情況下,消息生產會丟失消息呢?
生產者將發送消息時,如果出現了網絡抖動或者通信異常等問題,消息就有可能會丟失。
那怎么解決這個問題?
其實思路是比較直接的,就是 「消息確認機制」和「失敗重試機制」。
消息發送成功返回確認消息,那就能確保消息不丟失。如果發送失敗了,mq-client就嘗試自動重試,避免網絡抖動導致發送丟失。
如果超過一定超時時間還是失敗,那就拋出異常,由開發者自己在應用層面進行處理,手動重試發送 或者 記錄失敗消息后續補償。
不過我們需要特別注意是,RocketMQ支持多種「消息類型」,但是并不是對所有「消息類型」 都會有「消息確認機制」和「失敗重試機制」。
RocketMQ生產消息時,支持多種「消息類型」和「消息發送模式」。咱們白話為主,就不展開源碼了,有興趣同學可以參考org.apache.rocketmq.client.producer.MQProducer這個接口定義即可。
消息類型:
- 普通消息:發送普通消息,異常時默認重試。
- 普通有序消息:發送普通有序消息,通過指定「消息篩選器selector」,動態決定發送哪個隊列。異常默認不重試,可以用戶自己重試,并發送到其他隊列。
- 嚴格有序消息:發送嚴格有序消息,通過指定隊列,保證嚴格有序,異常默認不重試。
消息發送模式:
- 同步:調用發送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFailed重試次數。
- 異步:調用發送消息方法后,立即返回,發送結果會通過開發者自己注冊的回調函數SendCallback進行處理。配置retryTimesWhenSendAsyncFailed重試次數。
- 單向發送:這種方法完全不關心發送后的返回結果。顯然,它具有最大吞吐量,但也存在消息丟失的潛在風險。
消息類型 和 消息發送模式 是 N * M 的關系,所以聰明的你一定已經想到了,存在9種不同組合,RocketMQ也是定義了9種不同接口方法。
這9種方法里面,涉及到「單向發送」模式的3種方法,都是不可靠的,存在丟失消息的風險。其他發送消息的模式和消息類型,可以通過 消息確認、mq-client自動「失敗重試機制」、業務自定義重試 等方式,確保消息發送不丟失。
注意,org.apache.rocketmq.client.producer.MQProducer還定義了「事務消息」的發送模式,是屬于分布式事務范疇了,跟我們這里討論的消息不丟失不太一樣,就不展開討論了。后面單獨寫一篇針對「事務消息」的分析。
Q2: 如何保證「消息存儲」不丟失?
先想想什么情況下,消息存儲會丟失呢?
場景1,消息保存到內存中,還沒來得及刷盤到磁盤,機器宕機或者重啟,導致內存中消息丟失。場景2,為了提高可用性,Broker通常采用一主多從的部署方式,為了確保消息不丟失,消息需要被復制到從節點。當消息發送到master但是還沒同步到slave broker時,master broker磁盤損壞,導致消息數據丟失。或者master宕機,consumer切換到slave消費數據,消息丟失。
針對場景1,默認情況下,消息在到達 Broker 端后會首先被保存在內存中,并立即向生產者返回確認響應。隨后,Broker 會定期批量將一組消息異步刷入磁盤。這種方式減少了 I/O 操作次數,提高了性能。
然而,如果發生機器掉電、異常宕機等情況,未及時將消息刷入磁盤,就可能導致消息丟失的情況。
如果要確保 Broker 端不丟失消息并保證消息的可靠性,我們需要修改消息保存機制為同步刷盤方式,即只有當消息成功存儲到磁盤后才返回響應。可以通過flushDiskType = SYNC_FLUSH 參數進行控制。
針對場景2,在默認方式下,當消息成功寫入主節點時,就會返回確認響應給生產者,并異步將消息復制到從節點。然而,如果主節點突然宕機且無法恢復,尚未復制到從節點的消息將會丟失。
為了進一步提高消息的可靠性,我們可以采用同步復制方式。主節點將會同步等待從節點完成復制,然后才返回確認響應。這樣可以確保消息的可靠性。可以通過brokerRole=SYNC_MASTER參數進行控制。
注意,同步刷盤 和 同步復制 雖然能夠保證消息不丟失,但是會嚴重降低性能,生產實踐中需要根據實際情況綜合評估。
Q3: 如何保證「消息消費」不丟失?
先想想什么情況下,消息存儲會丟失呢?
因為各種原因消費失敗,但是還是提交了消費位點,這條消息從業務角度來說就“丟失”了。
那怎么解決這個問題?
跟消息生產一樣,其實思路是比較直接的,就是 「消息確認機制」和「失敗重試機制」。
消費者從RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"來表示業務方已經正常完成消費。只有返回"CONSUME_SUCCESS"才算作消費完成。這就是消費時的「消息確認機制」。
如果返回"CONSUME_LATER",則會按照不同的消息延遲級別進行再次消費,延遲級別從秒到小時不等,最長延遲時間為2個小時后再次嘗試消費。這就是消費時的「失敗重試機制」。
重試消息會被存入名為 "%RETRY%+消費組名稱" 的Topic中,原始主題Topic會存入屬性中。然后會基于定時任務機制,在到期時將任務再次拉取出來。
另外,RocketMQ跟kafka不同的是,天然支持了 「死信隊列機制」。
如果在嘗試消費的過程中達到了最大重試次數(通常為16次),仍然無法成功消費,則消息將被發送到死信隊列,以確保消息存儲的可靠性。后續業務可以根據死信隊列,來做相關補償措施。
關鍵字總結
- 生產、存儲(消息堆積)、消費 三個環節保證不丟失
- 生產環節:消息類型,消息確認機制、失敗重試機制
- 存儲環節:同步/異步刷盤、同步/異步復制slave
- 消費環節:消息確認機制(至少消費成功一次)、失敗重試機制、死信隊列機制
3分鐘到了嗎?應該對RocketMQ如何生產消息有全面了解了吧。如果還想了解更多,歡迎關注下一期內容。