RocketMQ消息重試機制解析!
由于網絡抖動、服務宕機等一些不確定的因素,RocketMQ在發送消息的時候很有可能出現消息發送或者消費失敗的問題。
所以RocketMQ消息重試分為2種:
?Producer端重試和Consumer端重試。
Producer端重試
?生產者端的消息失敗,也就是Producer往MQ上發消息沒有發送成功。
- 比如網絡抖動致使生產者發送消息到MQ失敗。
這種消息失敗重試可以手動設置發送失敗重試的次數。
producer.setRetryTimesWhenSendFailed(3);
官方說明
?Producer的send方法本身支持內部重試。
重試邏輯:
- 默認至多重試2次。
- 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s。
如果本身向Broker發送消息產生超時異常,就不會再重試。
- 以上策略也是在一定程度上保證了消息可以發送成功。
如果業務對消息可靠性要求比較高,建議增加相應的重試邏輯:
- 比如調用send同步方法發送失敗時,則嘗試將消息存儲到DB。
- 然后由后臺線程定時重試,確保消息一定到達Broker。
重試策略
消息發送重試有三種策略:
?同步發送失敗策略、異步發送失敗策略和消息刷盤失敗策略。
同步發送失敗策略:
?普通消息,消息發送默認采用round-robin策略(輪轉)來選擇所發送到的隊列。
- 如果發送失敗,默認重試2次。
但在重試時是不會選擇上次發送失敗的Broker,而是選擇其它Broker。
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 設置同步發送失敗時重試發送的次數,默認為2次
producer.setRetryTimesWhenSendFailed(3);
// 設置發送超時時限為5s,默認10s
producer.setSendMsgTimeout(5000);
異步發送失敗策略:
?異步發送失敗重試時,異步重試不會選擇其他Broker,僅在當前Broker上做重試。
- 所以該策略無法保證消息不丟失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定異步發送失敗后不進行重試發送
producer.setRetryTimesWhenSendAsyncFailed(0);
消息刷盤失敗策略:
?消息刷盤超時,默認是不會將消息嘗試發送到其他Broker。
對于重要消息可以通過在Broker的配置文件設置retryAnotherBrokerWhenNotStoreOK屬性為true來開啟。
幾種情況
異步發送在發送過程中出現異常進行重試:
?在解析請求結果時,發現響應狀態碼有其它異常(消息可能未正確被Broker處理)會繼續進行重試。
- 重試依然選擇當前Broker。
但是如果響應結果不為空的話,即使處理響應時發生異常也不會進行重試。
同步發送時:
?如果發送過程中沒有異常,但是發送結果不OK,也會選擇另一個Broker繼續進行重試。
順序消息發送失敗不進行重試:
?順序消息:指的是同步+指定消息隊列的方式發送。
Consumer端重試
消息正常的到了消費者,結果消費者發生異常,處理失敗了。
?例如反序列化失敗,消息數據本身無法處理等。
順序消息
順序消息的消費重試
?順序消息,當Consumer消費消息失敗后,為了保證消息的順序性,其會自動不斷地進行消息重試,直到消費成功。
- 消費重試默認間隔時間為1000ms。
重試期間應用會出現消息消費被阻塞的情況。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費失敗的消費重試時間間隔,單位毫秒,默認為1000,其取值范圍為[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);
由于對順序消息的重試是無休止的,不間斷的,直至消費成功。
- 所以,對于順序消息的消費,務必要保證應用能夠及時監控并處理消費失敗的情況,避免消費被永久性阻塞。
?注意:順序消息沒有發送失敗重試機制,但具有消費失敗重試機制。
消費狀態
?順序消費目前兩個狀態:SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。
SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下:
- 過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊列里。
public enum ConsumeOrderlyStatus {
SUCCESS,
@Deprecated
ROLLBACK,
@Deprecated
COMMIT,
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
并發消息
并發消息的消費重試
?在并發消費中,可能會有多個線程同時消費一個隊列的消息。
因此即使發送端通過發送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費。
- 所有并發消費也可以稱之為無序消費。
對于無序消息(普通消息、延時消息、事務消息):
- 當Consumer消費消息失敗時,可以通過設置返回狀態達到消息重試的效果。
注意:
?無序消息的重試只針對集群消費模式生效。
廣播消費模式不提供失敗重試特性:即消費失敗后,失敗消息不再重試,繼續消費新的消息。
消費狀態
Consumer端消息消費有兩種狀態:
?一個是成功(CONSUME_SUCCESS),一個是失敗&稍后重試(RECONSUME_LATER)。
Consumer為了保證消息消費成功,只有使用方明確表示消費成功。
- 返回CONSUME_SUCCESS,RocketMQ才會認為消息消費成功。
若是消息消費失敗,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER。
- RocketMQ就會認為消息消費失敗了,要重新投遞。
public enum ConsumeConcurrentlyStatus {
CONSUME_SUCCESS,
RECONSUME_LATER;
}
重試機制
?為了保證消息是確定被至少消費成功一次,RocketMQ會把這批消息重發回Broker。
- Topic不是原Topic而是一個RETRY Topic。
在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞。
?而若是一直這樣重復消費都持續失敗到必定次數(默認16次),就會投遞到死信隊列。
在啟動Broker的過程當中,能夠觀察到以下輸出:
2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RECONSUME_LATER策略:
?若是消費失敗,那么1S后再次消費,若是失敗,那么5S后,再次消費,…… 直至2H后若是消費還失敗。
- 那么該條消息就會終止發送給消費者了。
消息重試間隔時間如下:
重試次數 | 與上次重試的間隔時間 | 重試次數 | 與上次重試的間隔時間 |
1 | 10秒 | 9 | 7分鐘 |
2 | 30秒 | 10 | 8分鐘 |
3 | 1分鐘 | 11 | 9分鐘 |
4 | 2分鐘 | 12 | 10分鐘 |
5 | 3分鐘 | 13 | 20分鐘 |
6 | 4分鐘 | 14 | 30分鐘 |
7 | 5分鐘 | 15 | 1小時 |
8 | 6分鐘 | 16 | 2小時 |
?某條消息在一直消費失敗的前提下,將會在接下來的4小時46分鐘之內進行16次重試。
- 超過這個時間范圍消息將不再重試投遞,而被投遞至死信隊列。
修改消費重試次數:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消費重試次數
consumer.setMaxReconsumeTimes(10);
基本原理
?重試的 Message,RocketMQ 的做法并不是將其投遞回原 Topic重試隊列。
每個 ConsumerGroup 都有自己的重試隊列:
- 其名稱是由特定的前綴拼接上 ConsumerGroup 所組成,默認 %RETRY%+消費者組名稱。
- 所以在 Consumer 啟動時,就會同時消費其 ConsumerGroup 對應的重試隊列和普通隊列。
消費失敗的 Message,Consumer 會將其投回 Broker:
- 相當于這條 Message 已經被消費掉了,之后重試的只是內容相同、但實際不是同一條的 Message。
- 然后會校驗重試的次數,如果達到16次則會進入死信隊列 ,組成為 %DLQ%+消費者組名稱。
- 未達到最大重試次數,則會根據重試間隔時間等級將其投遞到延遲隊列SCHEDULE_TOPIC_XXXX中。
- 然后等到了延遲等級對應的時間之后,再投遞到 ConsumerGroup 所對應的重試隊列當中,供后續消費。
消息重復
如果消費端收到兩條一樣的消息,應該怎樣處理?
《RocketMQ 原理簡介》中講到:
?RocketMQ 無法避免消息重復。
所以如果業務對消費重復非常敏感,務必要在業務側去重,有以下幾種去重方式:
?
消費端處理消息的業務邏輯保持冪等性。
- 如何保證冪等性,可以看我之前的文章!
保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現。
- 利用一張日志表來記錄已經處理成功的消息的ID。
- 如果新到的消息ID已經在日志表中,那么就不再處理這條消息。