成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ 用法詳解,你學(xué)會了嗎?

開發(fā) 架構(gòu)
正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務(wù)消息時,如果producer中途意外宕機,broker會主動回調(diào)producer group 內(nèi)的任意一臺機器來確認事務(wù)的狀態(tài)。

大家好,我是指北君。

圖片

消息中間件是我們工作中使用最頻繁的一類中間件,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一。當(dāng)今市面上有很多主流的消息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發(fā)RocketMQ等。今天,指北君就來詳細講講RocketMQ生產(chǎn)者和消費者在使用時的一些注意事項。

一. 生產(chǎn)者

1.1 發(fā)送消息注意事項

1)消息大小

建議消息大小不要超過512K。

2)異步發(fā)送

默認的發(fā)送為同步發(fā)送,send方法會一直阻塞,等待broker端的響應(yīng)。如果你關(guān)注性能問題,可以通過send(msg, callback)來發(fā)起異步調(diào)用。

3)生產(chǎn)者組

正常情況下生產(chǎn)者組是沒有作用的,但是在發(fā)送事務(wù)消息時,如果producer中途意外宕機,broker會主動回調(diào)producer group 內(nèi)的任意一臺機器來確認事務(wù)的狀態(tài)。(目前開源版本還不支持事務(wù)消息)。

4)線程安全問題

生產(chǎn)者實例是線程安全的,在應(yīng)用中只需要實例化一次即可。

5)性能問題

如果你希望在一個jvm進程內(nèi)使用多個producer實例來提高發(fā)送能,我們建議:

使用異步發(fā)送,并且producer實例只需要3 ~ 5個即可 對每一個producer 調(diào)用 setInstanceName,區(qū)別不同的生產(chǎn)者。

6)發(fā)送超時時間

當(dāng)客戶端向broker發(fā)送請求超時時,客戶端會拋出 RemotingTimeoutException,默認的超時時間是3秒。通過調(diào)用send(msg, timeout) 可以設(shè)置超時時間。超時時間建議不要設(shè)置過小,因為 broker 可能需要時間刷盤或向 slave 同步數(shù)據(jù)。

7)對于同一個應(yīng)用最好只使用一個Topic,消息的子類型可以使用 tags 來標(biāo)識,tags 可以由應(yīng)用自由設(shè)置。當(dāng)發(fā)送的消息設(shè)置了 tags 時,消費方在訂閱消息時可以使用 tags 在 broker 做消息過濾。注意這里的命名雖然是復(fù)數(shù),但是一條消息只能有一個tag。

8)消息在業(yè)務(wù)層面的唯一標(biāo)識可以設(shè)置到 keys 字段,方便根據(jù) keys 來定位消息。broker 會為每個消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過topic 、key 查詢這條消息的內(nèi)容(MessageExt),以及消息被誰消費(MessageTrack,精確到consumer group)。由于是哈希索引,請盡量保證key 的唯一,這樣可以避免潛在的哈希沖突。

9)消息發(fā)送不管是成功還是失敗都要打印消息日志,日志內(nèi)容務(wù)必包含 sendResult 和 key 字段。

10)對于消息不可丟失的應(yīng)用,務(wù)必要有消息重發(fā)機制。例如如果消息發(fā)送失敗,可以將消息存儲到數(shù)據(jù)庫,然后通過定時程序或者人工的方式觸發(fā)重發(fā)。

11)調(diào)用send 同步發(fā)送消息時,假定此時設(shè)置了 isWaitStoreMsgOK=true(default is true),只要不拋出異常就代表發(fā)送成功,但當(dāng) isWaitStoreMsgOK = false 時,發(fā)送永遠返回 SEND_OK。但是對于發(fā)送“成功”會有多個狀態(tài),在 SendStatus 中定義如下:

FLUSH_DISK_TIMEOUT

如果 broker 設(shè)置的 FlushDiskType = SYNC_FLUSH,當(dāng) broker 的在刷盤超時時(MessageStoreConfig.syncFlushTimeout,默認5秒)會返回該狀態(tài)。此時消息任然保存在內(nèi)存中,只有broker 宕機時消息才會丟失。

FLUSH_SLAVE_TIMEOU

如果 broker 的 role 是 SYNC_MASTER,當(dāng) slave 同步數(shù)據(jù)的時間超過了 MessageStoreConfig.syncFlushTimeout (默認5秒) 時會返回此狀態(tài)。此時只有主從都宕機,并且主也沒有刷盤時,消息才會丟失。

SLAVE_NOT_AVAILABLE

如果 broker 的 role 是 SYNC_MASTER,并且此時 slave 不可用時會返回該狀態(tài)。

SEND_OK

發(fā)送成功。為了保證消息不丟失還需要配置 SYNC_MASTER or SYNC_FLUSH。

12)消息重復(fù)

當(dāng)發(fā)送消息時返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕機了,消息將會丟失。此時如果什么都不做,消息可能會丟失,如果重發(fā)消息,消息可能會出現(xiàn)重復(fù)。

