RocketMQ 用法詳解,你學(xué)會了嗎?
大家好,我是指北君。
消息中間件是我們工作中使用最頻繁的一類中間件,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發(fā)RocketMQ等。今天,指北君就來詳細講講RocketMQ生產(chǎn)者和消費者在使用時的一些注意事項。
一. 生產(chǎn)者
1.1 發(fā)送消息注意事項
1)消息大小
建議消息大小不要超過512K。
2)異步發(fā)送
默認的發(fā)送為同步發(fā)送,send方法會一直阻塞,等待broker端的響應(yīng)。如果你關(guān)注性能問題,可以通過send(msg, callback)來發(fā)起異步調(diào)用。
3)生產(chǎn)者組
正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務(wù)消息時,如果producer中途意外宕機,broker會主動回調(diào)producer group 內(nèi)的任意一臺機器來確認事務(wù)的狀態(tài)。(目前開源版本還不支持事務(wù)消息)。
4)線程安全問題
生產(chǎn)者實例是線程安全的,在應(yīng)用中只需要實例化一次即可。
5)性能問題
如果你希望在一個jvm進程內(nèi)使用多個producer實例來提高發(fā)送能,我們建議:
使用異步發(fā)送,并且producer實例只需要3 ~ 5個即可 對每一個producer 調(diào)用 setInstanceName,區(qū)別不同的生產(chǎn)者。
6)發(fā)送超時時間
當(dāng)客戶端向broker發(fā)送請求超時時,客戶端會拋出 RemotingTimeoutException,默認的超時時間是3秒。通過調(diào)用send(msg, timeout) 可以設(shè)置超時時間。超時時間建議不要設(shè)置過小,因為 broker 可能需要時間刷盤或向 slave 同步數(shù)據(jù)。
7)對于同一個應(yīng)用最好只使用一個Topic,消息的子類型可以使用 tags 來標(biāo)識,tags 可以由應(yīng)用自由設(shè)置。當(dāng)發(fā)送的消息設(shè)置了 tags 時,消費方在訂閱消息時可以使用 tags 在 broker 做消息過濾。注意這里的命名雖然是復(fù)數(shù),但是一條消息只能有一個tag。
8)消息在業(yè)務(wù)層面的唯一標(biāo)識可以設(shè)置到 keys 字段,方便根據(jù) keys 來定位消息。broker 會為每個消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過topic 、key 查詢這條消息的內(nèi)容(MessageExt),以及消息被誰消費(MessageTrack,精確到consumer group)。由于是哈希索引,請盡量保證key 的唯一,這樣可以避免潛在的哈希沖突。
9)消息發(fā)送不管是成功還是失敗都要打印消息日志,日志內(nèi)容務(wù)必包含 sendResult 和 key 字段。
10)對于消息不可丟失的應(yīng)用,務(wù)必要有消息重發(fā)機制。例如如果消息發(fā)送失敗,可以將消息存儲到數(shù)據(jù)庫,然后通過定時程序或者人工的方式觸發(fā)重發(fā)。
11)調(diào)用send 同步發(fā)送消息時,假定此時設(shè)置了 isWaitStoreMsgOK=true(default is true),只要不拋出異常就代表發(fā)送成功,但當(dāng) isWaitStoreMsgOK = false 時,發(fā)送永遠返回 SEND_OK。但是對于發(fā)送“成功”會有多個狀態(tài),在 SendStatus 中定義如下:
FLUSH_DISK_TIMEOUT
如果 broker 設(shè)置的 FlushDiskType = SYNC_FLUSH,當(dāng) broker 的在刷盤超時時(MessageStoreConfig.syncFlushTimeout,默認5秒)會返回該狀態(tài)。此時消息任然保存在內(nèi)存中,只有broker 宕機時消息才會丟失。
FLUSH_SLAVE_TIMEOU
如果 broker 的 role 是 SYNC_MASTER,當(dāng) slave 同步數(shù)據(jù)的時間超過了 MessageStoreConfig.syncFlushTimeout (默認5秒) 時會返回此狀態(tài)。此時只有主從都宕機,并且主也沒有刷盤時,消息才會丟失。
SLAVE_NOT_AVAILABLE
如果 broker 的 role 是 SYNC_MASTER,并且此時 slave 不可用時會返回該狀態(tài)。
SEND_OK
發(fā)送成功。為了保證消息不丟失還需要配置 SYNC_MASTER or SYNC_FLUSH。
12)消息重復(fù)
當(dāng)發(fā)送消息時返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕機了,消息將會丟失。此時如果什么都不做,消息可能會丟失,如果重發(fā)消息,消息可能會出現(xiàn)重復(fù)。
通常我們建議發(fā)送端重發(fā)消息,由消費方來保證消息消費的冪等性。
1.2 消息發(fā)送失敗如何處理
Producer 的 send 方法本生支持內(nèi)部重試,重試邏輯如下:
至多重試3次 如果發(fā)送失敗,則輪轉(zhuǎn)到下一個broker 這個方法的總耗時時間不超過 sendMsgTimeout,默認3秒 所以發(fā)送消息已經(jīng)產(chǎn)生超時異常的話就不會再重試。以上策略仍不能保證消息發(fā)送一定成功,為保證消息發(fā)送一定成功,建議應(yīng)用這么做:如果調(diào)用 send 同步發(fā)送失敗,則嘗試將消息存儲到db,由后臺線程定時重試,保證消息一定到達 Broker。
1.3 oneway 的發(fā)送形式
對于可靠性要求不高的應(yīng)用,可以采用 oneway 的發(fā)送形式,oneway 形式不等待應(yīng)答。
1.4 發(fā)送順序消息
順序消息分為分區(qū)有序和全局有序。
分區(qū)有序要求 producer 在send 時傳入 MessageQueueSelector 的實現(xiàn)類,最終將某一類消息發(fā)送到同一隊列。但是一旦發(fā)生通信異常、broker 重啟等,由于隊列總數(shù)發(fā)生變化,哈希取模后定位的隊列會變化,會產(chǎn)生短暫的順序不一致。如果業(yè)務(wù)能容忍在集群異常情況下(如某個 broker 宕機或者重啟)消息短暫的亂序,使用分區(qū)有序比較合適。
全局嚴格有序的消息即便在異常情況下也能保證消息的有序性,但是卻犧牲了分布式的 failover 特性,即 broker 集群中只有要一臺機器不可用,則整個集群都不可用,服務(wù)可用性會大大降低。
順序消息的缺點:
發(fā)送順序消息無法利用集群的 FailOver 特性 消費順序消息的并行度依賴于隊列數(shù)量 隊列熱點問題,個別隊列由于哈希不均導(dǎo)致消息過多,消費速度跟不上,產(chǎn)生消費堆積問題 遇到消費失敗的消息,無法跳過,當(dāng)前隊列需要暫停 5.發(fā)送事務(wù)消息 目前暫不支持。
二. 消費者
2.1 消費者組和訂閱
不同的消費者組可以獨立消費相同的topic,這點類似于ActiveMQ的虛擬 topic. 另外對于相同的消費者組,需要確保組內(nèi)的消費者訂閱消息的規(guī)則是一致的!
MQ 里的一個Consumer Group 代表一個 Consumer 實例群組。對于大多數(shù)分布式應(yīng)用來說,一個 Consumer Group 下通常會掛載多個 Consumer 實例。訂閱關(guān)系一致指的是同一個 Consumer Group 下所有 Consumer 實例的處理邏輯必須完全一致。一旦訂閱關(guān)系不一致,消息消費的邏輯就會混亂,甚至導(dǎo)致消息丟失。
由于 MQ 的訂閱關(guān)系主要由 Topic+Tag 共同組成,因此,保持訂閱關(guān)系一致意味著同一個 Consumer Group 下所有的實例需在以下兩方面均保持一致:
訂閱的 Topic 必須一致;訂閱的 Topic 中的 Tag 必須一致。
技術(shù)架構(gòu) > Consumer 最佳實踐 > image2017-11-15 15:50:13.png
2.2 MessageListener
1)順序消費 MessageListenerOrderly
順序消費時消費者會鎖定隊列,以確保消息被順序消費,但是這樣也會造成一定的性能損耗。當(dāng)消費出現(xiàn)異常的時候,建議不要拋出異常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,讓消費暫停一會,暫停時間由 context.setSuspendCurrentQueueTimeMillis 方法指定。
2)并發(fā)消費
并發(fā)消費是推薦的消費方式,在此種模式下,消息將被并發(fā)的消費。消費出現(xiàn)異常時不建議拋出異常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。為了保證消息肯定被至少消費一次,消息將會被重發(fā)回 broker (topic不是原topic而是這個消費組的RETRY topic),在延遲的某個時間點(默認是10秒,業(yè)務(wù)可設(shè)置,通過 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 設(shè)置)后,再次投遞到這個 ConsumerGroup,而如果一直這樣重復(fù)消費都持續(xù)失敗到一定次數(shù)(默認是16次,DefaultMQPushConsumer.maxReconsumeTimes),就會投遞到DLQ隊列。應(yīng)用可以監(jiān)控死信隊列來做人工干預(yù)。
3)返回狀態(tài)
在并行消費時可以通過返回 RECONSUME_LATER 來告訴 Consumer 當(dāng)前無法消費該消息,等延時一段時間再重新消費,但是此時消費不會停止,你可以繼續(xù)消費其他消息。但在順序消費時,因為要保證消費的順序性,所以你不能跳過失敗的消息,此時你可以通過返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 來告訴 Consumer 先暫停一會。
4)阻塞
不建議阻塞Listener,因為這會阻塞住線程池,同時也有可能造成消費者線程終止。
2.3 線程數(shù)
consumer 內(nèi)部通過一個 ThreadPoolExecutor 來消費消息,可以通過 setConsumeThreadMin 和 setConsumeThreadMax 來改變線程池的大小。
2.4 ConsumeFromWhere
當(dāng)新實例啟動的時候,PushConsumer會拿到本消費組broker已經(jīng)記錄好的消費進度(consumer offset),按照這個進度發(fā)起自己的第一次Pull請求。
如果這個消費進度在Broker并沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:
CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息。
CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍。
CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 注意:這些配置只對全新的消費組有效,老的消費組都是按已經(jīng)存儲過的消費進度繼續(xù)消費。
對于老消費組想跳過歷史消息可以采用以下幾種方法:
1)判斷消息的發(fā)送時間,太老的消息直接返回 CONSUME_SUCCESS。
2)判斷消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。
3)消費者啟動前,先調(diào)整該消費組的消費進度,再開始消費。可以人工使用命令 resetOffsetByTimeStamp,詳見 ResetOffsetByTimeCommand.java。
2.5 消息冪等
由于 RocketMQ 無法避免消費重復(fù),所以如果業(yè)務(wù)對消息重復(fù)非常敏感,務(wù)必在業(yè)務(wù)層面去重。
2.6 消費速度慢處理方式
1)提高消費并行度
大部分消息消費行為都屬于 IO 密集型業(yè)務(wù),適當(dāng)?shù)奶岣卟l(fā)度可以顯著的改善消費的吞吐量。
2)批量方式消費
默認情況下 consumer 的 consumeMessageBatchMaxSize 為1,即一次只消費一個消息,如果應(yīng)用可以批量消費消息,則可以很大程度上提高消費吞吐量。
3)跳過非重要消息
當(dāng)消堆積嚴重時可以丟棄不重要的消息。
4)優(yōu)化消息消費過程
2.7 打印消費日志
建議在消費入口方法打印消息,方便后續(xù)排查問題,消費失敗時也打印失敗日志。
2.8 利用broker過濾消息,避免多余的消息傳輸
三. 小結(jié)
好了,RocketMQ生產(chǎn)者與消費者的使用事項就總結(jié)完畢了,相信大家對RocketMQ的使用應(yīng)該會更有信心了。