成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

四種策略確保 RabbitMQ 消息發送可靠性!你用哪種?

開發 架構
微服務可以設計成消息驅動的微服務,響應式系統也可以基于消息中間件來做,從這個角度來說,在互聯網應用開發中,消息中間件真的是太重要了。

[[416528]]

微服務可以設計成消息驅動的微服務,響應式系統也可以基于消息中間件來做,從這個角度來說,在互聯網應用開發中,消息中間件真的是太重要了。

今天,以 RabbitMQ 為例,松哥來和大家聊一聊消息中間消息發送可靠性的問題。

注意,以下內容我主要和大家討論如何確保消息生產者將消息發送成功,并不涉及消息消費的問題。

1. RabbitMQ 消息發送機制

大家知道,RabbitMQ 中的消息發送引入了 Exchange(交換機)的概念,消息的發送首先到達交換機上,然后再根據既定的路由規則,由交換機將消息路由到不同的 Queue(隊列)中,再由不同的消費者去消費。

大致的流程就是這樣,所以要確保消息發送的可靠性,主要從兩方面去確認:

  • 消息成功到達 Exchange
  • 消息成功到達 Queue

如果能確認這兩步,那么我們就可以認為消息發送成功了。

如果這兩步中任一步驟出現問題,那么消息就沒有成功送達,此時我們可能要通過重試等方式去重新發送消息,多次重試之后,如果消息還是不能到達,則可能就需要人工介入了。

經過上面的分析,我們可以確認,要確保消息成功發送,我們只需要做好三件事就可以了:

  • 確認消息到達 Exchange。
  • 確認消息到達 Queue。
  • 開啟定時任務,定時投遞那些發送失敗的消息。

2. RabbitMQ 的努力

上面提出的三個步驟,第三步需要我們自己實現,前兩步 RabbitMQ 則有現成的解決方案。

如何確保消息成功到達 RabbitMQ?RabbitMQ 給出了兩種方案:

  • 開啟事務機制
  • 發送方確認機制

這是兩種不同的方案,不可以同時開啟,只能選擇其中之一,如果兩者同時開啟,則會報如下錯誤:

我們分別來看。以下所有案例都在 Spring Boot 中展開,文末可以下載相關源碼。

2.1 開啟事務機制

Spring Boot 中開啟 RabbitMQ 事務機制的方式如下:

首先需要先提供一個事務管理器,如下:

  1. @Bean 
  2. RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { 
  3.     return new RabbitTransactionManager(connectionFactory); 

接下來,在消息生產者上面做兩件事:添加事務注解并設置通信信道為事務模式:

  1. @Service 
  2. public class MsgService { 
  3.     @Autowired 
  4.     RabbitTemplate rabbitTemplate; 
  5.  
  6.     @Transactional 
  7.     public void send() { 
  8.         rabbitTemplate.setChannelTransacted(true); 
  9.         rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes()); 
  10.         int i = 1 / 0; 
  11.     } 

這里注意兩點:

  • 發送消息的方法上添加 @Transactional 注解標記事務。
  • 調用 setChannelTransacted 方法設置為 true 開啟事務模式。

這就 OK 了。

在上面的案例中,我們在結尾來了個 1/0 ,這在運行時必然拋出異常,我們可以嘗試運行該方法,發現消息并未發送成功。

當我們開啟事務模式之后,RabbitMQ 生產者發送消息會多出四個步驟:

  • 客戶端發出請求,將信道設置為事務模式。
  • 服務端給出回復,同意將信道設置為事務模式。
  • 客戶端發送消息。
  • 客戶端提交事務。
  • 服務端給出響應,確認事務提交。

上面的步驟,除了第三步是本來就有的,其他幾個步驟都是平白無故多出來的。所以大家看到,事務模式其實效率有點低,這并非一個最佳解決方案。我們可以想想,什么項目會用到消息中間件?一般來說都是一些高并發的項目,這個時候并發性能尤為重要。

所以,RabbitMQ 還提供了發送方確認機制(publisher confirm)來確保消息發送成功,這種方式,性能要遠遠高于事務模式,一起來看下。

2.2 發送方確認機制

2.2.1 單條消息處理

首先我們移除剛剛關于事務的代碼,然后在 application.properties 中配置開啟消息發送方確認機制,如下:

  1. spring.rabbitmq.publisher-confirm-type=correlated 
  2. spring.rabbitmq.publisher-returns=true 

第一行是配置消息到達交換器的確認回調,第二行則是配置消息到達隊列的回調。

第一行屬性的配置有三個取值:

  • none:表示禁用發布確認模式,默認即此。
  • correlated:表示成功發布消息到交換器后會觸發的回調方法。
  • simple:類似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的調用。

接下來我們要開啟兩個監聽,具體配置如下:

  1. @Configuration 
  2. public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { 
  3.     public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name"
  4.     public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name"
  5.     private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class); 
  6.     @Autowired 
  7.     RabbitTemplate rabbitTemplate; 
  8.     @Bean 
  9.     Queue queue() { 
  10.         return new Queue(JAVABOY_QUEUE_NAME); 
  11.     } 
  12.     @Bean 
  13.     DirectExchange directExchange() { 
  14.         return new DirectExchange(JAVABOY_EXCHANGE_NAME); 
  15.     } 
  16.     @Bean 
  17.     Binding binding() { 
  18.         return BindingBuilder.bind(queue()) 
  19.                 .to(directExchange()) 
  20.                 .with(JAVABOY_QUEUE_NAME); 
  21.     } 
  22.  
  23.     @PostConstruct 
  24.     public void initRabbitTemplate() { 
  25.         rabbitTemplate.setConfirmCallback(this); 
  26.         rabbitTemplate.setReturnsCallback(this); 
  27.     } 
  28.  
  29.     @Override 
  30.     public void confirm(CorrelationData correlationData, boolean ack, String cause) { 
  31.         if (ack) { 
  32.             logger.info("{}:消息成功到達交換器",correlationData.getId()); 
  33.         }else
  34.             logger.error("{}:消息發送失敗", correlationData.getId()); 
  35.         } 
  36.     } 
  37.  
  38.     @Override 
  39.     public void returnedMessage(ReturnedMessage returned) { 
  40.         logger.error("{}:消息未成功路由到隊列",returned.getMessage().getMessageProperties().getMessageId()); 
  41.     } 

