成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消息消費(fèi)失敗如何處理?

開發(fā) 前端
在介紹消息中間件 MQ 之前,我們先來簡單的了解一下,為何要引用消息中間件。

[[384109]]

本文轉(zhuǎn)載自微信公眾號「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請聯(lián)系Java極客技術(shù)公眾號。   

一、介紹

在介紹消息中間件 MQ 之前,我們先來簡單的了解一下,為何要引用消息中間件。

例如,在電商平臺中,常見的用戶下單,會經(jīng)歷以下幾個流程。

當(dāng)用戶下單時,創(chuàng)建完訂單之后,會調(diào)用第三方支付平臺,對用戶的賬戶金額進(jìn)行扣款,如果平臺支付扣款成功,會將結(jié)果通知到對應(yīng)的業(yè)務(wù)系統(tǒng),接著業(yè)務(wù)系統(tǒng)會更新訂單狀態(tài),同時調(diào)用倉庫接口,進(jìn)行減庫存,通知物流進(jìn)行發(fā)貨!

 

試想一下,從訂單狀態(tài)更新、到扣減庫存、通知物流發(fā)貨都在一個方法內(nèi)同步完成,假如用戶支付成功、訂單狀態(tài)更新也成功,但是在扣減庫存或者通知物流發(fā)貨步驟失敗了,那么就會造成一個問題,用戶已經(jīng)支付成功了,只是在倉庫扣減庫存方面失敗,從而導(dǎo)致整個交易失敗!

一單失敗,老板可以假裝看不見,但是如果上千個單子都因此失敗,那么因系統(tǒng)造成的業(yè)務(wù)損失,將是巨大的,老板可能坐不住了!

因此,針對這種業(yè)務(wù)場景,架構(gòu)師們引入了異步通信技術(shù)方案,從而保證服務(wù)的高可用,大體流程如下:

 

當(dāng)訂單系統(tǒng)收到支付平臺發(fā)送的扣款結(jié)果之后,會將訂單消息發(fā)送到 MQ 消息中間件,同時也會更新訂單狀態(tài)。

在另一端,由倉庫系統(tǒng)來異步監(jiān)聽訂單系統(tǒng)發(fā)送的消息,當(dāng)收到訂單消息之后,再操作扣減庫存、通知物流公司發(fā)貨等服務(wù)!

在優(yōu)化后的流程下,即使扣減庫存服務(wù)失敗,也不會影響用戶交易。

正如《人月神話》中所說的,軟件工程,沒有銀彈!

當(dāng)引入了 MQ 消息中間件之后,同樣也會帶來另一個問題,假如 MQ 消息中間件突然宕機(jī)了,導(dǎo)致消息無法發(fā)送出去,那倉庫系統(tǒng)就無法接受到訂單消息,進(jìn)而也無法發(fā)貨!

針對這個問題,業(yè)界主流的解決辦法是采用集群部署,一主多從模式,從而實現(xiàn)服務(wù)的高可用,即使一臺機(jī)器突然宕機(jī)了,也依然能保證服務(wù)可用,在服務(wù)器故障期間,通過運(yùn)維手段,將服務(wù)重新啟動,之后服務(wù)依然能正常運(yùn)行!

但是還有另一個問題,假如倉庫系統(tǒng)已經(jīng)收到訂單消息了,但是業(yè)務(wù)處理異常,或者服務(wù)器異常,導(dǎo)致當(dāng)前商品庫存并沒有扣減,也沒有發(fā)貨!

這個時候又改如何處理呢?

今天我們所要介紹的正是這種場景,假如消息消費(fèi)失敗,我們應(yīng)該如何處理?

二、解決方案

針對消息消費(fèi)失敗的場景,我們一般會通過如下方式進(jìn)行處理:

  • 當(dāng)消息消費(fèi)失敗時,會對消息進(jìn)行重新推送
  • 如果重試次數(shù)超過最大值,會將異常消息存儲到數(shù)據(jù)庫,然后人工介入排查問題,進(jìn)行手工重試

 

當(dāng)消息在客戶端消費(fèi)失敗時,我們會將異常的消息加入到一個消息重試對象中,同時設(shè)置最大重試次數(shù),并將消息重新推送到 MQ 消息中間件里,當(dāng)重試次數(shù)超過最大值時,會將異常的消息存儲到 MongoDB數(shù)據(jù)庫中,方便后續(xù)查詢異常的信息。

基于以上系統(tǒng)模型,我們可以編寫一個公共重試組件,話不多說,直接干!

三、代碼實踐

本次補(bǔ)償服務(wù)采用 rabbitmq 消息中間件進(jìn)行處理,其他消息中間件處理思路也類似!

