成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ消息重試機制解析!

開發 架構
由于網絡抖動、服務宕機等一些不確定的因素,RocketMQ在發送消息的時候很有可能出現消息發送或者消費失敗的問題。

由于網絡抖動、服務宕機等一些不確定的因素,RocketMQ在發送消息的時候很有可能出現消息發送或者消費失敗的問題。

所以RocketMQ消息重試分為2種:

?Producer端重試和Consumer端重試。

Producer端重試

?生產者端的消息失敗,也就是Producer往MQ上發消息沒有發送成功。

  • 比如網絡抖動致使生產者發送消息到MQ失敗。

這種消息失敗重試可以手動設置發送失敗重試的次數。

producer.setRetryTimesWhenSendFailed(3);

官方說明

?Producer的send方法本身支持內部重試。

重試邏輯:

  • 默認至多重試2次。
  • 這個方法的總耗時時間不超過sendMsgTimeout設置的值,默認10s。

如果本身向Broker發送消息產生超時異常,就不會再重試。

  • 以上策略也是在一定程度上保證了消息可以發送成功。

如果業務對消息可靠性要求比較高,建議增加相應的重試邏輯:

  • 比如調用send同步方法發送失敗時,則嘗試將消息存儲到DB。
  • 然后由后臺線程定時重試,確保消息一定到達Broker。

重試策略

消息發送重試有三種策略:

?同步發送失敗策略、異步發送失敗策略和消息刷盤失敗策略。

同步發送失敗策略:

?普通消息,消息發送默認采用round-robin策略(輪轉)來選擇所發送到的隊列。

  • 如果發送失敗,默認重試2次。

但在重試時是不會選擇上次發送失敗的Broker,而是選擇其它Broker。

DefaultMQProducer producer = new DefaultMQProducer("pg");
// 設置同步發送失敗時重試發送的次數,默認為2次
producer.setRetryTimesWhenSendFailed(3);
// 設置發送超時時限為5s,默認10s
producer.setSendMsgTimeout(5000);

異步發送失敗策略:

?異步發送失敗重試時,異步重試不會選擇其他Broker,僅在當前Broker上做重試。

  • 所以該策略無法保證消息不丟失。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
// 指定異步發送失敗后不進行重試發送
producer.setRetryTimesWhenSendAsyncFailed(0);

消息刷盤失敗策略:

?消息刷盤超時,默認是不會將消息嘗試發送到其他Broker。

對于重要消息可以通過在Broker的配置文件設置retryAnotherBrokerWhenNotStoreOK屬性為true來開啟。

幾種情況

異步發送在發送過程中出現異常進行重試:

?在解析請求結果時,發現響應狀態碼有其它異常(消息可能未正確被Broker處理)會繼續進行重試。

  • 重試依然選擇當前Broker。

但是如果響應結果不為空的話,即使處理響應時發生異常也不會進行重試。

同步發送時:

?如果發送過程中沒有異常,但是發送結果不OK,也會選擇另一個Broker繼續進行重試。

順序消息發送失敗不進行重試:

?順序消息:指的是同步+指定消息隊列的方式發送。

Consumer端重試

消息正常的到了消費者,結果消費者發生異常,處理失敗了。

?例如反序列化失敗,消息數據本身無法處理等。

順序消息

順序消息的消費重試

?順序消息,當Consumer消費消息失敗后,為了保證消息的順序性,其會自動不斷地進行消息重試,直到消費成功。

  • 消費重試默認間隔時間為1000ms。

