成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

順序消息的實現-RocketMQ知識體系(五)

開發 前端
順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發布和消費的消息類型。順序消息由兩個部分組成:順序發布和順序消費。

[[410981]]

我們知道,kafka 如果要保證順序消費,必須保證消息保存到同一個patition上,而且為了有序性,只能有一個消費者進行消費。這種情況下,Kafka 就退化成了單一隊列,毫無并發性可言,極大降低系統性能。那么對于對業務比較友好的RocketMQ 是如何實現的呢?首先,我們循序漸進的來了解下順序消息的實現。

順序消息業務使用場景

1、電商場景中傳遞訂單狀態。

2、同步mysql 的binlong 日志,數據庫的操作是有順序的。

3、其他消息之間有先后的依賴關系,后一條消息需要依賴于前一條消息的處理結果的情況。

等等。。。

消息中間件中的順序消息

順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發布和消費的消息類型。順序消息由兩個部分組成:順序發布和順序消費。

順序消息包含兩種類型:

分區順序:一個Partition(queue)內所有的消息按照先進先出的順序進行發布和消費

全局順序:一個Topic內所有的消息按照先進先出的順序進行發布和消費.但是全局順序極大的降低了系統的吞吐量,不符合mq的設計初衷。

那么折中的辦法就是選擇分區順序。

【局部順序消費】

如何保證順序

在MQ的模型中,順序需要由3個階段去保障:

  1. 消息被發送時保持順序
  2. 消息被存儲時保持和發送的順序一致
  3. 消息被消費時保持和存儲的順序一致

發送時保持順序意味著對于有順序要求的消息,用戶應該在同一個線程中采用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。

第一點,消息順序發送,多線程發送的消息無法保證有序性,因此,需要業務方在發送時,針對同一個業務編號(如同一筆訂單)的消息需要保證在一個線程內順序發送,在上一個消息發送成功后,在進行下一個消息的發送。對應到mq中,消息發送方法就得使用同步發送,異步發送無法保證順序性。

第二點,消息順序存儲,mq的topic下會存在多個queue,要保證消息的順序存儲,同一個業務編號的消息需要被發送到一個queue中。對應到mq中,需要使用MessageQueueSelector來選擇要發送的queue,即對業務編號進行hash,然后根據隊列數量對hash值取余,將消息發送到一個queue中。

第三點,消息順序消費,要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內部,也只能有一個消費線程來消費該隊列。即,同一時刻,一個消費隊列只能被一個消費者中的一個線程消費。

RocketMQ中順序的實現

【Producer端】

Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區,在RocketMQ中,通過MessageQueueSelector來實現分區的選擇。

  1. /** 
  2.  * 消息隊列選擇器 
  3.  */ 
  4. public interface MessageQueueSelector { 
  5.  
  6.     /** 
  7.      * 選擇消息隊列 
  8.      * 
  9.      * @param mqs 消息隊列 
  10.      * @param msg 消息 
  11.      * @param arg 參數 
  12.      * @return 消息隊列 
  13.      */ 
  14.     MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); 
  • List mqs:消息要發送的Topic下所有的分區
  • Message msg:消息對象
  • 額外的參數:用戶可以傳遞自己的參數

比如如下實現就可以保證相同的訂單的消息被路由到相同的分區:

  1. long orderId = ((Order) object).getOrderId; 
  2. return mqs.get(orderId % mqs.size()); 

【Consumer端】

嘗試鎖定鎖定MessageQueue。

首先我們如何保證一個隊列只被一個消費者消費?

消費隊列存在于broker端,如果想保證一個隊列被一個消費者消費,那么消費者在進行消息拉取消費時就必須向mq服務器申請隊列鎖,消費者申請隊列鎖的代碼存在于RebalanceService消息隊列負載的實現代碼中。