3.1、創(chuàng)建一個消息重試實體類

  1. @Data 
  2. @EqualsAndHashCode(callSuper = false
  3. @Accessors(chain = true
  4. public class MessageRetryDTO implements Serializable { 
  5.  
  6.     private static final long serialVersionUID = 1L; 
  7.  
  8.     /** 
  9.      * 原始消息body 
  10.      */ 
  11.     private String bodyMsg; 
  12.  
  13.     /** 
  14.      * 消息來源ID 
  15.      */ 
  16.     private String sourceId; 
  17.  
  18.     /** 
  19.      * 消息來源描述 
  20.      */ 
  21.     private String sourceDesc; 
  22.  
  23.     /** 
  24.      * 交換器 
  25.      */ 
  26.     private String exchangeName; 
  27.  
  28.     /** 
  29.      * 路由鍵 
  30.      */ 
  31.     private String routingKey; 
  32.  
  33.     /** 
  34.      * 隊列 
  35.      */ 
  36.     private String queueName; 
  37.  
  38.     /** 
  39.      * 狀態(tài),1:初始化,2:成功,3:失敗 
  40.      */ 
  41.     private Integer status = 1; 
  42.  
  43.     /** 
  44.      * 最大重試次數(shù) 
  45.      */ 
  46.     private Integer maxTryCount = 3; 
  47.  
  48.     /** 
  49.      * 當(dāng)前重試次數(shù) 
  50.      */ 
  51.     private Integer currentRetryCount = 0; 
  52.  
  53.     /** 
  54.      * 重試時間間隔(毫秒) 
  55.      */ 
  56.     private Long retryIntervalTime = 0L; 
  57.  
  58.     /** 
  59.      * 任務(wù)失敗信息 
  60.      */ 
  61.     private String errorMsg; 
  62.  
  63.     /** 
  64.      * 創(chuàng)建時間 
  65.      */ 
  66.     private Date createTime; 
  67.  
  68.     @Override 
  69.     public String toString() { 
  70.         return "MessageRetryDTO{" + 
  71.                 "bodyMsg='" + bodyMsg + '\'' + 
  72.                 ", sourceId='" + sourceId + '\'' + 
  73.                 ", sourceDesc='" + sourceDesc + '\'' + 
  74.                 ", exchangeName='" + exchangeName + '\'' + 
  75.                 ", routingKey='" + routingKey + '\'' + 
  76.                 ", queueName='" + queueName + '\'' + 
  77.                 ", status=" + status + 
  78.                 ", maxTryCount=" + maxTryCount + 
  79.                 ", currentRetryCount=" + currentRetryCount + 
  80.                 ", retryIntervalTime=" + retryIntervalTime + 
  81.                 ", errorMsg='" + errorMsg + '\'' + 
  82.                 ", createTime=" + createTime + 
  83.                 '}'
  84.     } 
  85.  
  86.     /** 
  87.      * 檢查重試次數(shù)是否超過最大值 
  88.      * 
  89.      * @return 
  90.      */ 
  91.     public boolean checkRetryCount() { 
  92.         retryCountCalculate(); 
  93.         //檢查重試次數(shù)是否超過最大值 
  94.         if (this.currentRetryCount < this.maxTryCount) { 
  95.             return true
  96.         } 
  97.         return false
  98.     } 
  99.  
  100.     /** 
  101.      * 重新計算重試次數(shù) 
  102.      */ 
  103.     private void retryCountCalculate() { 
  104.         this.currentRetryCount = this.currentRetryCount + 1; 
  105.     } 
  106.  

3.2、編寫服務(wù)重試抽象類

  1. public abstract class CommonMessageRetryService { 
  2.  
  3.     private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitTemplate rabbitTemplate; 
  7.  
  8.     @Autowired 
  9.     private MongoTemplate mongoTemplate; 
  10.  
  11.  
  12.     /** 
  13.      * 初始化消息 
  14.      * 
  15.      * @param message 
  16.      */ 
  17.     public void initMessage(Message message) { 
  18.         log.info("{} 收到消息: {},業(yè)務(wù)數(shù)據(jù):{}", this.getClass().getName(), message.toString(), new String(message.getBody())); 
  19.         try { 
  20.             //封裝消息 
  21.             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message); 
  22.             if (log.isInfoEnabled()) { 
  23.                 log.info("反序列化消息:{}", messageRetryDto.toString()); 
  24.             } 
  25.             prepareAction(messageRetryDto); 
  26.         } catch (Exception e) { 
  27.             log.warn("處理消息異常,錯誤信息:", e); 
  28.         } 
  29.     } 
  30.  
  31.     /** 
  32.      * 準(zhǔn)備執(zhí)行 
  33.      * 
  34.      * @param retryDto 
  35.      */ 
  36.     protected void prepareAction(MessageRetryDTO retryDto) { 
  37.         try { 
  38.             execute(retryDto); 
  39.             doSuccessCallBack(retryDto); 
  40.         } catch (Exception e) { 
  41.             log.error("當(dāng)前任務(wù)執(zhí)行異常,業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e); 
  42.             //執(zhí)行失敗,計算是否還需要繼續(xù)重試 
  43.             if (retryDto.checkRetryCount()) { 
  44.                 if (log.isInfoEnabled()) { 
  45.                     log.info("重試消息:{}", retryDto.toString()); 
  46.                 } 
  47.                 retrySend(retryDto); 
  48.             } else { 
  49.                 if (log.isWarnEnabled()) { 
  50.                     log.warn("當(dāng)前任務(wù)重試次數(shù)已經(jīng)到達(dá)最大次數(shù),業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e); 
  51.                 } 
  52.                 doFailCallBack(retryDto.setErrorMsg(e.getMessage())); 
  53.             } 
  54.         } 
  55.     } 
  56.  
  57.     /** 
  58.      * 任務(wù)執(zhí)行成功,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫) 
  59.      * 
  60.      * @param messageRetryDto 
  61.      */ 
  62.     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) { 
  63.         try { 
  64.             successCallback(messageRetryDto); 
  65.         } catch (Exception e) { 
  66.             log.warn("執(zhí)行成功回調(diào)異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  67.         } 
  68.     } 
  69.  
  70.     /** 
  71.      * 任務(wù)執(zhí)行失敗,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫) 
  72.      * 
  73.      * @param messageRetryDto 
  74.      */ 
  75.     private void doFailCallBack(MessageRetryDTO messageRetryDto) { 
  76.         try { 
  77.             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg())); 
  78.             failCallback(messageRetryDto); 
  79.         } catch (Exception e) { 
  80.             log.warn("執(zhí)行失敗回調(diào)異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  81.         } 
  82.     } 
  83.  
  84.     /** 
  85.      * 執(zhí)行任務(wù) 
  86.      * 
  87.      * @param messageRetryDto 
  88.      */ 
  89.     protected abstract void execute(MessageRetryDTO messageRetryDto); 
  90.  
  91.     /** 
  92.      * 成功回調(diào) 
  93.      * 
  94.      * @param messageRetryDto 
  95.      */ 
  96.     protected abstract void successCallback(MessageRetryDTO messageRetryDto); 
  97.  
  98.     /** 
  99.      * 失敗回調(diào) 
  100.      * 
  101.      * @param messageRetryDto 
  102.      */ 
  103.     protected abstract void failCallback(MessageRetryDTO messageRetryDto); 
  104.  
  105.     /** 
  106.      * 構(gòu)建消息補(bǔ)償實體 
  107.      * @param message 
  108.      * @return 
  109.      */ 
  110.     private MessageRetryDTO buildMessageRetryInfo(Message message){ 
  111.         //如果頭部包含補(bǔ)償消息實體,直接返回 
  112.         Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders(); 
  113.         if(messageHeaders.containsKey("message_retry_info")){ 
  114.             Object retryMsg = messageHeaders.get("message_retry_info"); 
  115.             if(Objects.nonNull(retryMsg)){ 
  116.                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class); 
  117.             } 
  118.         } 
  119.         //自動將業(yè)務(wù)消息加入補(bǔ)償實體 
  120.         MessageRetryDTO messageRetryDto = new MessageRetryDTO(); 
  121.         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8)); 
  122.         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange()); 
  123.         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey()); 
  124.         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue()); 
  125.         messageRetryDto.setCreateTime(new Date()); 
  126.         return messageRetryDto; 
  127.     } 
  128.  
  129.     /** 
  130.      * 異常消息重新入庫 
  131.      * @param retryDto 
  132.      */ 
  133.     private void retrySend(MessageRetryDTO retryDto){ 
  134.         //將補(bǔ)償消息實體放入頭部,原始消息內(nèi)容保持不變 
  135.         MessageProperties messageProperties = new MessageProperties(); 
  136.         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); 
  137.         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto)); 
  138.         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties); 
  139.         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message); 
  140.     } 
  141.  
  142.  
  143.  
  144.     /** 
  145.      * 將異常消息存儲到mongodb中 
  146.      * @param retryDto 
  147.      */ 
  148.     private void saveMessageRetryInfo(MessageRetryDTO retryDto){ 
  149.         try { 
  150.             mongoTemplate.save(retryDto, "message_retry_info"); 
  151.         } catch (Exception e){ 
  152.             log.error("將異常消息存儲到mongodb失敗,消息數(shù)據(jù):" + retryDto.toString(), e); 
  153.         } 
  154.     } 

