成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

品 RocketMQ 源碼,學(xué)習(xí)并發(fā)編程三大神器

開(kāi)發(fā) 開(kāi)發(fā)工具
異步是更細(xì)粒度的使用系統(tǒng)資源的一種方式,在異步消息處理的過(guò)程中,通過(guò) CompletableFuture 這個(gè)神器,各個(gè)線程各司其職,優(yōu)雅且高效的提升了 RocketMQ 的性能。

筆者是 RocketMQ 的忠實(shí)粉絲,在閱讀源碼的過(guò)程中,學(xué)習(xí)到了很多編程技巧。

這篇文章,筆者結(jié)合 RocketMQ 源碼,分享并發(fā)編程三大神器的相關(guān)知識(shí)點(diǎn)。

圖片

1 CountDownLatch 實(shí)現(xiàn)網(wǎng)絡(luò)同步請(qǐng)求

CountDownLatch 是一個(gè)同步工具類,用來(lái)協(xié)調(diào)多個(gè)線程之間的同步,它能夠使一個(gè)線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。

下圖是 CountDownLatch 的核心方法:

圖片

我們可以認(rèn)為它內(nèi)置一個(gè)計(jì)數(shù)器,構(gòu)造函數(shù)初始化計(jì)數(shù)值。每當(dāng)線程執(zhí)行 countDown 方法,計(jì)數(shù)器的值就會(huì)減一,當(dāng)計(jì)數(shù)器的值為 0 時(shí),表示所有的任務(wù)都執(zhí)行完成,然后在 CountDownLatch 上等待的線程就可以恢復(fù)執(zhí)行接下來(lái)的任務(wù)。

舉例,數(shù)據(jù)庫(kù)有100萬(wàn)條數(shù)據(jù)需要處理,單線程執(zhí)行比較慢,我們可以將任務(wù)分為5個(gè)批次,線程池按照每個(gè)批次執(zhí)行,當(dāng)5個(gè)批次整體執(zhí)行完成后,打印出任務(wù)執(zhí)行的時(shí)間 。

 long start = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(10);