消費者重新負載,并且分配完消費隊列后,需要向mq服務器發起消息拉取請求,代碼實現在RebalanceImpl#updateProcessQueueTableInRebalance中,針對順序消息的消息拉取,mq做了如下判斷:

  1. // 增加 不在processQueueTable && 存在于mqSet 里的消息隊列。 
  2.        List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組 
  3.        for (MessageQueue mq : mqSet) { 
  4.            if (!this.processQueueTable.containsKey(mq)) { 
  5.                if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊列 
  6.                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); 
  7.                    continue
  8.                } 
  9.  
  10.                this.removeDirtyOffset(mq); 
  11.                ProcessQueue pq = new ProcessQueue(); 
  12.                long nextOffset = this.computePullFromWhere(mq); 
  13.                if (nextOffset >= 0) { 
  14.                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); 
  15.                    if (pre != null) { 
  16.                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); 
  17.                    } else { 
  18.                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); 
  19.                        PullRequest pullRequest = new PullRequest(); 
  20.                        pullRequest.setConsumerGroup(consumerGroup); 
  21.                        pullRequest.setNextOffset(nextOffset); 
  22.                        pullRequest.setMessageQueue(mq); 
  23.                        pullRequest.setProcessQueue(pq); 
  24.                        pullRequestList.add(pullRequest); 
  25.                        changed = true
  26.                    } 
  27.                } else { 
  28.                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); 
  29.                } 
  30.            } 
  31.        } 
  32.  
  33.        // 發起消息拉取請求 
  34.        this.dispatchPullRequest(pullRequestList); 

核心思想就是,消費客戶端先向broker端發起對messageQueue的加鎖請求,只有加鎖成功時才創建pullRequest進行消息拉取,下面看下lock加鎖請求方法:

  1. /** 
  2.     * 請求Broker獲得指定消息隊列的分布式鎖 
  3.     * 
  4.     * @param mq 隊列 
  5.     * @return 是否成功 
  6.     */ 
  7.    public boolean lock(final MessageQueue mq) { 
  8.        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); 
  9.        if (findBrokerResult != null) { 
  10.            LockBatchRequestBody requestBody = new LockBatchRequestBody(); 
  11.            requestBody.setConsumerGroup(this.consumerGroup); 
  12.            requestBody.setClientId(this.mQClientFactory.getClientId()); 
  13.            requestBody.getMqSet().add(mq); 
  14.  
  15.            try { 
  16.                // 請求Broker獲得指定消息隊列的分布式鎖 
  17.                Set<MessageQueue> lockedMq = 
  18.                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); 
  19.  
  20.                // 設置消息處理隊列鎖定成功。鎖定消息隊列成功,可能本地沒有消息處理隊列,設置鎖定成功會在lockAll()方法。 
  21.                for (MessageQueue mmqq : lockedMq) { 
  22.                    ProcessQueue processQueue = this.processQueueTable.get(mmqq); 
  23.                    if (processQueue != null) { 
  24.                        processQueue.setLocked(true); 
  25.                        processQueue.setLastLockTimestamp(System.currentTimeMillis()); 
  26.                    } 
  27.                } 
  28.  
  29.                boolean lockOK = lockedMq.contains(mq); 
  30.                log.info("the message queue lock {}, {} {}"
  31.                    lockOK ? "OK" : "Failed"
  32.                    this.consumerGroup, 
  33.                    mq); 
  34.                return lockOK; 
  35.            } catch (Exception e) { 
  36.                log.error("lockBatchMQ exception, " + mq, e); 
  37.            } 
  38.        } 
  39.  
  40.        return false
  41.    } 

代碼實現邏輯比較清晰,就是調用lockBatchMQ方法發送了一個加鎖請求,那么broker端收到加鎖請求后的處理邏輯又是怎么樣?

【broker端實現】

broker端收到加鎖請求的處理邏輯在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中關鍵屬性如下:

  1. /** 
  2.      * 消息隊列鎖過期時間,默認60s 
  3.      */ 
  4.     private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( 
  5.         "rocketmq.broker.rebalance.lockMaxLiveTime""60000")); 
  6.     /** 
  7.      * 鎖 
  8.      */ 
  9.     private final Lock lock = new ReentrantLock(); 
  10.     /** 
  11.      * 消費分組的消息隊列鎖映射 
  12.      */ 
  13.     private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = 
  14.             new ConcurrentHashMap<>(1024); 