3.3、編寫監(jiān)聽服務(wù)類

在消費(fèi)端應(yīng)用的時候,也非常簡單,例如,針對扣減庫存操作,我們可以通過如下方式進(jìn)行處理!

  1. @Component 
  2. public class OrderServiceListener extends CommonMessageRetryService { 
  3.  
  4.     private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class); 
  5.  
  6.     /** 
  7.      * 監(jiān)聽訂單系統(tǒng)下單成功消息 
  8.      * @param message 
  9.      */ 
  10.     @RabbitListener(queues = "mq.order.add"
  11.     public void consume(Message message) { 
  12.         log.info("收到訂單下單成功消息: {}", message.toString()); 
  13.         super.initMessage(message); 
  14.     } 
  15.  
  16.  
  17.     @Override 
  18.     protected void execute(MessageRetryDTO messageRetryDto) { 
  19.         //調(diào)用扣減庫存服務(wù),將業(yè)務(wù)異常拋出來 
  20.     } 
  21.  
  22.     @Override 
  23.     protected void successCallback(MessageRetryDTO messageRetryDto) { 
  24.         //業(yè)務(wù)處理成功,回調(diào) 
  25.     } 
  26.  
  27.     @Override 
  28.     protected void failCallback(MessageRetryDTO messageRetryDto) { 
  29.         //業(yè)務(wù)處理失敗,回調(diào) 
  30.     } 

當(dāng)消息消費(fèi)失敗,并超過最大次數(shù)時,會將消息存儲到 mongodb 中,然后像常規(guī)數(shù)據(jù)庫操作一樣,可以通過 web 接口查詢異常消息,并針對具體場景進(jìn)行重試!

四、小結(jié)

可能有的同學(xué)會問,為啥不將異常消息存在數(shù)據(jù)庫?

起初的確是存儲在 MYSQL 中,但是隨著業(yè)務(wù)的快速發(fā)展,訂單消息數(shù)據(jù)結(jié)構(gòu)越來越復(fù)雜,數(shù)據(jù)量也非常的大,甚至大到 MYSQL 中的 text 類型都無法存儲,同時這種數(shù)據(jù)結(jié)構(gòu)也不太適合在 MYSQL 中存儲,因此將其遷移到 mongodb!

本文主要圍繞消息消費(fèi)失敗這種場景,進(jìn)行基礎(chǔ)的方案和代碼實踐講解,可能有理解不到位的地方,歡迎批評指出!

五、參考

 

1、石杉的架構(gòu)筆記 - 如何處理消息消費(fèi)失敗問題

 

責(zé)任編輯:武曉燕 來源: Java極客技術(shù)
相關(guān)推薦

2024-09-23 08:04:45

MYSQL數(shù)據(jù)存儲

2012-07-03 11:18:20

運(yùn)維disable tab

2019-08-15 10:20:19

云計算技術(shù)安全

2018-12-25 09:44:42

2012-12-12 09:49:41

2020-12-29 09:11:33

LinuxLinux內(nèi)核

2017-03-13 13:21:34

Git處理大倉庫

2017-10-26 08:43:18

JavaScript內(nèi)存處理

2019-12-23 10:20:12

Web圖片優(yōu)化前端

2022-06-02 10:54:16

BrokerRocketMQ

2024-12-18 07:43:49

2024-05-23 12:11:39

2010-05-17 10:04:45

2024-08-26 10:47:22

2021-05-31 10:47:17

SpringSecuritySession

2022-04-19 09:00:52

ReactTypeScript

2023-07-03 13:50:13

ReactonResize事件

2021-03-24 10:40:26

Python垃圾語言

2011-02-28 14:08:31

網(wǎng)速變慢局域網(wǎng)網(wǎng)速

2024-04-16 13:32:57

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 日韩av啪啪网站大全免费观看 | 精品国产欧美一区二区三区成人 | 欧美日韩精品专区 | 在线视频一区二区三区 | 97国产精品视频人人做人人爱 | 美女天堂在线 | 欧美综合久久 | 国产精品久久久久久久久久 | 午夜精品导航 | 国产精品99久久久久久动医院 | 久久精品日产第一区二区三区 | 国产精品视频 | 嫩草视频入口 | 四虎影院在线观看av | 午夜国产精品视频 | 欧美一区视频 | 免费国产一区二区 | 激情五月综合 | 日本天天操 | 欧美成人精品一区 | 五月天国产 | 中文字幕精品视频在线观看 | 日韩av在线免费 | 国产一区二区影院 | 久久精品国产久精国产 | 国产精品成人一区二区三区 | 午夜精品久久久久久久久久久久久 | 亚洲欧美精品久久 | 日本不卡视频 | 亚洲综合无码一区二区 | 日本精品一区二区三区在线观看 | 国产精品国产三级国产aⅴ中文 | 婷婷色网 | 国产精品一区二区av | 欧美国产精品一区二区三区 | 亚洲精品一区二区三区 | 国产小视频在线 | 天堂在线91 | 99精品视频一区二区三区 | 久久国产亚洲 | 成人依人 |