關于這個配置類,我說如下幾點:

  • 定義配置類,實現 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個接口,這兩個接口,前者的回調用來確定消息到達交換器,后者則會在消息路由到隊列失敗時被調用。
  • 定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,在該方法中為 rabbitTemplate 分別配置這兩個 Callback。

這就可以了。

接下來我們對消息發送進行測試。

首先我們嘗試將消息發送到一個不存在的交換機中,像下面這樣:

  1. rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString())); 

注意第一個參數是一個字符串,不是變量,這個交換器并不存在,此時控制臺會報如下錯誤:

接下來我們給定一個真實存在的交換器,但是給一個不存在的隊列,像下面這樣:

  1. rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString())); 

注意此時第二個參數是一個字符串,不是變量。

可以看到,消息雖然成功達到交換器了,但是沒有成功路由到隊列(因為隊列不存在)。

這是一條消息的發送,我們再來看看消息的批量發送。

2.2.2 消息批量處理

如果是消息批量處理,那么發送成功的回調監聽是一樣的,這里不再贅述。

這就是 publisher-confirm 模式。

相比于事務,這種模式下的消息吞吐量會得到極大的提升。

3. 失敗重試

失敗重試分兩種情況,一種是壓根沒找到 MQ 導致的失敗重試,另一種是找到 MQ 了,但是消息發送失敗了。

兩種重試我們分別來看。

3.1 自帶重試機制

前面所說的事務機制和發送方確認機制,都是發送方確認消息發送成功的辦法。如果發送方一開始就連不上 MQ,那么 Spring Boot 中也有相應的重試機制,但是這個重試機制就和 MQ 本身沒有關系了,這是利用 Spring 中的 retry 機制來完成的,具體配置如下:

  1. spring.rabbitmq.template.retry.enabled=true 
  2. spring.rabbitmq.template.retry.initial-interval=1000ms 
  3. spring.rabbitmq.template.retry.max-attempts=10 
  4. spring.rabbitmq.template.retry.max-interval=10000ms 
  5. spring.rabbitmq.template.retry.multiplier=2 

從上往下配置含義依次是:

  • 開啟重試機制。
  • 重試起始間隔時間。
  • 最大重試次數。
  • 最大重試間隔時間。

間隔時間乘數。(這里配置間隔時間乘數為 2,則第一次間隔時間 1 秒,第二次重試間隔時間 2 秒,第三次 4 秒,以此類推)

配置完成后,再次啟動 Spring Boot 項目,然后關掉 MQ,此時嘗試發送消息,就會發送失敗,進而導致自動重試。

3.2 業務重試

業務重試主要是針對消息沒有到達交換器的情況。

如果消息沒有成功到達交換器,根據我們第二小節的講解,此時就會觸發消息發送失敗回調,在這個回調中,我們就可以做文章了!

整體思路是這樣:

首先創建一張表,用來記錄發送到中間件上的消息,像下面這樣:

每次發送消息的時候,就往數據庫中添加一條記錄。這里的字段都很好理解,有三個我額外說下:

  • status:表示消息的狀態,有三個取值,0,1,2 分別表示消息發送中、消息發送成功以及消息發送失敗。
  • tryTime:表示消息的第一次重試時間(消息發出去之后,在 tryTime 這個時間點還未顯示發送成功,此時就可以開始重試了)。
  • count:表示消息重試次數。