LockEntry對象中關鍵屬性如下:

  1. /** 
  2.     * 鎖定記錄 
  3.     */ 
  4.    static class LockEntry { 
  5.        /** 
  6.         * 客戶端編號 
  7.         */ 
  8.        private String clientId; 
  9.        /** 
  10.         * 最后鎖定時間 
  11.         */ 
  12.        private volatile long lastUpdateTimestamp = System.currentTimeMillis(); 
  13.  
  14.        public String getClientId() { 
  15.            return clientId; 
  16.        } 
  17.  
  18.        public void setClientId(String clientId) { 
  19.            this.clientId = clientId; 
  20.        } 
  21.  
  22.        public long getLastUpdateTimestamp() { 
  23.            return lastUpdateTimestamp; 
  24.        } 
  25.  
  26.        public void setLastUpdateTimestamp(long lastUpdateTimestamp) { 
  27.            this.lastUpdateTimestamp = lastUpdateTimestamp; 
  28.        } 
  29.  
  30.        /** 
  31.         * 是否鎖定 
  32.         * 
  33.         * @param clientId 客戶端編號 
  34.         * @return 是否 
  35.         */ 
  36.        public boolean isLocked(final String clientId) { 
  37.            boolean eq = this.clientId.equals(clientId); 
  38.            return eq && !this.isExpired(); 
  39.        } 
  40.  
  41.        /** 
  42.         * 鎖定是否過期 
  43.         * 
  44.         * @return 是否 
  45.         */ 
  46.        public boolean isExpired() { 
  47.            boolean expired = 
  48.                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; 
  49.  
  50.            return expired; 
  51.        } 
  52.    } 

broker端通過對ConcurrentMap> mqLockTable的維護來達到messageQueue加鎖的目的,使得同一時刻,一個messageQueue只能被一個消費者消費。

【再次回到Consumer端,拿到鎖后】

消費者對messageQueue的加鎖已經成功,那么就進入到了第二個步驟,創建pullRequest進行消息拉取,消息拉取部分的代碼實現在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中進行消費,順序消費的實現為ConsumeMessageOrderlyService,提交消息進行消費的方法為ConsumeMessageOrderlyService#submitConsumeRequest,具體實現如下:

  1. @Override 
  2.  public void submitConsumeRequest(// 
  3.      final List<MessageExt> msgs, // 
  4.      final ProcessQueue processQueue, // 
  5.      final MessageQueue messageQueue, // 
  6.      final boolean dispathToConsume) { 
  7.      if (dispathToConsume) { 
  8.          ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); 
  9.          this.consumeExecutor.submit(consumeRequest); 
  10.      } 
  11.  } 

