聊聊 Kafka: Consumer 源碼解析之 Rebalance 機(jī)制
一、前言
我們上一篇分析了 Consumer 如何加入 Consumer Group,其實(shí)上一篇是一個(gè)很宏觀的東西,主要講 ConsumerCoordinator 怎么與 GroupCoordinator 通信。等等,老周,ConsumerCoordinator 和 GroupCoordinator 是個(gè)啥玩意?這兩個(gè)組件分別是 Consumer、Kafka Broker 的協(xié)調(diào)器,說(shuō)白了就是我們?cè)O(shè)計(jì)模式中的門面模式,具體的內(nèi)容可以看上一篇回顧下。今天這一篇主要講上一篇 Consumer 如何加入 Consumer Group 中的 Rebalance 機(jī)制,其實(shí)上一篇講了大概了,這一篇更深入的來(lái)說(shuō)一說(shuō) Rebalance 機(jī)制的具體細(xì)節(jié)。
如果你是一個(gè)有一定經(jīng)驗(yàn)的程序員,Rebalance 機(jī)制我覺(jué)得可以作為一道面試題來(lái)考察,而且還是有一定難度的。但是也不需要妄自菲薄,跟著老周的這篇文章下來(lái),相信你一定可以拿下它的。
但有些讀者確實(shí)覺(jué)得還是有一定難度,別著急,先看下下面 Kafka 的拓?fù)浣Y(jié)構(gòu),這個(gè)結(jié)構(gòu)很清晰了吧,如果你對(duì) Kafka 的拓?fù)浣Y(jié)構(gòu)還不了解,那我建議你先別往下看了,先把 Kafka 的拓?fù)浣Y(jié)構(gòu)搞清楚,或者先看老周前面的幾篇文章再來(lái)繼續(xù)閱讀,我覺(jué)得效果會(huì)更好。
這一篇主要從以下幾點(diǎn)來(lái)聊一聊 Rebalance 機(jī)制:
- 什么是 Rebalance 機(jī)制?
- 觸發(fā) Rebalance 機(jī)制的時(shí)機(jī)
- Group 狀態(tài)變更
- 舊版消費(fèi)者客戶端的問(wèn)題
- Rebalance 機(jī)制的原理
- Broker 端重平衡場(chǎng)景
二、什么是 Rebalance 機(jī)制?
Rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè) Consumer Group 下的所有 Consumer 如何達(dá)成一致,來(lái)分配訂閱 Topic 的每個(gè)分區(qū)。
當(dāng)集群中有新成員加入,或者某些主題增加了分區(qū)之后,消費(fèi)者是怎么進(jìn)行重新分配消費(fèi)的?這里就涉及到重平衡(Rebalance)的概念,下面我就給大家講解一下什么是 Kafka 重平衡機(jī)制。
從圖中可以找到消費(fèi)組模型的幾個(gè)概念:
- 同一個(gè)消費(fèi)組,一個(gè)分區(qū)只能被一個(gè)消費(fèi)者訂閱消費(fèi),但一個(gè)消費(fèi)者可訂閱多個(gè)分區(qū),也就是說(shuō)每條消息只會(huì)被同一個(gè)消費(fèi)組的某一個(gè)消費(fèi)者消費(fèi),確保不會(huì)被重復(fù)消費(fèi);
- 一個(gè)分區(qū)可被不同消費(fèi)組訂閱,這里有種特殊情況,假如每個(gè)消費(fèi)組只有一個(gè)消費(fèi)者,這樣分區(qū)就會(huì)廣播到所有消費(fèi)者上,實(shí)現(xiàn)廣播模式消費(fèi)。
要想實(shí)現(xiàn)以上消費(fèi)組模型,那么就要實(shí)現(xiàn)當(dāng)外部環(huán)境變化時(shí),比如主題新增了分區(qū),消費(fèi)組有新成員加入等情況,實(shí)現(xiàn)動(dòng)態(tài)調(diào)整以維持以上模型,那么這個(gè)工作就會(huì)交給 Kafka 重平衡(Rebalance)機(jī)制去處理。
從圖中可看出,Kafka 重平衡是外部觸發(fā)導(dǎo)致的,下面來(lái)看下觸發(fā) Kafka 重平衡的時(shí)機(jī)有哪些。
三、觸發(fā) Rebalance 機(jī)制的時(shí)機(jī)
- 有新的 Consumer 加入 Consumer Group
- 有 Consumer 宕機(jī)下線。Consumer 并不一定需要真正下線,例如遇到長(zhǎng)時(shí)間的 GC、網(wǎng)絡(luò)延遲導(dǎo)致消費(fèi)者長(zhǎng)時(shí)間未向 GroupCoordinator 發(fā)送 HeartbeatRequest 時(shí),GroupCoordinator 會(huì)認(rèn)為 Consumer 下線。
- 有 Consumer 主動(dòng)退出 Consumer Group(發(fā)送 LeaveGroupRequest 請(qǐng)求)。比如客戶端調(diào)用了 unsubscribe() 方法取消對(duì)某些主題的訂閱。
- Consumer 消費(fèi)超時(shí),沒(méi)有在指定時(shí)間內(nèi)提交 offset 偏移量。
- Consumer Group 所對(duì)應(yīng)的 GroupCoordinator 節(jié)點(diǎn)發(fā)生了變更。
- Consumer Group 所訂閱的任一主題或者主題的分區(qū)數(shù)量發(fā)生變化。
四、Group 狀態(tài)變更
4.1 消費(fèi)端
在 Consumer 側(cè)的門面 ConsumerCoordinator,它繼承了 AbstractCoordinator 抽象類。在協(xié)調(diào)器 AbstractCoordinator 中的內(nèi)部類 MemberState 中我們可以看到協(xié)調(diào)器的四種狀態(tài),分別是未注冊(cè)、重分配后沒(méi)收到響應(yīng)、重分配后收到響應(yīng)但還沒(méi)有收到分配、穩(wěn)定狀態(tài)。
上述消費(fèi)端的四種狀態(tài)的轉(zhuǎn)換如下圖所示:
4.2 服務(wù)端
對(duì)于 Kafka 服務(wù)端的 GroupCoordinator 則有五種狀態(tài) Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。他們的狀態(tài)轉(zhuǎn)換如下圖所示:
- 一個(gè)消費(fèi)者組最開(kāi)始是 Empty
- 重平衡開(kāi)啟后,會(huì)置于 PreparingRebalance 等待成員加入。
- 之后變更到 CompletingRebalance 等待分配方案
- 最后流轉(zhuǎn)到 Stable 完成 Rebalance
- 當(dāng)有成員變動(dòng)時(shí),消費(fèi)者組狀態(tài)從 Stable 變?yōu)?PreparingRebalance。
- 此時(shí)所有現(xiàn)存成員需要重新申請(qǐng)加入組
- 當(dāng)所有組成員都退出組后,消費(fèi)者組狀態(tài)為 Empty。
- 消費(fèi)者組處于 Empty 狀態(tài),Kafka 會(huì)定期自動(dòng)刪除過(guò)期 offset。
五、舊版消費(fèi)者客戶端的問(wèn)題
ConsumerCoordinator 與 GroupCoordinator 的概念是針對(duì) Kafka 0.9.0 版本后的消費(fèi)者客戶端而言的,我們 暫且把 Kafka 0.9.0 版本之前的消費(fèi)者客戶端稱為舊版消費(fèi)者客戶端。舊版消費(fèi)者客戶端是使用 Zookeeper 的監(jiān)聽(tīng)器(Watcher)來(lái)實(shí)現(xiàn)這些功能的。
每個(gè)消費(fèi)組
下圖與 /consumers/
- /consumers//owners 路徑下記錄了分區(qū)和消費(fèi)者的對(duì)應(yīng)關(guān)系
- /consumers//offsets 路徑下記錄了此消費(fèi)組在分區(qū)中對(duì)應(yīng)的消費(fèi)位移
每個(gè) broker、主題和分區(qū)在 Zookeeper 中也都對(duì)應(yīng)一個(gè)路徑:
- /brokers/ids/記錄了 host、port 及分配在此 broker 上的主題分區(qū)列表;
- /brokers/topics/ 記錄了每個(gè)分區(qū)的 leader 副本、ISR 集合等信息。
- /brokers/topics//partitions//state 記錄了當(dāng)前 leader 副本、leader epoch 等信息。
每個(gè)消費(fèi)者在啟動(dòng)時(shí)都會(huì)在 /consumers/
這種方式下每個(gè)消費(fèi)者對(duì) Zookeeper 的相關(guān)路徑分別進(jìn)行監(jiān)聽(tīng),當(dāng)觸發(fā)再均衡操作時(shí),一個(gè)消費(fèi)組下的所有消費(fèi)者會(huì)同時(shí)進(jìn)行再均衡操作,而消費(fèi)者之間并不知道彼此操作的結(jié)果,這樣可能導(dǎo)致 Kafka 工作在一個(gè)不正確的狀態(tài)。與此同時(shí),這種嚴(yán)重依賴于 Zookeeper 集群的做法還有兩個(gè)比較嚴(yán)重的問(wèn)題。
- 羊群效應(yīng)(Herd Effect):所謂的羊群效應(yīng)是指 Zookeeper 中一個(gè)被監(jiān)聽(tīng)的節(jié)點(diǎn)變化,大量的 Watcher 通知被發(fā)送到客戶端,導(dǎo)致在通知期間的其他操作延遲,也有可能發(fā)生類似死鎖的情況。
- 腦裂問(wèn)題(Split Brain):消費(fèi)者進(jìn)行再均衡操作時(shí)每個(gè)消費(fèi)者都與 Zookeeper 進(jìn)行通信以判斷消費(fèi)者或 broker 變化的情況,由于 Zookeeper 本身的特性,可能導(dǎo)致在同一時(shí)刻各個(gè)消費(fèi)者獲取的狀態(tài)不一致,這樣會(huì)導(dǎo)致異常問(wèn)題發(fā)生。
六、Rebalance 機(jī)制的原理
Kafka 0.9.0 版本后的消費(fèi)者客戶端對(duì)此進(jìn)行了重新設(shè)計(jì),將全部消費(fèi)組分成多個(gè)子集,每個(gè)消費(fèi)組的子集在服務(wù)端對(duì)應(yīng)一個(gè) GroupCoordinator 對(duì)其進(jìn)行管理,GroupCoordinator 是 Kafka 服務(wù)端中用于管理消費(fèi)組的組件。而消費(fèi)者客戶端中的 ConsumerCoordinator 組件負(fù)責(zé)與 GroupCoordinator 進(jìn)行交互。
- Rebalance 完整流程需要 Consumer & Coordinator 共同完成
- Consumer 端 Rebalance 步驟
- 加入組:對(duì)應(yīng) JoinGroup 請(qǐng)求
- 等待 Leader Consumer 分配方案:對(duì)應(yīng) SyncGroup 請(qǐng)求
- 當(dāng)組內(nèi)成員加入組時(shí),Consumer 向協(xié)調(diào)者發(fā)送 JoinGroup 請(qǐng)求。
- 每個(gè) Consumer 會(huì)上報(bào)自己訂閱的 topic
- Coordinator 收集到所有 JoinGroup 請(qǐng)求后,從這些成員中選擇一個(gè)擔(dān)任消費(fèi)者組的 Leader
- 通常第一個(gè)發(fā)送 JoinGroup 請(qǐng)求的自動(dòng)成為 Leader
- Leader Consumer 的任務(wù)是收集所有成員的 topic,根據(jù)信息制定具體的 partition consumer 分配方案。
- 選出 Leader 后,協(xié)調(diào)者把所有 topic 信息封裝到 JoinGroup Response 中,發(fā)送給 Leader。
- Leader Consumer 做出統(tǒng)一分配方案,進(jìn)入到 SyncGroup 請(qǐng)求。
- Leader Consumer 向協(xié)調(diào)者發(fā)送 SyncGroup,將分配方案發(fā)給協(xié)調(diào)者。
- 其他成員也會(huì)發(fā)出 SyncGroup 請(qǐng)求
- 協(xié)調(diào)者以 SyncGroup Response 的方式將方案下發(fā)給所有成員
- 所有成員成功接收到分配方案,消費(fèi)者組進(jìn)入 Stable 狀態(tài),開(kāi)始正常消費(fèi)。
具體的源碼分析,可以看我上一篇分析的 Consumer 如何加入 Consumer Group 文章。
七、Broker 端重平衡場(chǎng)景
7.1 新成員加入
消費(fèi)者組處于 Stable 之后有新成員加入
7.2 組成員主動(dòng)離開(kāi)
- 主動(dòng)離開(kāi):Consumer Instance 通過(guò)調(diào)用 close() 方法通知協(xié)調(diào)者退出
- 該場(chǎng)景涉及第三個(gè)請(qǐng)求:LeaveGroup 請(qǐng)求
7.3 組成員崩潰離開(kāi)
- 協(xié)調(diào)者需要等待一段時(shí)間才能感知
- 這個(gè)時(shí)間段由 Consumer 端參數(shù) sessionn.timeout.ms 控制
- Kafka 不會(huì)超過(guò)上述參數(shù)時(shí)間感知崩潰
- 處理流程相同
7.4 Rebalance 時(shí)組成員提交 offset
- Rebalance 開(kāi)啟時(shí),協(xié)調(diào)者會(huì)給予成員一段緩沖時(shí)間,要求每個(gè)成員在這段時(shí)間內(nèi)快速上報(bào)自己的 offset。
- 再開(kāi)啟正常的 JoinGroup/SyncGroup 請(qǐng)求
好了,Rebalance 機(jī)制就先說(shuō)這么多了,下一篇會(huì)來(lái)聊一聊如何避免重平衡。
本文轉(zhuǎn)載自微信公眾號(hào)「老周聊架構(gòu)」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系老周聊架構(gòu)公眾號(hào)。