其他字段都很好理解,我就不一一啰嗦了。

在消息發送的時候,我們就往該表中保存一條消息發送記錄,并設置狀態 status 為 0,tryTime 為 1 分鐘之后。

在 confirm 回調方法中,如果收到消息發送成功的回調,就將該條消息的 status 設置為1(在消息發送時為消息設置 msgId,在消息發送成功回調時,通過 msgId 來唯一鎖定該條消息)。

另外開啟一個定時任務,定時任務每隔 10s 就去數據庫中撈一次消息,專門去撈那些 status 為 0 并且已經過了 tryTime 時間記錄,把這些消息拎出來后,首先判斷其重試次數是否已超過 3 次,如果超過 3 次,則修改該條消息的 status 為 2,表示這條消息發送失敗,并且不再重試。對于重試次數沒有超過 3 次的記錄,則重新去發送消息,并且為其 count 的值+1。

大致的思路就是上面這樣,松哥這里就不給出代碼了,松哥的 vhr 里邊郵件發送就是這樣的思路來處理的,完整代碼大家可以參考 vhr 項目(https://github.com/lenve/vhr)。

當然這種思路有兩個弊端:

  • 去數據庫走一遭,可能拖慢 MQ 的 Qos,不過有的時候我們并不需要 MQ 有很高的 Qos,所以這個應用時要看具體情況。
  • 按照上面的思路,可能會出現同一條消息重復發送的情況,不過這都不是事,我們在消息消費時,解決好冪等性問題就行了。

當然,大家也要注意,消息是否要確保 100% 發送成功,也要看具體情況。

4. 小結

好啦,這就是關于消息生產者的一些常見問題以及對應的解決方案,下篇文章松哥和大家探討如果保證消息消費成功并解決冪等性問題。 

本文涉及到的相關源代碼大家可以在這里下載:https://github.com/lenve/javaboy-code-samples。

本文轉載自微信公眾號「江南一點雨」,可以通過以下二維碼關注。轉載本文請聯系江南一點雨公眾號。

 

責任編輯:武曉燕 來源: 江南一點雨
相關推薦

2024-05-09 08:04:23

RabbitMQ消息可靠性

2018-09-27 14:13:27

云服務可靠故障

2025-01-02 09:23:05

2023-12-18 08:36:39

消息隊列微服務開發

2019-07-26 08:00:00

微服務架構

2021-02-02 11:01:31

RocketMQ消息分布式

2010-11-09 15:50:47

SQL Server安

2010-12-28 19:50:21

可靠性產品可靠性

2021-04-27 07:52:18

RocketMQ消息投遞

2020-11-24 05:59:41

容器

2022-03-07 08:13:06

MQ消息可靠性異步通訊

2023-07-21 08:00:00

API數字世界

2011-05-25 19:31:07

Stratus信息化

2010-12-28 20:16:24

2024-02-18 19:10:13

CIO

2023-11-27 13:42:00

消息隊列RocketMQ

2019-08-30 12:10:05

磁盤數據可靠性RAID

2024-07-08 13:19:34

2023-06-02 11:43:58

人工智能AI

2023-09-24 14:49:35

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 午夜伦4480yy私人影院 | 欧美日韩国产欧美 | 中文字幕亚洲视频 | 亚洲成人精品免费 | 在线四虎 | 国产精品自产拍在线观看蜜 | 日韩免费一区二区 | 精品国产视频在线观看 | 日韩欧美在线免费 | 成年人精品视频在线观看 | 日韩性在线 | 中文字幕亚洲一区 | 午夜精品一区二区三区免费视频 | 欧美日韩精品在线免费观看 | 91精品国产91久久久久游泳池 | 日韩欧美一区二区在线播放 | 精品一区二区三区中文字幕 | 欧美国产日韩一区二区三区 | 国产精品美女久久久 | 手机av在线 | 亚洲精品4 | 日韩在线观看一区 | 最新黄色在线观看 | 欧美亚洲成人网 | 涩爱av一区二区三区 | 久久久久久久av麻豆果冻 | 精品久久久久久久 | 国产精品一区二区精品 | 国产欧美日韩一区 | 国内精品久久精品 | 久久精品超碰 | 成年人在线观看视频 | 亚洲精品在线免费 | 成人在线视频一区 | 亚洲va在线va天堂va狼色在线 | 亚洲一区二区三区福利 | 中文字幕一区二区三区不卡 | 色婷婷综合久久久中字幕精品久久 | 大乳boobs巨大吃奶挤奶 | 一区欧美| 亚洲精品一区二区网址 |