通常我們建議發(fā)送端重發(fā)消息,由消費方來保證消息消費的冪等性。

1.2 消息發(fā)送失敗如何處理

Producer 的 send 方法本生支持內(nèi)部重試,重試邏輯如下:

至多重試3次 如果發(fā)送失敗,則輪轉(zhuǎn)到下一個broker 這個方法的總耗時時間不超過 sendMsgTimeout,默認3秒 所以發(fā)送消息已經(jīng)產(chǎn)生超時異常的話就不會再重試。以上策略仍不能保證消息發(fā)送一定成功,為保證消息發(fā)送一定成功,建議應(yīng)用這么做:如果調(diào)用 send 同步發(fā)送失敗,則嘗試將消息存儲到db,由后臺線程定時重試,保證消息一定到達 Broker。

1.3 oneway 的發(fā)送形式

對于可靠性要求不高的應(yīng)用,可以采用 oneway 的發(fā)送形式,oneway 形式不等待應(yīng)答。

1.4 發(fā)送順序消息

順序消息分為分區(qū)有序和全局有序。

分區(qū)有序要求 producer 在send 時傳入 MessageQueueSelector 的實現(xiàn)類,最終將某一類消息發(fā)送到同一隊列。但是一旦發(fā)生通信異常、broker 重啟等,由于隊列總數(shù)發(fā)生變化,哈希取模后定位的隊列會變化,會產(chǎn)生短暫的順序不一致。如果業(yè)務(wù)能容忍在集群異常情況下(如某個 broker 宕機或者重啟)消息短暫的亂序,使用分區(qū)有序比較合適。

全局嚴格有序的消息即便在異常情況下也能保證消息的有序性,但是卻犧牲了分布式的 failover 特性,即 broker 集群中只有要一臺機器不可用,則整個集群都不可用,服務(wù)可用性會大大降低。

順序消息的缺點:

發(fā)送順序消息無法利用集群的 FailOver 特性 消費順序消息的并行度依賴于隊列數(shù)量 隊列熱點問題,個別隊列由于哈希不均導(dǎo)致消息過多,消費速度跟不上,產(chǎn)生消費堆積問題 遇到消費失敗的消息,無法跳過,當(dāng)前隊列需要暫停 5.發(fā)送事務(wù)消息 目前暫不支持。

二. 消費者

2.1 消費者組和訂閱

不同的消費者組可以獨立消費相同的topic,這點類似于ActiveMQ的虛擬 topic. 另外對于相同的消費者組,需要確保組內(nèi)的消費者訂閱消息的規(guī)則是一致的!

MQ 里的一個Consumer Group 代表一個 Consumer 實例群組。對于大多數(shù)分布式應(yīng)用來說,一個 Consumer Group 下通常會掛載多個 Consumer 實例。訂閱關(guān)系一致指的是同一個 Consumer Group 下所有 Consumer 實例的處理邏輯必須完全一致。一旦訂閱關(guān)系不一致,消息消費的邏輯就會混亂,甚至導(dǎo)致消息丟失。

由于 MQ 的訂閱關(guān)系主要由 Topic+Tag 共同組成,因此,保持訂閱關(guān)系一致意味著同一個 Consumer Group 下所有的實例需在以下兩方面均保持一致:

訂閱的 Topic 必須一致;訂閱的 Topic 中的 Tag 必須一致。

技術(shù)架構(gòu) > Consumer 最佳實踐 > image2017-11-15 15:50:13.png

2.2 MessageListener

1)順序消費 MessageListenerOrderly

順序消費時消費者會鎖定隊列,以確保消息被順序消費,但是這樣也會造成一定的性能損耗。當(dāng)消費出現(xiàn)異常的時候,建議不要拋出異常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,讓消費暫停一會,暫停時間由 context.setSuspendCurrentQueueTimeMillis 方法指定。

2)并發(fā)消費

并發(fā)消費是推薦的消費方式,在此種模式下,消息將被并發(fā)的消費。消費出現(xiàn)異常時不建議拋出異常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。為了保證消息肯定被至少消費一次,消息將會被重發(fā)回 broker (topic不是原topic而是這個消費組的RETRY topic),在延遲的某個時間點(默認是10秒,業(yè)務(wù)可設(shè)置,通過 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 設(shè)置)后,再次投遞到這個 ConsumerGroup,而如果一直這樣重復(fù)消費都持續(xù)失敗到一定次數(shù)(默認是16次,DefaultMQPushConsumer.maxReconsumeTimes),就會投遞到DLQ隊列。應(yīng)用可以監(jiān)控死信隊列來做人工干預(yù)。

3)返回狀態(tài)

在并行消費時可以通過返回 RECONSUME_LATER 來告訴 Consumer 當(dāng)前無法消費該消息,等延時一段時間再重新消費,但是此時消費不會停止,你可以繼續(xù)消費其他消息。但在順序消費時,因為要保證消費的順序性,所以你不能跳過失敗的消息,此時你可以通過返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 來告訴 Consumer 先暫停一會。

4)阻塞