int batchSize = 5;
CountDownLatch countDownLatch = new CountDownLatch(batchSize);
for (int i = 0; i < batchSize; i++) {
final int batchNumber = i;
executorService.execute(new Runnable() {
@Override
public void run(){
try {
doSomething(batchNumber);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
System.out.println("任務(wù)執(zhí)行耗時(shí):" + (System.currentTimeMillis() - start) + "毫秒");

溫習(xí)完 CountDownLatch 的知識(shí)點(diǎn),回到 RocketMQ 源碼。

筆者在沒(méi)有接觸網(wǎng)絡(luò)編程之前,一直很疑惑,網(wǎng)絡(luò)同步請(qǐng)求是如何實(shí)現(xiàn)的?

同步請(qǐng)求指:客戶端線程發(fā)起調(diào)用后,需要在指定的超時(shí)時(shí)間內(nèi),等到響應(yīng)結(jié)果,才能完成本次調(diào)用。如果超時(shí)時(shí)間內(nèi)沒(méi)有得到結(jié)果,那么會(huì)拋出超時(shí)異常。

RocketMQ 的同步發(fā)送消息接口見(jiàn)下圖:

圖片

追蹤源碼,真正發(fā)送請(qǐng)求的方法是通訊模塊的同步請(qǐng)求方法 invokeSyncImpl 。

圖片

整體流程:

發(fā)送消息線程 Netty channel 對(duì)象調(diào)用 writeAndFlush 方法后 ,它的本質(zhì)是通過(guò) Netty 的讀寫(xiě)線程將數(shù)據(jù)包發(fā)送到內(nèi)核 , 這個(gè)過(guò)程本身就是異步的;

ResponseFuture 類中內(nèi)置一個(gè) CountDownLatch 對(duì)象 ,responseFuture 對(duì)象調(diào)用 waitRepsone 方法,發(fā)送消息線程會(huì)阻塞 ;

圖片

客戶端收到響應(yīng)命令后, 執(zhí)行 processResponseCommand 方法,核心邏輯是執(zhí)行 ResponseFuture 的 putResponse 方法。

圖片

該方法的本質(zhì)就是填充響應(yīng)對(duì)象,并調(diào)用 countDownLatch 的 countDown 方法 , 這樣發(fā)送消息線程就不再阻塞。

CountDownLatch 實(shí)現(xiàn)網(wǎng)絡(luò)同步請(qǐng)求是非常實(shí)用的技巧,在很多開(kāi)源中間件里,比如 Metaq ,Xmemcached 都有類似的實(shí)現(xiàn)。

2 ReadWriteLock 名字服務(wù)路由管理

讀寫(xiě)鎖是一把鎖分為兩部分:讀鎖和寫(xiě)鎖,其中讀鎖允許多個(gè)線程同時(shí)獲得,而寫(xiě)鎖則是互斥鎖。

它的規(guī)則是:讀讀不互斥,讀寫(xiě)互斥,寫(xiě)寫(xiě)互斥,適用于讀多寫(xiě)少的業(yè)務(wù)場(chǎng)景。

我們一般都使用 ReentrantReadWriteLock ,該類實(shí)現(xiàn)了 ReadWriteLock 。ReadWriteLock 接口也很簡(jiǎn)單,其內(nèi)部主要提供了兩個(gè)方法,分別返回讀鎖和寫(xiě)鎖 。

 public interface ReadWriteLock {
//獲取讀鎖
Lock readLock();
//獲取寫(xiě)鎖
Lock writeLock();
}

讀寫(xiě)鎖的使用方式如下所示:

創(chuàng)建 ReentrantReadWriteLock 對(duì)象 , 當(dāng)使用 ReadWriteLock 的時(shí)候,并不是直接使用,而是獲得其內(nèi)部的讀鎖和寫(xiě)鎖,然后分別調(diào)用 lock / unlock 方法 ;

private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

讀取共享數(shù)據(jù) ;

Lock readLock = readWriteLock.readLock();
readLock.lock();
try {
// TODO 查詢共享數(shù)據(jù)
} finally {
readLock.unlock();
}

寫(xiě)入共享數(shù)據(jù);

Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
// TODO 修改共享數(shù)據(jù)
} finally {
writeLock.unlock();
}

RocketMQ架構(gòu)上主要分為四部分,如下圖所示 :

圖片

Producer :消息發(fā)布的角色,Producer 通過(guò) MQ 的負(fù)載均衡模塊選擇相應(yīng)的 Broker 集群隊(duì)列進(jìn)行消息投遞,投遞的過(guò)程支持快速失敗并且低延遲。

Consumer :消息消費(fèi)的角色,支持以 push 推,pull 拉兩種模式對(duì)消息進(jìn)行消費(fèi)。

BrokerServer :Broker主要負(fù)責(zé)消息的存儲(chǔ)、投遞和查詢以及服務(wù)高可用保證。

NameServer :名字服務(wù)是一個(gè)非常簡(jiǎn)單的 Topic 路由注冊(cè)中心,其角色類似 Dubbo 中的zookeeper,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。

NameServer 是一個(gè)幾乎無(wú)狀態(tài)節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無(wú)任何信息同步。Broker 啟動(dòng)之后會(huì)向所有 NameServer 定期(每 30s)發(fā)送心跳包(路由信息),NameServer 會(huì)定期掃描 Broker 存活列表,如果超過(guò) 120s 沒(méi)有心跳則移除此 Broker 相關(guān)信息,代表下線。

那么 NameServer 如何保存路由信息呢?

圖片

路由信息通過(guò)幾個(gè) HashMap 來(lái)保存,當(dāng) Broker 向 Nameserver 發(fā)送心跳包(路由信息),Nameserver 需要對(duì) HashMap 進(jìn)行數(shù)據(jù)更新,但我們都知道 HashMap 并不是線程安全的,高并發(fā)場(chǎng)景下,容易出現(xiàn) CPU 100% 問(wèn)題,所以更新 HashMap 時(shí)需要加鎖,RocketMQ 使用了  JDK 的讀寫(xiě)鎖 ReentrantReadWriteLock 。

更新路由信息,操作寫(xiě)鎖

圖片

查詢主題信息,操作讀鎖

圖片

讀寫(xiě)鎖適用于讀多寫(xiě)少的場(chǎng)景,比如名字服務(wù),配置服務(wù)等。

3 CompletableFuture 異步消息處理

RocketMQ 主從架構(gòu)中,主節(jié)點(diǎn)與從節(jié)點(diǎn)之間數(shù)據(jù)同步/復(fù)制的方式有同步雙寫(xiě)和異步復(fù)制兩種模式。

異步復(fù)制是指消息在主節(jié)點(diǎn)落盤(pán)成功后就告訴客戶端消息發(fā)送成功,無(wú)需等待消息從主節(jié)點(diǎn)復(fù)制到從節(jié)點(diǎn),消息的復(fù)制由其他線程完成。

同步雙寫(xiě)是指主節(jié)點(diǎn)將消息成功落盤(pán)后,需要等待從節(jié)點(diǎn)復(fù)制成功,再告訴客戶端消息發(fā)送成功。

同步雙寫(xiě)模式是阻塞的,筆者按照 RocketMQ 4.6.1 源碼,整理出主節(jié)點(diǎn)處理一個(gè)發(fā)送消息的請(qǐng)求的時(shí)序圖。

圖片

整體流程:

生產(chǎn)者將消息發(fā)送到 Broker , Broker 接收到消息后,發(fā)送消息處理器 SendMessageProcessor 的執(zhí)行線程池SendMessageExecutor 線程池來(lái)處理發(fā)送消息命令;

執(zhí)行 ComitLog 的 putMessage 方法;

ComitLog 內(nèi)部先執(zhí)行 appendMessage 方法;

然后提交一個(gè) GroupCommitRequest 到同步復(fù)制服務(wù) HAService  ,等待 HAService 通知 GroupCommitRequest 完成;

返回寫(xiě)入結(jié)果并響應(yīng)客戶端 。

我們可以看到:發(fā)送消息的執(zhí)行線程需要等待消息復(fù)制從節(jié)點(diǎn) , 并將消息返回給生產(chǎn)者才能開(kāi)始處理下一個(gè)消息。

RocketMQ 4.6.1 源碼中,執(zhí)行線程池的線程數(shù)量是 1 ,假如線程處理主從同步速度慢了,系統(tǒng)在這一瞬間無(wú)法處理新的發(fā)送消息請(qǐng)求,造成 CPU 資源無(wú)法被充分利用 , 同時(shí)系統(tǒng)的吞吐量也會(huì)降低。

那么優(yōu)化同步雙寫(xiě)呢 ?

從 RocketMQ 4.7 開(kāi)始,RocketMQ 引入了 CompletableFuture 實(shí)現(xiàn)了異步消息處理 。

發(fā)送消息的執(zhí)行線程不再等待消息復(fù)制到從節(jié)點(diǎn)后再處理新的請(qǐng)求,而是提前生成 CompletableFuture 并返回 ;

HAService 中的線程在復(fù)制成功后,調(diào)用 CompletableFuture 的 complete 方法,通知 remoting 模塊響應(yīng)客戶端(線程池:PutMessageExecutor ) 。

我們分析下 RocketMQ 4.9.4 核心代碼:

Broker 接收到消息后,發(fā)送消息處理器 SendMessageProcessor 的執(zhí)行線程池SendMessageExecutor 線程池來(lái)處理發(fā)送消息命令;

調(diào)用 SendMessageProcessor 的 asyncProcessRequest 方法;

圖片

調(diào)用 Commitlog 的 aysncPutMessage 方法寫(xiě)入消息 ;

圖片

這段代碼中,當(dāng) commitLog 執(zhí)行完 appendMessage 后, 需要執(zhí)行刷盤(pán)任務(wù)和同步復(fù)制兩個(gè)任務(wù)。但這兩個(gè)任務(wù)并不是同步執(zhí)行,而是異步的方式。

復(fù)制線程復(fù)制消息后,喚醒 future ;

圖片

組裝響應(yīng)命令 ,并將響應(yīng)命令返回給客戶端。

為了便于理解這一段消息發(fā)送處理過(guò)程的線程模型,筆者在 RocketMQ 源碼中做了幾處埋點(diǎn),修改 Logback 的日志配置,發(fā)送一條普通的消息,觀察服務(wù)端日志。

圖片

從日志中,我們可以觀察到:

發(fā)送消息的執(zhí)行線程(圖中紅色)在執(zhí)行完創(chuàng)建刷盤(pán) Future 和同步復(fù)制 future 之后,并沒(méi)有等待這兩個(gè)任務(wù)執(zhí)行完成,而是在結(jié)束 asyncProcessRequest 方法后就可以處理發(fā)送消息請(qǐng)求了 ;

刷盤(pán)線程和復(fù)制線程執(zhí)行完各自的任務(wù)后,喚醒 future,然后通過(guò)刷盤(pán)線程組裝存儲(chǔ)結(jié)果,最后通過(guò) PutMessageExecutor 線程池(圖中黃色)將響應(yīng)命令返回給客戶端。

筆者一直認(rèn)為:異步是更細(xì)粒度的使用系統(tǒng)資源的一種方式,在異步消息處理的過(guò)程中,通過(guò) CompletableFuture  這個(gè)神器,各個(gè)線程各司其職,優(yōu)雅且高效的提升了 RocketMQ 的性能。

責(zé)任編輯:武曉燕 來(lái)源: 勇哥java實(shí)戰(zhàn)分享
相關(guān)推薦

2020-09-29 07:38:22

Python裝飾器框架

2016-09-13 19:21:07

CTO管理技術(shù)

2022-07-02 08:40:00

并發(fā)編程

2018-06-08 10:18:22

Python裝飾器迭代器

2023-12-06 07:36:27

前端開(kāi)發(fā)

2021-03-11 00:05:55

Java高并發(fā)編程

2021-03-18 00:14:29

JavaCyclicBarri高并發(fā)

2021-03-04 07:24:24

JavaSemaphore高并發(fā)

2021-09-13 09:28:10

PuppeteerNode 庫(kù)DevTools 協(xié)議

2021-05-27 12:10:42

前端puppeteer代碼

2021-01-31 20:51:55

PuppeteerNode核心

2022-10-17 08:07:13

Go 語(yǔ)言并發(fā)編程

2023-03-30 19:17:54

語(yǔ)言編程

2023-05-05 07:12:09

GPT產(chǎn)品主題

2017-01-05 14:01:38

linux密碼高強(qiáng)度

2024-04-29 09:06:46

線程初始化源碼

2015-07-03 10:46:26

PHP程序員工作高效

2019-07-17 10:55:40

Kubernetes工具Katacoda

2024-11-22 08:00:00

Netty開(kāi)發(fā)

2022-04-24 15:29:17

微服務(wù)go
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 亚洲嫩草 | 天天干天天爱天天 | 久久久久久国产精品 | 国产精品国产a | 欧美在线天堂 | 亚洲成人精品一区 | 日本不卡免费新一二三区 | 在线观看涩涩视频 | 国产精品久久久久久久模特 | 久久午夜精品 | 每日在线更新av | 国产999精品久久久 日本视频一区二区三区 | 夜夜爽99久久国产综合精品女不卡 | 欧美日韩高清 | 中文字幕免费视频 | 中文字幕二区 | 天堂中文资源在线 | 久草新在线 | 91亚洲国产精品 | 91成人免费观看 | 综合色久 | 欧美日韩福利视频 | 成人一区二区三区 | 国产高潮av| 国产999精品久久久久久 | 国产精品久久久久久52avav | 91麻豆产精品久久久久久 | 欧美精品1区2区 | 一区二区三区小视频 | 男人天堂99 | 亚洲欧美日韩电影 | 成人午夜网站 | 成人国产精品免费观看视频 | 韩日精品在线观看 | 色资源在线 | 欧美videosex性极品hd | 日本偷偷操| 色视频www在线播放国产人成 | 精品亚洲一区二区三区 | 欧美三区| 国产福利久久 |