從RocketMQ的Broker源碼層面驗證一下這兩個點
本篇博客會從源碼層面,驗證在RocketMQ基礎(chǔ)概念剖析,并分析一下Producer的底層源碼中提到的結(jié)論,分別是:
- Broker在啟動時,會將自己注冊到所有的NameServer上
- Broker在啟動之后,會每隔30S向NameServer發(fā)送心跳
之前的文章中,我們知道了RocketMQ中的一些核心概念,例如Broker、NameServer、Topic和Tag等等。Producer從啟動到發(fā)送消息的整個過程,從源碼級別分析了Producer在發(fā)送消息到Broker的時候,是如何拿到Broker的數(shù)據(jù)的,如何從多個MessageQueue中選擇對應(yīng)的Queue發(fā)送消息。
但是由于篇幅原因,文章開頭提到的兩個已知結(jié)論在上篇博客里并沒沒有對其進(jìn)行驗證,這次就從源碼層面來驗證一下。
一開頭就看到Broker主從架構(gòu)相關(guān)的源碼
在上篇博客中提到過,Broker為了保證自身的高可用,會采取一主一從的架構(gòu)。即使Master Broker因為意外原因掛了,Slave Broker上還有一份完整的數(shù)據(jù),Broker可以繼續(xù)提供服務(wù)。
isEnableDLegerCommitLog中提到的DLeger可以先不管,我們目前只需要知道其默認(rèn)返回的結(jié)果是false。所以Broker首次啟動的時候,就會執(zhí)行被If包裹住的邏輯。
RocketMQ本身是有主從架構(gòu)的,但是功能不夠完善,如果Master Broker出現(xiàn)了故障,需要人工的將Slave Broker切換成Master。
就有點類似于手動的將一臺Redis設(shè)置成另一臺Redis的Slave節(jié)點,如果此時Redis的Master掛了,還需要手動的進(jìn)行切換一樣。為了解決這個問題,Redis搞出了Sentinel,可以在發(fā)生故障的時候自動的實現(xiàn)故障轉(zhuǎn)移。所以RocketMQ在4.5版本之后推出的Dleger差不多也是這么個東西,除此之外,Dleger還可以實現(xiàn)多副本。
不使用Dleger時,主從數(shù)據(jù)如何進(jìn)行同步
先給出結(jié)論,在RocketMQ的主從架構(gòu)下,主從同步采取的是Slave主動拉取的方式。
如果當(dāng)前執(zhí)行注冊的Broker角色是Slave,那就會使用ScheduledExecutorService啟動一個周期性的定時任務(wù),每隔10秒就會去Master同步一次,同步的數(shù)據(jù)包括Topic的相關(guān)配置、Consumer的消費偏移量、延遲消息的Offset、訂閱組的相關(guān)數(shù)據(jù)和配置。
ScheduledExecutorService的作用和原理下面會做簡單介紹。
首次啟動時強制進(jìn)行Broker注冊
因為是首次啟動,所以參數(shù)forceRegister被直接設(shè)置成了true。
使用ScheduledExecutorService啟動定時任務(wù)
通過入口進(jìn)來之后,Broker會啟動一個定時任務(wù),周期性的去注冊。ScheduledExecutorService底層就是一個newSingleThreadScheduledExecutor,只有一個線程的線程池,其關(guān)鍵的參數(shù)corePoolSize值為1,然后按照指定的頻率周期性的執(zhí)行某個任務(wù)。
ScheduledExecutorService主要的功能有兩個,分別是:
- ScheduledExecutorService 以固定的頻率執(zhí)行任務(wù)
- ScheduledExecutorService 執(zhí)行完之后,間隔制定的時間后再執(zhí)行下一個任務(wù)
使用scheduleAtFixedRate實現(xiàn)心跳機制
此處我們使用的是scheduleAtFixedRate,如下圖。
至于執(zhí)行的頻率,我們能夠配置的范圍最大不能超過一分鐘,也就是說這個范圍是在10-60秒之間,默認(rèn)30秒執(zhí)行一次,這也就驗證了每30秒,Broker會向NameServer發(fā)送一次心跳。
獲取執(zhí)行頻率的這個判斷有點意思,甚至看起來有那么一絲絲簡潔,但是理解其具體可配置的時間范圍可能需要花點時間。在實際業(yè)務(wù)性代碼中,個人建議還是不要這么寫,業(yè)務(wù)中代碼的可讀性和可維護(hù)性我認(rèn)為是需要放在首位的。
值得注意的是,此處啟動心跳,給了一個10秒的延遲,因為在不使用Dleger的情況下,在之前的邏輯中已經(jīng)執(zhí)行過一次注冊了。如果不做延遲,那么幾乎是同一個時間就會有兩次注冊操作,而這明顯是不符合預(yù)期的;同時forceRegister也從true變成了通過函數(shù)isForceRegister來進(jìn)行獲取。
調(diào)用registerBrokerAll注冊
定時任務(wù)注冊完成之后,之后的每次觸發(fā)都會執(zhí)行registerBrokerAll方法來執(zhí)行注冊,你可能會有疑問,我當(dāng)前不就是一個Broker嗎,怎么名字有個后綴All?那是因為NameServer會有多個,Broker啟動的時候會將自己注冊到所有的NameServer上去。當(dāng)然,口說無憑,我們繼續(xù)看下去。
繼續(xù)往里走,如果當(dāng)前滿足注冊條件,則會實際的執(zhí)行注冊操作。那具體滿足什么條件呢?由變量forceRegister和一個needRegister方法來決定,forceRegister默認(rèn)是true,所以當(dāng)?shù)谝粓?zhí)行這個邏輯的時候是一定會執(zhí)行注冊操作的。
通過對比數(shù)據(jù)版本判斷當(dāng)前Broker是否需要進(jìn)行注冊
感興趣的話,可以繼續(xù)跟隨文章了解一下,needRegister是根據(jù)什么來判斷是否需要注冊的。
首先,Broker一旦注冊到了NameServer之后,由于Producer不停的在寫入數(shù)據(jù),Consumer也在不停的消費數(shù)據(jù),Broker也可能因為故障導(dǎo)致MessageQueue等關(guān)鍵路由信息發(fā)生變動,NameServer中的數(shù)據(jù)和Broker中實際的數(shù)據(jù)就會不一致,如果不及時更新,Producer拉取到的路由數(shù)據(jù)就可能有誤。
所以每次定時任務(wù)觸發(fā)的時候會去對比NameServer和Broker的數(shù)據(jù),如果發(fā)現(xiàn)數(shù)據(jù)版本不一致,Broker會重新進(jìn)行注冊,將最新的數(shù)據(jù)更新到NameServer。說直白一點,就是做一個數(shù)據(jù)定時更新。以下紅框中的代碼就是數(shù)據(jù)對比的核心代碼。
當(dāng)Broker和所有的NameServer節(jié)點一一完成數(shù)據(jù)對比之后,就會進(jìn)行結(jié)果判定,但凡有一個節(jié)點數(shù)據(jù)不一致,都需要進(jìn)行重新注冊,把最新的數(shù)據(jù)更新到NameServer,核心判斷邏輯同樣用紅框標(biāo)出。
至此,其實我們就已經(jīng)完成了 Broker在啟動的時候會向所有NameServer進(jìn)行注冊 的驗證。但是由于后續(xù)仍然有值得關(guān)注發(fā)光點,我們繼續(xù)后續(xù)的源碼閱讀。
使用CountDownLatch獲取所有注冊異步任務(wù)的返回結(jié)果
除此之外,還值得注意的是在needRegister中,對于和多個NameServer的交互,RocketMQ是通過線程池異步實現(xiàn)的,同時使用了CountDownLatch來等待所有的請求結(jié)束,返回結(jié)果給主線程。
既然聊到了CountDownLatch,就順帶提一下。假設(shè)我們有5個互不依賴的計算任務(wù),如果快速的計算出結(jié)果并返回呢?那當(dāng)然是5個任務(wù)并發(fā)執(zhí)行,這就需要通過新開線程實現(xiàn),結(jié)果就無法一起返回了。
而CountDownLatch可以讓主線程等待,等待這5個計算任務(wù)全部結(jié)束之后,喚醒主線程再繼續(xù)后面的邏輯。這就是CountDownLatch的作用,如果平時只是單純的CRUD功能的話,可能連CountDownLatch是什么都做不知道,這也是為什么大廠面試會問這些問題,因為在大廠的復(fù)雜業(yè)務(wù)背景下,你必須要會使用它們。
指定需要注冊之后,接下來就是核心的注冊方法了,核心邏輯由registerBrokerAll來實現(xiàn)。Broker同樣會去每一個NameServer節(jié)點上注冊自己,并且為了提前執(zhí)行的效率,同樣開線程采用了異步的方式。在獲取所有結(jié)果時,同樣的使用了CountDownLatch。
使用CopyOnWriteArrayList存儲注冊請求的返回
除此之外,用于保存注冊結(jié)果的列表,使用的是CopyOnWriteArrayList,被面試虐過的同學(xué)應(yīng)該熟悉。我們知道此處開啟了多線程去不同的NameServer注冊,寫入注冊結(jié)果的時候,多線程對同一個列表進(jìn)行寫入,會產(chǎn)生線程安全的問題。
而我們知道ArrayList是非線程安全的,這也是為什么此處要使用CopyOnWriteArrayList來保存注冊結(jié)果。為什么CopyOnWriteArrayList能夠保證線程安全?
這歸功于COW(Copy On Write),讀請求時共用同一個List,涉及到寫請求時,會復(fù)制出一個List,并在寫入數(shù)據(jù)的時候加入獨占鎖。比起直接對所有操作加鎖,讀寫鎖的形式分離了讀、寫請求,使其互不影響,只對寫請求加鎖,降低了加鎖的消耗,提升了整體操作的并發(fā)。
上面并發(fā)執(zhí)行的注冊操作,具體做了哪些事情呢?先看代碼。
上面就是單個注冊的所有邏輯,可以看到在構(gòu)建完請求之后,有一個oneway的判斷。
oneway值為false,表示單向通信,Broker不關(guān)心NameServer的返回,也不會觸發(fā)任何回調(diào)函數(shù)。接下來Broker就會把已經(jīng)寫進(jìn)request body的所有數(shù)據(jù)發(fā)送給NameServer。請求數(shù)據(jù)統(tǒng)一由一個叫TopicConfigSerializeWrapper的Wrapper給包裹住。
其可以看為兩部分:
- 存在該Broker節(jié)點上的所有Topic的數(shù)據(jù)
- 數(shù)據(jù)版本
然后帶著這些數(shù)據(jù),Broker會同步的調(diào)用invokeSync發(fā)送請求給NameServe,并且在執(zhí)行之后觸發(fā)實現(xiàn)特定功能的回調(diào)函數(shù)。
EOF
至此,我們完成了對開篇所提結(jié)論的驗證,同時也發(fā)現(xiàn)了RocketMQ的主從架構(gòu)、Master和Slave同步數(shù)據(jù)的方式、心跳機制的實現(xiàn)等等,也基本從源碼中看完了Broker啟動的所有流程。看這些老哥寫的源碼還是挺有意思的,之后有時間隨緣再看看NameServer端相關(guān)的源碼吧。