不建議阻塞Listener,因為這會阻塞住線程池,同時也有可能造成消費者線程終止。

2.3 線程數(shù)

consumer 內(nèi)部通過一個 ThreadPoolExecutor 來消費消息,可以通過 setConsumeThreadMin 和 setConsumeThreadMax 來改變線程池的大小。

2.4 ConsumeFromWhere

當(dāng)新實例啟動的時候,PushConsumer會拿到本消費組broker已經(jīng)記錄好的消費進度(consumer offset),按照這個進度發(fā)起自己的第一次Pull請求。

如果這個消費進度在Broker并沒有存儲起來,證明這個是一個全新的消費組,這時候客戶端有幾個策略可以選擇:

CONSUME_FROM_LAST_OFFSET //默認策略,從該隊列最尾開始消費,即跳過歷史消息。

CONSUME_FROM_FIRST_OFFSET //從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍。

CONSUME_FROM_TIMESTAMP//從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 注意:這些配置只對全新的消費組有效,老的消費組都是按已經(jīng)存儲過的消費進度繼續(xù)消費。

對于老消費組想跳過歷史消息可以采用以下幾種方法:

1)判斷消息的發(fā)送時間,太老的消息直接返回 CONSUME_SUCCESS。

2)判斷消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。

3)消費者啟動前,先調(diào)整該消費組的消費進度,再開始消費。可以人工使用命令 resetOffsetByTimeStamp,詳見 ResetOffsetByTimeCommand.java。

2.5 消息冪等

由于 RocketMQ 無法避免消費重復(fù),所以如果業(yè)務(wù)對消息重復(fù)非常敏感,務(wù)必在業(yè)務(wù)層面去重。

2.6 消費速度慢處理方式

1)提高消費并行度

大部分消息消費行為都屬于 IO 密集型業(yè)務(wù),適當(dāng)?shù)奶岣卟l(fā)度可以顯著的改善消費的吞吐量。

2)批量方式消費

默認情況下 consumer 的 consumeMessageBatchMaxSize 為1,即一次只消費一個消息,如果應(yīng)用可以批量消費消息,則可以很大程度上提高消費吞吐量。

3)跳過非重要消息

當(dāng)消堆積嚴重時可以丟棄不重要的消息。

4)優(yōu)化消息消費過程

2.7 打印消費日志

建議在消費入口方法打印消息,方便后續(xù)排查問題,消費失敗時也打印失敗日志。

2.8 利用broker過濾消息,避免多余的消息傳輸

三. 小結(jié)

好了,RocketMQ生產(chǎn)者與消費者的使用事項就總結(jié)完畢了,相信大家對RocketMQ的使用應(yīng)該會更有信心了。

責(zé)任編輯:武曉燕 來源: Java技術(shù)指北
相關(guān)推薦

2024-02-04 00:00:00

Effect數(shù)據(jù)組件

2024-01-02 12:05:26

Java并發(fā)編程

2024-10-11 09:15:33

2023-07-03 07:20:50

2022-12-06 07:53:33

MySQL索引B+樹

2023-03-26 22:31:29

2022-12-06 08:37:43

2022-04-26 08:41:54

JDK動態(tài)代理方法

2023-08-08 08:23:08

Spring日志?線程池

2022-04-13 09:01:45

SASSCSS處理器

2023-09-06 11:31:24

MERGE用法SQL

2024-09-10 10:34:48

2024-12-31 00:08:37

C#語言dynamic?

2023-03-09 07:38:58

static關(guān)鍵字狀態(tài)

2024-08-12 08:12:38

2023-05-18 09:01:11

MBRGPT分區(qū)

2024-10-12 10:25:15

2024-01-19 08:25:38

死鎖Java通信

2023-07-26 13:11:21

ChatGPT平臺工具

2023-01-10 08:43:15

定義DDD架構(gòu)
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产成人一区二区 | 国产精品久久久久久久久免费高清 | 97在线超碰 | 夜夜操av | 亚洲一区中文字幕 | 中文字幕第九页 | 日本一二三区高清 | 精品视频99 | 色999视频| 九色在线观看 | 成人在线观看免费视频 | 亚洲精品久久久久久久不卡四虎 | 成人影院在线观看 | 国产精品永久免费视频 | 亚洲激精日韩激精欧美精品 | 精品久久一区二区三区 | 精品国产色| 日韩不卡三区 | 欧美日韩在线一区二区三区 | 99在线精品视频 | 国产精品一区一区三区 | 国产区在线免费观看 | 成人在线视频网 | h视频免费观看 | 99热热精品 | 亚洲欧洲中文日韩 | 久久久91精品国产一区二区三区 | 81精品国产乱码久久久久久 | 欧美精品video | 欧美精品中文字幕久久二区 | 澳门永久av免费网站 | 国产成人精品一区二区三区视频 | 欧美国产精品一区二区三区 | 久久网一区二区三区 | 欧美二级| 欧美日韩综合一区 | 欧美日韩三区 | 韩日精品视频 | 在线观看日本高清二区 | 人人草人人干 | 国产一区二区精 |