重試期間應用會出現消息消費被阻塞的情況。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費失敗的消費重試時間間隔,單位毫秒,默認為1000,其取值范圍為[10, 30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由于對順序消息的重試是無休止的,不間斷的,直至消費成功。

  • 所以,對于順序消息的消費,務必要保證應用能夠及時監控并處理消費失敗的情況,避免消費被永久性阻塞。

?注意:順序消息沒有發送失敗重試機制,但具有消費失敗重試機制。

消費狀態

?順序消費目前兩個狀態:SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。

SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暫停消費一下:

  • 過SuspendCurrentQueueTimeMillis時間間隔后再重試一下,而不是放到重試隊列里。
public enum ConsumeOrderlyStatus {
    SUCCESS,
    
    @Deprecated
    ROLLBACK,
    
    @Deprecated
    COMMIT,
    
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

并發消息

并發消息的消費重試

?在并發消費中,可能會有多個線程同時消費一個隊列的消息。

因此即使發送端通過發送順序消息保證消息在同一個隊列中按照FIFO的順序,也無法保證消息實際被順序消費。

  • 所有并發消費也可以稱之為無序消費。

對于無序消息(普通消息、延時消息、事務消息):

  • 當Consumer消費消息失敗時,可以通過設置返回狀態達到消息重試的效果。

注意:

?無序消息的重試只針對集群消費模式生效。

廣播消費模式不提供失敗重試特性:即消費失敗后,失敗消息不再重試,繼續消費新的消息。

消費狀態

Consumer端消息消費有兩種狀態:

?一個是成功(CONSUME_SUCCESS),一個是失敗&稍后重試(RECONSUME_LATER)。

Consumer為了保證消息消費成功,只有使用方明確表示消費成功。

  • 返回CONSUME_SUCCESS,RocketMQ才會認為消息消費成功。

若是消息消費失敗,只要返回:ConsumeConcurrentlyStatus.RECONSUME_LATER。

  • RocketMQ就會認為消息消費失敗了,要重新投遞。
public enum ConsumeConcurrentlyStatus {
    CONSUME_SUCCESS,
    RECONSUME_LATER;   
}

重試機制

?為了保證消息是確定被至少消費成功一次,RocketMQ會把這批消息重發回Broker。

  • Topic不是原Topic而是一個RETRY Topic。

在延遲的某個時間點(默認是10秒,業務可設置)后,再次投遞。

?而若是一直這樣重復消費都持續失敗到必定次數(默認16次),就會投遞到死信隊列

在啟動Broker的過程當中,能夠觀察到以下輸出:

2024-09-19 16:29:58 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

RECONSUME_LATER策略:

?若是消費失敗,那么1S后再次消費,若是失敗,那么5S后,再次消費,…… 直至2H后若是消費還失敗。

  • 那么該條消息就會終止發送給消費者了。

消息重試間隔時間如下:

重試次數

與上次重試的間隔時間

重試次數

與上次重試的間隔時間

1

10秒

9

7分鐘

2

30秒

10

8分鐘

3

1分鐘

11

9分鐘

4

2分鐘

12

10分鐘

5

3分鐘

13

20分鐘

6

4分鐘

14

30分鐘

7

5分鐘

15

1小時

8

6分鐘

16

2小時

?某條消息在一直消費失敗的前提下,將會在接下來的4小時46分鐘之內進行16次重試。

  • 超過這個時間范圍消息將不再重試投遞,而被投遞至死信隊列

修改消費重試次數:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消費重試次數
consumer.setMaxReconsumeTimes(10);

基本原理

?重試的 Message,RocketMQ 的做法并不是將其投遞回原 Topic重試隊列。

每個 ConsumerGroup 都有自己的重試隊列:

  • 其名稱是由特定的前綴拼接上 ConsumerGroup 所組成,默認 %RETRY%+消費者組名稱。
  • 所以在 Consumer 啟動時,就會同時消費其 ConsumerGroup 對應的重試隊列和普通隊列。

消費失敗的 Message,Consumer 會將其投回 Broker:

  • 相當于這條 Message 已經被消費掉了,之后重試的只是內容相同、但實際不是同一條的 Message。
  • 然后會校驗重試的次數,如果達到16次則會進入死信隊列 ,組成為 %DLQ%+消費者組名稱。
  • 未達到最大重試次數,則會根據重試間隔時間等級將其投遞到延遲隊列SCHEDULE_TOPIC_XXXX中。
  • 然后等到了延遲等級對應的時間之后,再投遞到 ConsumerGroup 所對應的重試隊列當中,供后續消費。

消息重復

如果消費端收到兩條一樣的消息,應該怎樣處理?

《RocketMQ 原理簡介》中講到:

?RocketMQ 無法避免消息重復。

所以如果業務對消費重復非常敏感,務必要在業務側去重,有以下幾種去重方式:

?

消費端處理消息的業務邏輯保持冪等性。

  • 如何保證冪等性,可以看我之前的文章!

保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現。

  • 利用一張日志表來記錄已經處理成功的消息的ID。
  • 如果新到的消息ID已經在日志表中,那么就不再處理這條消息。
責任編輯:姜華 來源: 月伴飛魚
相關推薦

2025-01-03 08:44:37

kafka消息發送策略

2022-11-14 08:19:59

重試機制Kafka

2022-05-06 07:44:10

微服務系統設計重試機制

2017-06-16 15:16:15

2021-02-20 10:02:22

Spring重試機制Java

2017-07-02 16:50:21

2025-02-26 10:49:14

2020-07-19 15:39:37

Python開發工具

2023-10-27 08:20:12

springboot微服務

2023-11-27 07:44:59

RabbitMQ機制

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2024-11-14 09:10:13

消費者RocketMQ負載均衡

2023-11-06 08:00:38

接口高可用機制

2025-05-28 01:15:00

Golang重試機制

2024-08-22 18:49:23

2025-04-18 03:00:00

2024-01-04 18:01:55

高并發SpringBoot

2017-12-18 11:09:45

消息轉發DemoPerson

2022-06-13 11:05:35

RocketMQ消費者線程
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 综合自拍 | 国产精品v | 午夜私人影院 | 超碰97免费在线 | 91www在线观看 | 99久久久无码国产精品 | 久久99视频| 日本精品国产 | 蜜臀久久 | 久久99国产精品 | 亚洲成人午夜在线 | 日韩视频在线免费观看 | 精品国产一区二区三区av片 | 欧美一级二级在线观看 | 一区二区三区四区在线 | 日韩精品一区二区三区 | 91成人在线视频 | 日韩一区二区三区在线观看 | 亚洲精品一区在线观看 | 国产成人在线免费 | 性色视频在线观看 | 日本久久福利 | 日韩精品极品视频在线观看免费 | 欧美精品在线一区二区三区 | 四虎av电影 | 91看片在线观看 | 国产乱码精品一品二品 | 国产精品成人一区二区三区夜夜夜 | 欧美成人免费电影 | 亚洲人成人一区二区在线观看 | 91av精品| 欧美日韩成人影院 | 亚洲综合色网站 | 欧美日韩一卡 | 高清亚洲 | 一区二区三区视频 | 精品视频在线免费观看 | 三级成人片 | 国产精品免费在线 | 欧美亚洲视频 | 粉嫩在线 |