順序消息的實現-RocketMQ知識體系(五)
我們知道,kafka 如果要保證順序消費,必須保證消息保存到同一個patition上,而且為了有序性,只能有一個消費者進行消費。這種情況下,Kafka 就退化成了單一隊列,毫無并發性可言,極大降低系統性能。那么對于對業務比較友好的RocketMQ 是如何實現的呢?首先,我們循序漸進的來了解下順序消息的實現。
順序消息業務使用場景
1、電商場景中傳遞訂單狀態。
2、同步mysql 的binlong 日志,數據庫的操作是有順序的。
3、其他消息之間有先后的依賴關系,后一條消息需要依賴于前一條消息的處理結果的情況。
等等。。。
消息中間件中的順序消息
順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發布和消費的消息類型。順序消息由兩個部分組成:順序發布和順序消費。
順序消息包含兩種類型:
分區順序:一個Partition(queue)內所有的消息按照先進先出的順序進行發布和消費
全局順序:一個Topic內所有的消息按照先進先出的順序進行發布和消費.但是全局順序極大的降低了系統的吞吐量,不符合mq的設計初衷。
那么折中的辦法就是選擇分區順序。
【局部順序消費】
如何保證順序
在MQ的模型中,順序需要由3個階段去保障:
- 消息被發送時保持順序
- 消息被存儲時保持和發送的順序一致
- 消息被消費時保持和存儲的順序一致
發送時保持順序意味著對于有順序要求的消息,用戶應該在同一個線程中采用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。
第一點,消息順序發送,多線程發送的消息無法保證有序性,因此,需要業務方在發送時,針對同一個業務編號(如同一筆訂單)的消息需要保證在一個線程內順序發送,在上一個消息發送成功后,在進行下一個消息的發送。對應到mq中,消息發送方法就得使用同步發送,異步發送無法保證順序性。
第二點,消息順序存儲,mq的topic下會存在多個queue,要保證消息的順序存儲,同一個業務編號的消息需要被發送到一個queue中。對應到mq中,需要使用MessageQueueSelector來選擇要發送的queue,即對業務編號進行hash,然后根據隊列數量對hash值取余,將消息發送到一個queue中。
第三點,消息順序消費,要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內部,也只能有一個消費線程來消費該隊列。即,同一時刻,一個消費隊列只能被一個消費者中的一個線程消費。
RocketMQ中順序的實現
【Producer端】
Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區,在RocketMQ中,通過MessageQueueSelector來實現分區的選擇。
- /**
- * 消息隊列選擇器
- */
- public interface MessageQueueSelector {
- /**
- * 選擇消息隊列
- *
- * @param mqs 消息隊列
- * @param msg 消息
- * @param arg 參數
- * @return 消息隊列
- */
- MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
- }
- List
mqs:消息要發送的Topic下所有的分區 - Message msg:消息對象
- 額外的參數:用戶可以傳遞自己的參數
比如如下實現就可以保證相同的訂單的消息被路由到相同的分區:
- long orderId = ((Order) object).getOrderId;
- return mqs.get(orderId % mqs.size());
【Consumer端】
嘗試鎖定鎖定MessageQueue。
首先我們如何保證一個隊列只被一個消費者消費?
消費隊列存在于broker端,如果想保證一個隊列被一個消費者消費,那么消費者在進行消息拉取消費時就必須向mq服務器申請隊列鎖,消費者申請隊列鎖的代碼存在于RebalanceService消息隊列負載的實現代碼中。
消費者重新負載,并且分配完消費隊列后,需要向mq服務器發起消息拉取請求,代碼實現在RebalanceImpl#updateProcessQueueTableInRebalance中,針對順序消息的消息拉取,mq做了如下判斷:
- // 增加 不在processQueueTable && 存在于mqSet 里的消息隊列。
- List<PullRequest> pullRequestList = new ArrayList<>(); // 拉消息請求數組
- for (MessageQueue mq : mqSet) {
- if (!this.processQueueTable.containsKey(mq)) {
- if (isOrder && !this.lock(mq)) { // 順序消息鎖定消息隊列
- log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
- continue;
- }
- this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
- long nextOffset = this.computePullFromWhere(mq);
- if (nextOffset >= 0) {
- ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
- if (pre != null) {
- log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
- } else {
- log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
- PullRequest pullRequest = new PullRequest();
- pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(nextOffset);
- pullRequest.setMessageQueue(mq);
- pullRequest.setProcessQueue(pq);
- pullRequestList.add(pullRequest);
- changed = true;
- }
- } else {
- log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
- }
- }
- }
- // 發起消息拉取請求
- this.dispatchPullRequest(pullRequestList);
核心思想就是,消費客戶端先向broker端發起對messageQueue的加鎖請求,只有加鎖成功時才創建pullRequest進行消息拉取,下面看下lock加鎖請求方法:
- /**
- * 請求Broker獲得指定消息隊列的分布式鎖
- *
- * @param mq 隊列
- * @return 是否成功
- */
- public boolean lock(final MessageQueue mq) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- LockBatchRequestBody requestBody = new LockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.getMqSet().add(mq);
- try {
- // 請求Broker獲得指定消息隊列的分布式鎖
- Set<MessageQueue> lockedMq =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
- // 設置消息處理隊列鎖定成功。鎖定消息隊列成功,可能本地沒有消息處理隊列,設置鎖定成功會在lockAll()方法。
- for (MessageQueue mmqq : lockedMq) {
- ProcessQueue processQueue = this.processQueueTable.get(mmqq);
- if (processQueue != null) {
- processQueue.setLocked(true);
- processQueue.setLastLockTimestamp(System.currentTimeMillis());
- }
- }
- boolean lockOK = lockedMq.contains(mq);
- log.info("the message queue lock {}, {} {}",
- lockOK ? "OK" : "Failed",
- this.consumerGroup,
- mq);
- return lockOK;
- } catch (Exception e) {
- log.error("lockBatchMQ exception, " + mq, e);
- }
- }
- return false;
- }
代碼實現邏輯比較清晰,就是調用lockBatchMQ方法發送了一個加鎖請求,那么broker端收到加鎖請求后的處理邏輯又是怎么樣?
【broker端實現】
broker端收到加鎖請求的處理邏輯在RebalanceLockManager#tryLockBatch方法中,RebalanceLockManager中關鍵屬性如下:
- /**
- * 消息隊列鎖過期時間,默認60s
- */
- private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
- "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
- /**
- * 鎖
- */
- private final Lock lock = new ReentrantLock();
- /**
- * 消費分組的消息隊列鎖映射
- */
- private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
- new ConcurrentHashMap<>(1024);
LockEntry對象中關鍵屬性如下:
- /**
- * 鎖定記錄
- */
- static class LockEntry {
- /**
- * 客戶端編號
- */
- private String clientId;
- /**
- * 最后鎖定時間
- */
- private volatile long lastUpdateTimestamp = System.currentTimeMillis();
- public String getClientId() {
- return clientId;
- }
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
- /**
- * 是否鎖定
- *
- * @param clientId 客戶端編號
- * @return 是否
- */
- public boolean isLocked(final String clientId) {
- boolean eq = this.clientId.equals(clientId);
- return eq && !this.isExpired();
- }
- /**
- * 鎖定是否過期
- *
- * @return 是否
- */
- public boolean isExpired() {
- boolean expired =
- (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
- return expired;
- }
- }
broker端通過對ConcurrentMap
【再次回到Consumer端,拿到鎖后】
消費者對messageQueue的加鎖已經成功,那么就進入到了第二個步驟,創建pullRequest進行消息拉取,消息拉取部分的代碼實現在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中進行消費,順序消費的實現為ConsumeMessageOrderlyService,提交消息進行消費的方法為ConsumeMessageOrderlyService#submitConsumeRequest,具體實現如下:
- @Override
- public void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final boolean dispathToConsume) {
- if (dispathToConsume) {
- ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
- this.consumeExecutor.submit(consumeRequest);
- }
- }
構建了一個ConsumeRequest對象,并提交給了ThreadPoolExecutor來并行消費,看下順序消費的ConsumeRequest的run方法實現:
- public void run() {
- if (this.processQueue.isDropped()) {
- log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- return;
- }
- // 獲得 Consumer 消息隊列鎖
- final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
- synchronized (objLock) {
- // (廣播模式) 或者 (集群模式 && Broker消息隊列鎖有效)
- if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
- final long beginTime = System.currentTimeMillis();
- // 循環
- for (boolean continueConsume = true; continueConsume; ) {
- if (this.processQueue.isDropped()) {
- log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- break;
- }
- // 消息隊列分布式鎖未鎖定,提交延遲獲得鎖并消費請求
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && !this.processQueue.isLocked()) {
- log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
- break;
- }
- // 消息隊列分布式鎖已經過期,提交延遲獲得鎖并消費請求
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && this.processQueue.isLockExpired()) {
- log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
- break;
- }
- // 當前周期消費時間超過連續時長,默認:60s,提交延遲消費請求。默認情況下,每消費1分鐘休息10ms。
- long interval = System.currentTimeMillis() - beginTime;
- if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
- ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
- break;
- }
- // 獲取消費消息。此處和并發消息請求不同,并發消息請求已經帶了消費哪些消息。
- final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
- List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
- if (!msgs.isEmpty()) {
- final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
- ConsumeOrderlyStatus status = null;
- // Hook:before
- ConsumeMessageContext consumeMessageContext = null;
- if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext = new ConsumeMessageContext();
- consumeMessageContext
- .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
- consumeMessageContext.setMq(messageQueue);
- consumeMessageContext.setMsgList(msgs);
- consumeMessageContext.setSuccess(false);
- // init the consume context type
- consumeMessageContext.setProps(new HashMap<String, String>());
- ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
- }
- // 執行消費
- long beginTimestamp = System.currentTimeMillis();
- ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
- boolean hasException = false;
- try {
- this.processQueue.getLockConsume().lock(); // 鎖定隊列消費鎖
- if (this.processQueue.isDropped()) {
- log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
- this.messageQueue);
- break;
- }
- status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
- } catch (Throwable e) {
- log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
- hasException = true;
- } finally {
- this.processQueue.getLockConsume().unlock(); // 鎖定隊列消費鎖
- }
- if (null == status //
- || ConsumeOrderlyStatus.ROLLBACK == status//
- || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
- log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
- }
- // 解析消費結果狀態
- long consumeRT = System.currentTimeMillis() - beginTimestamp;
- if (null == status) {
- if (hasException) {
- returnType = ConsumeReturnType.EXCEPTION;
- } else {
- returnType = ConsumeReturnType.RETURNNULL;
- }
- } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
- returnType = ConsumeReturnType.TIME_OUT;
- } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
- returnType = ConsumeReturnType.FAILED;
- } else if (ConsumeOrderlyStatus.SUCCESS == status) {
- returnType = ConsumeReturnType.SUCCESS;
- }
- if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
- }
- if (null == status) {
- status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- // Hook:after
- if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.setStatus(status.toString());
- consumeMessageContext
- .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
- ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
- }
- ConsumeMessageOrderlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
- // 處理消費結果
- continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
- } else {
- continueConsume = false;
- }
- }
- } else {
- if (this.processQueue.isDropped()) {
- log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- return;
- }
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
- }
- }
- }
獲取到鎖對象后,使用synchronized嘗試申請線程級獨占鎖。
如果加鎖成功,同一時刻只有一個線程進行消息消費。
如果加鎖失敗,會延遲100ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費請求
至此,第三個關鍵點的解決思路也清晰了,基本上就兩個步驟。
創建消息拉取任務時,消息客戶端向broker端申請鎖定MessageQueue,使得一個MessageQueue同一個時刻只能被一個消費客戶端消費。
消息消費時,多線程針對同一個消息隊列的消費先嘗試使用synchronized申請獨占鎖,加鎖成功才能進行消費,使得一個MessageQueue同一個時刻只能被一個消費客戶端中一個線程消費。
【順序消費問題拆解】
- broke 上要保證一個隊列只有一個進程消費,即一個隊列同一時間只有一個consumer 消費
- broker 給consumer 的消息順序應該保持一致,這個通過 rpc傳輸,序列化后消息順序不變,所以很容易實現
- consumer 上的隊列消息要保證同一個時間只有一個線程消費
通過問題的拆分,問題變成同一個共享資源串行處理了,要解決這個問題,通常的做法都是訪問資源的時候加鎖,即broker 上一個隊列消息在被consumer 訪問的必須加鎖,單個consumer 端多線程并發處理消息的時候需要加鎖;這里還需要考慮broker 鎖的異常情況,假如一個broke 隊列上的消息被consumer 鎖住了,萬一consumer 崩潰了,這個鎖就釋放不了,所以broker 上的鎖需要加上鎖的過期時間。
實際上 RocketMQ 消費端也就是照著上面的思路做:
RocketMQ中順序消息注意事項
實際項目中并不是所有情況都需要用到順序消息,但這也是設計方案的時候容易忽略的一點
順序消息是生產者和消費者配合協調作用的結果,但是消費端保證順序消費,是保證不了順序消息的
消費端并行方式消費,只設置一次拉取消息的數量為 1(即配置參數 consumeBatchSize ),是否可以實現順序消費 ?這里實際是不能的,并發消費在消費端有多個線程同時消費,consumeBatchSize 只是一個線程一次拉取消息的數量,對順序消費沒有意義,這里大家有興趣可以看 ConsumeMessageConcurrentlyService 的代碼,并發消費的邏輯都在哪里。
在使用順序消息時,一定要注意其異常情況的出現,對于順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),重試最大值是Integer.MAX_VALUE.這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。
重要的事再強調一次:在使用順序消息時,一定要注意其異常情況的出現!防止資源不釋放!
小結
通過以上的了解,我們知道了實現順序消息所必要的條件:順序發送、順序存儲、順序消費。RocketMQ的設計中考慮到了這些,我們只需要簡單的使用API,不需要額外使用代碼來約束業務,使得實現順序消息更加簡單。