構建了一個ConsumeRequest對象,并提交給了ThreadPoolExecutor來并行消費,看下順序消費的ConsumeRequest的run方法實現:

  1. public void run() { 
  2.            if (this.processQueue.isDropped()) { 
  3.                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  4.                return
  5.            } 
  6.  
  7.            // 獲得 Consumer 消息隊列鎖 
  8.            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); 
  9.            synchronized (objLock) { 
  10.                // (廣播模式) 或者 (集群模式 && Broker消息隊列鎖有效) 
  11.                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  12.                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { 
  13.                    final long beginTime = System.currentTimeMillis(); 
  14.                    // 循環 
  15.                    for (boolean continueConsume = true; continueConsume; ) { 
  16.                        if (this.processQueue.isDropped()) { 
  17.                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  18.                            break; 
  19.                        } 
  20.  
  21.                        // 消息隊列分布式鎖未鎖定,提交延遲獲得鎖并消費請求 
  22.                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  23.                            && !this.processQueue.isLocked()) { 
  24.                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue); 
  25.                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); 
  26.                            break; 
  27.                        } 
  28.                        // 消息隊列分布式鎖已經過期,提交延遲獲得鎖并消費請求 
  29.                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) 
  30.                            && this.processQueue.isLockExpired()) { 
  31.                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); 
  32.                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); 
  33.                            break; 
  34.                        } 
  35.  
  36.                        // 當前周期消費時間超過連續時長,默認:60s,提交延遲消費請求。默認情況下,每消費1分鐘休息10ms。 
  37.                        long interval = System.currentTimeMillis() - beginTime; 
  38.                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { 
  39.                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); 
  40.                            break; 
  41.                        } 
  42.  
  43.                        // 獲取消費消息。此處和并發消息請求不同,并發消息請求已經帶了消費哪些消息。 
  44.                        final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); 
  45.                        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); 
  46.                        if (!msgs.isEmpty()) { 
  47.                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); 
  48.  
  49.                            ConsumeOrderlyStatus status = null
  50.  
  51.                            // Hook:before 
  52.                            ConsumeMessageContext consumeMessageContext = null
  53.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  54.                                consumeMessageContext = new ConsumeMessageContext(); 
  55.                                consumeMessageContext 
  56.                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); 
  57.                                consumeMessageContext.setMq(messageQueue); 
  58.                                consumeMessageContext.setMsgList(msgs); 
  59.                                consumeMessageContext.setSuccess(false); 
  60.                                // init the consume context type 
  61.                                consumeMessageContext.setProps(new HashMap<String, String>()); 
  62.                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); 
  63.                            } 
  64.  
  65.                            // 執行消費 
  66.                            long beginTimestamp = System.currentTimeMillis(); 
  67.                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; 
  68.                            boolean hasException = false
  69.                            try { 
  70.                                this.processQueue.getLockConsume().lock(); // 鎖定隊列消費鎖 
  71.  
  72.                                if (this.processQueue.isDropped()) { 
  73.                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}"
  74.                                        this.messageQueue); 
  75.                                    break; 
  76.                                } 
  77.  
  78.                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); 
  79.                            } catch (Throwable e) { 
  80.                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // 
  81.                                    RemotingHelper.exceptionSimpleDesc(e), // 
  82.                                    ConsumeMessageOrderlyService.this.consumerGroup, // 
  83.                                    msgs, // 
  84.                                    messageQueue); 
  85.                                hasException = true
  86.                            } finally { 
  87.                                this.processQueue.getLockConsume().unlock(); // 鎖定隊列消費鎖 
  88.                            } 
  89.  
  90.                            if (null == status // 
  91.                                || ConsumeOrderlyStatus.ROLLBACK == status// 
  92.                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { 
  93.                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // 
  94.                                    ConsumeMessageOrderlyService.this.consumerGroup, // 
  95.                                    msgs, // 
  96.                                    messageQueue); 
  97.                            } 
  98.  
  99.                            // 解析消費結果狀態 
  100.                            long consumeRT = System.currentTimeMillis() - beginTimestamp; 
  101.                            if (null == status) { 
  102.                                if (hasException) { 
  103.                                    returnType = ConsumeReturnType.EXCEPTION; 
  104.                                } else { 
  105.                                    returnType = ConsumeReturnType.RETURNNULL; 
  106.                                } 
  107.                            } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { 
  108.                                returnType = ConsumeReturnType.TIME_OUT; 
  109.                            } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { 
  110.                                returnType = ConsumeReturnType.FAILED; 
  111.                            } else if (ConsumeOrderlyStatus.SUCCESS == status) { 
  112.                                returnType = ConsumeReturnType.SUCCESS; 
  113.                            } 
  114.  
  115.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  116.                                consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); 
  117.                            } 
  118.  
  119.                            if (null == status) { 
  120.                                status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; 
  121.                            } 
  122.  
  123.                            // Hook:after 
  124.                            if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { 
  125.                                consumeMessageContext.setStatus(status.toString()); 
  126.                                consumeMessageContext 
  127.                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); 
  128.                                ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); 
  129.                            } 
  130.  
  131.                            ConsumeMessageOrderlyService.this.getConsumerStatsManager() 
  132.                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); 
  133.  
  134.                            // 處理消費結果 
  135.                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); 
  136.                        } else { 
  137.                            continueConsume = false
  138.                        } 
  139.                    } 
  140.                } else { 
  141.                    if (this.processQueue.isDropped()) { 
  142.                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); 
  143.                        return
  144.                    } 
  145.  
  146.                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); 
  147.                } 
  148.            } 
  149.        } 

獲取到鎖對象后,使用synchronized嘗試申請線程級獨占鎖。

如果加鎖成功,同一時刻只有一個線程進行消息消費。

如果加鎖失敗,會延遲100ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費請求

至此,第三個關鍵點的解決思路也清晰了,基本上就兩個步驟。

創建消息拉取任務時,消息客戶端向broker端申請鎖定MessageQueue,使得一個MessageQueue同一個時刻只能被一個消費客戶端消費。

消息消費時,多線程針對同一個消息隊列的消費先嘗試使用synchronized申請獨占鎖,加鎖成功才能進行消費,使得一個MessageQueue同一個時刻只能被一個消費客戶端中一個線程消費。

