成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

警惕!這八個場景下 RocketMQ 會發生流量控制

開發 前端
本文介紹了 RocketMQ 發生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。

大家好,我是君哥。

在使用 RocketMQ 的過程中,有時候我們會看到下面的日志:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

這是因為 RocketMQ 觸發了流量控制。今天我們來聊一聊哪些場景下 RocketMQ 會觸發流量控制。

如上圖,生產者把消息寫入 Broker,Consumer 從 Broker 拉取消息。Broker 是 RocketMQ 的核心 ,觸發流量控制主要就是為了防止 Broker 壓力過大而宕機。

一、 Broker 流控

1、 broker busy

RockerMQ 默認采用異步刷盤策略,Producer 把消息發送到 Broker 后,Broker 會先把消息寫入 Page Cache,刷盤線程定時地把數據從 Page Cache 刷到磁盤上,如下圖:

那 broker busy 是怎么導致的呢?

Broker 默認是開啟快速失敗的,處理邏輯類是 BrokerFastFailure,這個類中有一個定時任務用來清理過期的請求,每 10 ms 執行一次,代碼如下:

public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}

(1)Page Cache 繁忙

清理過期請求之前首先會判斷 Page Cache 是否繁忙,如果繁忙,就會給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),也就是本文開頭的異常日志。那怎么判斷 Page Cache 繁忙呢?Broker 收到一條消息后會追加到 Page Cache 或者內存映射文件,這個過程首先獲取一個 CommitLog 寫入鎖,如果持有鎖的時間大于 osPageCacheBusyTimeOutMills(默認 1s,可以配置),就認為 Page Cache 繁忙。具體代碼見 DefaultMessageStore 類 isOSPageCacheBusy 方法。

(2)清理過期請求

清理過期請求時,如果請求線程的創建時間到當前系統時間間隔大于 waitTimeMillsInSendQueue(默認 200ms,可以配置)就會清理這個請求,然后給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")。

system busy

這個異常在 NettyRemotingAbstract#processRequestCommand 方法。

拒絕請求

如果 NettyRequestProcessor 拒絕了請求,就會給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。那什么情況下請求會被拒絕呢?看下面這段代碼:

//SendMessageProcessor類
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}

從代碼中可以看到,請求被拒絕的情況有兩種可能,一個是 Page Cache 繁忙,另一個是 TransientStorePoolDeficient。

跟蹤 isTransientStorePoolDeficient 方法,發現判斷依據是在開啟 transientStorePoolEnable 配置的情況下,是否還有可用的 ByteBuffer。

注意:在開啟 transientStorePoolEnable 的情況下,寫入消息時會先寫入堆外內存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盤。而讀取消息是從 Page Cache,這樣可以實現讀寫分離,避免讀寫都在 Page Cache 帶來的問題。如下圖:

線程池拒絕

Broker 收到請求后,會把處理邏輯封裝成到 Runnable 中,由線程池來提交執行,如果線程池滿了就會拒絕請求(這里線程池中隊列的大小默認是 10000,可以通過參數 sendThreadPoolQueueCapacity 進行配置),線程池拒絕后會拋出異常 RejectedExecutionException,程序捕獲到異常后,會判斷是不是單向請求(OnewayRPC),如果不是,就會給 Producer 返回一個系統繁忙的狀態碼(code=2,remark="[OVERLOAD]system busy, start flow control for a while")。

判斷 OnewayRPC 的代碼如下,flag = 2 或者 3 時是單向請求:

public boolean isOnewayRPC() {
int bits = 1 << RPC_ONEWAY;
return (this.flag & bits) == bits;
}

(3) 消息重試

Broker 發生流量控制的情況下,返回給 Producer 系統繁忙的狀態碼(code=2),Producer 收到這個狀態碼是不會進行重試的。下面是會進行重試的響應碼:

//DefaultMQProducer類
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
ResponseCode.TOPIC_NOT_EXIST,
ResponseCode.SERVICE_NOT_AVAILABLE,
ResponseCode.SYSTEM_ERROR,
ResponseCode.NO_PERMISSION,
ResponseCode.NO_BUYER_ID,
ResponseCode.NOT_IN_CURRENT_UNIT
));

二、 Consumer 流控

DefaultMQPushConsumerImpl 類中有 Consumer 流控的邏輯 。

1、 緩存消息數量超過閾值

ProcessQueue 保存的消息數量超過閾值(默認 1000,可以配置),源碼如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

2、緩存消息大小超過閾值

ProcessQueue 保存的消息大小超過閾值(默認 100M,可以配置),源碼如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}

3、 緩存消息跨度超過閾值

對于非順序消費的場景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認 2000,可以配置)。源代碼如下:

if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}

4、獲取鎖失敗

對于順序消費的情況,ProcessQueue 加鎖失敗,也會延遲拉取,這個延遲時間默認是 3s,可以配置。

三、總結

本文介紹了 RocketMQ 發生流量控制的 8 個場景,其中 Broker 4 個場景,Consumer 4 個場景。Broker 的流量控制,本質是對 Producer 的流量控制,最好的解決方法就是給 Broker 擴容,增加 Broker 寫入能力。而對于 Consumer 端的流量控制,需要解決 Consumer 端消費慢的問題,比如有第三方接口響應慢或者有慢 SQL。

在使用的時候,根據打印的日志可以分析具體是哪種情況的流量控制,并采用相應的措施。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2023-08-07 09:12:51

權限SpringSecurity

2025-02-10 10:38:24

2022-05-06 17:12:35

區塊鏈元宇宙

2010-02-03 23:04:31

流量控制P2P華夏創新

2022-05-26 00:33:29

權限TienChin項目

2022-05-02 16:18:22

RocketMQBrokertopic

2023-10-08 12:14:42

Sentinel流量控制

2018-04-09 12:44:45

Docker使用場景開發

2015-01-06 09:48:34

Docker多租戶docker應用

2024-05-13 18:33:08

SQL日期函數

2013-07-22 14:25:29

iOS開發ASIHTTPRequ

2011-06-23 09:09:37

流量控制

2019-10-18 15:16:10

Redis數據庫并發

2021-03-09 07:38:15

Percona Xtr流量控制運維

2010-06-04 10:49:58

Linux流量控制

2010-06-17 17:00:07

Linux流量控制

2021-11-19 10:25:23

MySQL數據庫架構

2021-08-12 10:05:06

MySQL數據庫MySQL

2022-03-02 11:39:53

物聯網科技

2016-09-09 13:25:01

Linux
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久精品成人 | 国产欧美视频一区 | 精品9999| 国产精品一区久久久 | 成人黄色a | 午夜精品 | 九九热re | 亚洲视频欧美视频 | 久久成人一区 | 欧美一级二级在线观看 | 中文字幕在线观看 | 国产高清精品一区二区三区 | 天久久| 久久久久久成人 | 欧美性猛交一区二区三区精品 | www日本高清 | 欧美精品在线播放 | .国产精品成人自产拍在线观看6 | 国产一级成人 | 日本激情一区二区 | 欧美精品在线一区二区三区 | 日本亚洲一区二区 | 欧美日韩一区二区三区在线观看 | www.欧美.com| 国内自拍偷拍 | 成人免费网站在线 | 高清欧美性猛交 | 免费视频一区二区 | 精品国产一区二区国模嫣然 | 国产精品99久久久久久宅男 | 日韩中文在线视频 | 国产午夜精品一区二区三区 | 91久操网| 成人av观看 | 国产精品区二区三区日本 | 欧美aⅴ在线观看 | 亚洲国产精品视频 | 亚洲福利在线视频 | 亚洲黄色高清视频 | 欧美区日韩区 | 成人在线看片 |