消息消費(fèi)失敗如何處理?
本文轉(zhuǎn)載自微信公眾號「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請聯(lián)系Java極客技術(shù)公眾號。
一、介紹
在介紹消息中間件 MQ 之前,我們先來簡單的了解一下,為何要引用消息中間件。
例如,在電商平臺中,常見的用戶下單,會經(jīng)歷以下幾個流程。
當(dāng)用戶下單時,創(chuàng)建完訂單之后,會調(diào)用第三方支付平臺,對用戶的賬戶金額進(jìn)行扣款,如果平臺支付扣款成功,會將結(jié)果通知到對應(yīng)的業(yè)務(wù)系統(tǒng),接著業(yè)務(wù)系統(tǒng)會更新訂單狀態(tài),同時調(diào)用倉庫接口,進(jìn)行減庫存,通知物流進(jìn)行發(fā)貨!
試想一下,從訂單狀態(tài)更新、到扣減庫存、通知物流發(fā)貨都在一個方法內(nèi)同步完成,假如用戶支付成功、訂單狀態(tài)更新也成功,但是在扣減庫存或者通知物流發(fā)貨步驟失敗了,那么就會造成一個問題,用戶已經(jīng)支付成功了,只是在倉庫扣減庫存方面失敗,從而導(dǎo)致整個交易失敗!
一單失敗,老板可以假裝看不見,但是如果上千個單子都因此失敗,那么因系統(tǒng)造成的業(yè)務(wù)損失,將是巨大的,老板可能坐不住了!
因此,針對這種業(yè)務(wù)場景,架構(gòu)師們引入了異步通信技術(shù)方案,從而保證服務(wù)的高可用,大體流程如下:
當(dāng)訂單系統(tǒng)收到支付平臺發(fā)送的扣款結(jié)果之后,會將訂單消息發(fā)送到 MQ 消息中間件,同時也會更新訂單狀態(tài)。
在另一端,由倉庫系統(tǒng)來異步監(jiān)聽訂單系統(tǒng)發(fā)送的消息,當(dāng)收到訂單消息之后,再操作扣減庫存、通知物流公司發(fā)貨等服務(wù)!
在優(yōu)化后的流程下,即使扣減庫存服務(wù)失敗,也不會影響用戶交易。
正如《人月神話》中所說的,軟件工程,沒有銀彈!
當(dāng)引入了 MQ 消息中間件之后,同樣也會帶來另一個問題,假如 MQ 消息中間件突然宕機(jī)了,導(dǎo)致消息無法發(fā)送出去,那倉庫系統(tǒng)就無法接受到訂單消息,進(jìn)而也無法發(fā)貨!
針對這個問題,業(yè)界主流的解決辦法是采用集群部署,一主多從模式,從而實現(xiàn)服務(wù)的高可用,即使一臺機(jī)器突然宕機(jī)了,也依然能保證服務(wù)可用,在服務(wù)器故障期間,通過運(yùn)維手段,將服務(wù)重新啟動,之后服務(wù)依然能正常運(yùn)行!
但是還有另一個問題,假如倉庫系統(tǒng)已經(jīng)收到訂單消息了,但是業(yè)務(wù)處理異常,或者服務(wù)器異常,導(dǎo)致當(dāng)前商品庫存并沒有扣減,也沒有發(fā)貨!
這個時候又改如何處理呢?
今天我們所要介紹的正是這種場景,假如消息消費(fèi)失敗,我們應(yīng)該如何處理?
二、解決方案
針對消息消費(fèi)失敗的場景,我們一般會通過如下方式進(jìn)行處理:
- 當(dāng)消息消費(fèi)失敗時,會對消息進(jìn)行重新推送
- 如果重試次數(shù)超過最大值,會將異常消息存儲到數(shù)據(jù)庫,然后人工介入排查問題,進(jìn)行手工重試
當(dāng)消息在客戶端消費(fèi)失敗時,我們會將異常的消息加入到一個消息重試對象中,同時設(shè)置最大重試次數(shù),并將消息重新推送到 MQ 消息中間件里,當(dāng)重試次數(shù)超過最大值時,會將異常的消息存儲到 MongoDB數(shù)據(jù)庫中,方便后續(xù)查詢異常的信息。
基于以上系統(tǒng)模型,我們可以編寫一個公共重試組件,話不多說,直接干!
三、代碼實踐
本次補(bǔ)償服務(wù)采用 rabbitmq 消息中間件進(jìn)行處理,其他消息中間件處理思路也類似!
3.1、創(chuàng)建一個消息重試實體類
- @Data
- @EqualsAndHashCode(callSuper = false)
- @Accessors(chain = true)
- public class MessageRetryDTO implements Serializable {
- private static final long serialVersionUID = 1L;
- /**
- * 原始消息body
- */
- private String bodyMsg;
- /**
- * 消息來源ID
- */
- private String sourceId;
- /**
- * 消息來源描述
- */
- private String sourceDesc;
- /**
- * 交換器
- */
- private String exchangeName;
- /**
- * 路由鍵
- */
- private String routingKey;
- /**
- * 隊列
- */
- private String queueName;
- /**
- * 狀態(tài),1:初始化,2:成功,3:失敗
- */
- private Integer status = 1;
- /**
- * 最大重試次數(shù)
- */
- private Integer maxTryCount = 3;
- /**
- * 當(dāng)前重試次數(shù)
- */
- private Integer currentRetryCount = 0;
- /**
- * 重試時間間隔(毫秒)
- */
- private Long retryIntervalTime = 0L;
- /**
- * 任務(wù)失敗信息
- */
- private String errorMsg;
- /**
- * 創(chuàng)建時間
- */
- private Date createTime;
- @Override
- public String toString() {
- return "MessageRetryDTO{" +
- "bodyMsg='" + bodyMsg + '\'' +
- ", sourceId='" + sourceId + '\'' +
- ", sourceDesc='" + sourceDesc + '\'' +
- ", exchangeName='" + exchangeName + '\'' +
- ", routingKey='" + routingKey + '\'' +
- ", queueName='" + queueName + '\'' +
- ", status=" + status +
- ", maxTryCount=" + maxTryCount +
- ", currentRetryCount=" + currentRetryCount +
- ", retryIntervalTime=" + retryIntervalTime +
- ", errorMsg='" + errorMsg + '\'' +
- ", createTime=" + createTime +
- '}';
- }
- /**
- * 檢查重試次數(shù)是否超過最大值
- *
- * @return
- */
- public boolean checkRetryCount() {
- retryCountCalculate();
- //檢查重試次數(shù)是否超過最大值
- if (this.currentRetryCount < this.maxTryCount) {
- return true;
- }
- return false;
- }
- /**
- * 重新計算重試次數(shù)
- */
- private void retryCountCalculate() {
- this.currentRetryCount = this.currentRetryCount + 1;
- }
- }
3.2、編寫服務(wù)重試抽象類
- public abstract class CommonMessageRetryService {
- private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private MongoTemplate mongoTemplate;
- /**
- * 初始化消息
- *
- * @param message
- */
- public void initMessage(Message message) {
- log.info("{} 收到消息: {},業(yè)務(wù)數(shù)據(jù):{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
- try {
- //封裝消息
- MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
- if (log.isInfoEnabled()) {
- log.info("反序列化消息:{}", messageRetryDto.toString());
- }
- prepareAction(messageRetryDto);
- } catch (Exception e) {
- log.warn("處理消息異常,錯誤信息:", e);
- }
- }
- /**
- * 準(zhǔn)備執(zhí)行
- *
- * @param retryDto
- */
- protected void prepareAction(MessageRetryDTO retryDto) {
- try {
- execute(retryDto);
- doSuccessCallBack(retryDto);
- } catch (Exception e) {
- log.error("當(dāng)前任務(wù)執(zhí)行異常,業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e);
- //執(zhí)行失敗,計算是否還需要繼續(xù)重試
- if (retryDto.checkRetryCount()) {
- if (log.isInfoEnabled()) {
- log.info("重試消息:{}", retryDto.toString());
- }
- retrySend(retryDto);
- } else {
- if (log.isWarnEnabled()) {
- log.warn("當(dāng)前任務(wù)重試次數(shù)已經(jīng)到達(dá)最大次數(shù),業(yè)務(wù)數(shù)據(jù):" + retryDto.toString(), e);
- }
- doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
- }
- }
- }
- /**
- * 任務(wù)執(zhí)行成功,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫)
- *
- * @param messageRetryDto
- */
- private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
- try {
- successCallback(messageRetryDto);
- } catch (Exception e) {
- log.warn("執(zhí)行成功回調(diào)異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
- }
- }
- /**
- * 任務(wù)執(zhí)行失敗,回調(diào)服務(wù)(根據(jù)需要進(jìn)行重寫)
- *
- * @param messageRetryDto
- */
- private void doFailCallBack(MessageRetryDTO messageRetryDto) {
- try {
- saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
- failCallback(messageRetryDto);
- } catch (Exception e) {
- log.warn("執(zhí)行失敗回調(diào)異常,隊列描述:{},錯誤原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
- }
- }
- /**
- * 執(zhí)行任務(wù)
- *
- * @param messageRetryDto
- */
- protected abstract void execute(MessageRetryDTO messageRetryDto);
- /**
- * 成功回調(diào)
- *
- * @param messageRetryDto
- */
- protected abstract void successCallback(MessageRetryDTO messageRetryDto);
- /**
- * 失敗回調(diào)
- *
- * @param messageRetryDto
- */
- protected abstract void failCallback(MessageRetryDTO messageRetryDto);
- /**
- * 構(gòu)建消息補(bǔ)償實體
- * @param message
- * @return
- */
- private MessageRetryDTO buildMessageRetryInfo(Message message){
- //如果頭部包含補(bǔ)償消息實體,直接返回
- Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
- if(messageHeaders.containsKey("message_retry_info")){
- Object retryMsg = messageHeaders.get("message_retry_info");
- if(Objects.nonNull(retryMsg)){
- return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
- }
- }
- //自動將業(yè)務(wù)消息加入補(bǔ)償實體
- MessageRetryDTO messageRetryDto = new MessageRetryDTO();
- messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
- messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
- messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
- messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
- messageRetryDto.setCreateTime(new Date());
- return messageRetryDto;
- }
- /**
- * 異常消息重新入庫
- * @param retryDto
- */
- private void retrySend(MessageRetryDTO retryDto){
- //將補(bǔ)償消息實體放入頭部,原始消息內(nèi)容保持不變
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
- messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
- Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
- rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
- }
- /**
- * 將異常消息存儲到mongodb中
- * @param retryDto
- */
- private void saveMessageRetryInfo(MessageRetryDTO retryDto){
- try {
- mongoTemplate.save(retryDto, "message_retry_info");
- } catch (Exception e){
- log.error("將異常消息存儲到mongodb失敗,消息數(shù)據(jù):" + retryDto.toString(), e);
- }
- }
- }
3.3、編寫監(jiān)聽服務(wù)類
在消費(fèi)端應(yīng)用的時候,也非常簡單,例如,針對扣減庫存操作,我們可以通過如下方式進(jìn)行處理!
- @Component
- public class OrderServiceListener extends CommonMessageRetryService {
- private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);
- /**
- * 監(jiān)聽訂單系統(tǒng)下單成功消息
- * @param message
- */
- @RabbitListener(queues = "mq.order.add")
- public void consume(Message message) {
- log.info("收到訂單下單成功消息: {}", message.toString());
- super.initMessage(message);
- }
- @Override
- protected void execute(MessageRetryDTO messageRetryDto) {
- //調(diào)用扣減庫存服務(wù),將業(yè)務(wù)異常拋出來
- }
- @Override
- protected void successCallback(MessageRetryDTO messageRetryDto) {
- //業(yè)務(wù)處理成功,回調(diào)
- }
- @Override
- protected void failCallback(MessageRetryDTO messageRetryDto) {
- //業(yè)務(wù)處理失敗,回調(diào)
- }
- }
當(dāng)消息消費(fèi)失敗,并超過最大次數(shù)時,會將消息存儲到 mongodb 中,然后像常規(guī)數(shù)據(jù)庫操作一樣,可以通過 web 接口查詢異常消息,并針對具體場景進(jìn)行重試!
四、小結(jié)
可能有的同學(xué)會問,為啥不將異常消息存在數(shù)據(jù)庫?
起初的確是存儲在 MYSQL 中,但是隨著業(yè)務(wù)的快速發(fā)展,訂單消息數(shù)據(jù)結(jié)構(gòu)越來越復(fù)雜,數(shù)據(jù)量也非常的大,甚至大到 MYSQL 中的 text 類型都無法存儲,同時這種數(shù)據(jù)結(jié)構(gòu)也不太適合在 MYSQL 中存儲,因此將其遷移到 mongodb!
本文主要圍繞消息消費(fèi)失敗這種場景,進(jìn)行基礎(chǔ)的方案和代碼實踐講解,可能有理解不到位的地方,歡迎批評指出!
五、參考
1、石杉的架構(gòu)筆記 - 如何處理消息消費(fèi)失敗問題