【順序消費問題拆解】

  1. broke 上要保證一個隊列只有一個進程消費,即一個隊列同一時間只有一個consumer 消費
  2. broker 給consumer 的消息順序應該保持一致,這個通過 rpc傳輸,序列化后消息順序不變,所以很容易實現
  3. consumer 上的隊列消息要保證同一個時間只有一個線程消費

通過問題的拆分,問題變成同一個共享資源串行處理了,要解決這個問題,通常的做法都是訪問資源的時候加鎖,即broker 上一個隊列消息在被consumer 訪問的必須加鎖,單個consumer 端多線程并發處理消息的時候需要加鎖;這里還需要考慮broker 鎖的異常情況,假如一個broke 隊列上的消息被consumer 鎖住了,萬一consumer 崩潰了,這個鎖就釋放不了,所以broker 上的鎖需要加上鎖的過期時間。

實際上 RocketMQ 消費端也就是照著上面的思路做:

RocketMQ中順序消息注意事項

實際項目中并不是所有情況都需要用到順序消息,但這也是設計方案的時候容易忽略的一點

順序消息是生產者和消費者配合協調作用的結果,但是消費端保證順序消費,是保證不了順序消息的

消費端并行方式消費,只設置一次拉取消息的數量為 1(即配置參數 consumeBatchSize ),是否可以實現順序消費 ?這里實際是不能的,并發消費在消費端有多個線程同時消費,consumeBatchSize 只是一個線程一次拉取消息的數量,對順序消費沒有意義,這里大家有興趣可以看 ConsumeMessageConcurrentlyService 的代碼,并發消費的邏輯都在哪里。

在使用順序消息時,一定要注意其異常情況的出現,對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),重試最大值是Integer.MAX_VALUE.這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。

重要的事再強調一次:在使用順序消息時,一定要注意其異常情況的出現!防止資源不釋放!

小結

通過以上的了解,我們知道了實現順序消息所必要的條件:順序發送、順序存儲、順序消費。RocketMQ的設計中考慮到了這些,我們只需要簡單的使用API,不需要額外使用代碼來約束業務,使得實現順序消息更加簡單。

 

責任編輯:姜華 來源: 小汪哥寫代碼
相關推薦

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-08 07:16:24

RocketMQ數據結構Message

2021-07-07 15:29:52

存儲RocketMQ體系

2021-07-16 18:44:42

RocketMQ知識

2021-07-09 07:15:48

RocketMQ數據結構kafka

2021-07-12 10:25:03

RocketMQ數據結構kafka

2022-06-27 11:04:24

RocketMQ順序消息

2021-07-07 07:06:31

Brokerkafka架構

2015-07-28 17:52:36

IOS知識體系

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2017-06-22 13:07:21

2012-03-08 11:13:23

企業架構

2017-04-03 15:35:13

知識體系架構

2017-02-27 16:42:23

Spark識體系

2021-07-05 06:26:08

生產者kafka架構

2021-07-08 05:52:34

Kafka架構主從架構

2023-09-04 08:00:53

提交事務消息

2015-07-16 10:15:44

web前端知識體系

2020-09-09 09:15:58

Nginx體系進程

2020-10-26 08:34:18

知識體系普適性
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 伦理二区| 日韩免费av | 日日操av| 欧洲一区二区三区 | 国产成视频在线观看 | 国产一区二区影院 | 日韩成人一区 | 日韩视频一区二区在线 | 在线国产一区二区 | 在线观看第一区 | 欧洲毛片| 亚洲成人一区二区 | 99视频在线免费观看 | 亚洲精品2 | 欧美激情在线一区二区三区 | 黄免费观看视频 | 中文字幕av在线 | 久久福利网站 | www.日本精品 | 欧美大片一区二区 | 国产一区二区免费 | 亚洲福利在线视频 | 亚洲精品乱码久久久久久按摩 | 免费网站国产 | 国产精品久久久久久妇女6080 | 日韩一区二区三区视频在线播放 | 亚洲综合婷婷 | 久一精品 | 精品欧美一区二区三区久久久 | 国产欧美久久一区二区三区 | 国产小网站| 久草在线青青草 | 一级a性色生活片久久毛片波多野 | 国产成人精品免费视频大全最热 | 免费一级欧美在线观看视频 | 久久国产精品偷 | 免费一区 | 国产成人精品一区二区在线 | 日韩精品免费一区 | 欧洲一区二区在线 | 亚洲视频免费在线观看 |