萬字長文解析Kafka分區工作機制
Kafka的消息發送與消息消費與分區關聯密切,我們從這篇文章開始講點學習分區相關的知識,本篇文章將重點介紹分區內部的工作機制,即分區狀態機運轉機制。
1、Kafka分區狀態
Kafka內部分區的運轉機制具體實現為PartitionStateMachine,從這個類的注釋上來看可以得知Kafka分區的狀態共有四個,它們分別是:
- NonExistentPartition 表示分區不存在,通常是該分區從未創建過或者創建后被刪除。
- NewPartition 分區已創建,即分配完成了副本,但還未進行分區Leader選舉,即還不存在Leader分區與ISR集合,前一個有效狀態為NonExistentPartition。
- OnlinePartition 分區處于在線時的狀態,表示已經完成了分區選舉,成功選舉出Leader,此時可以進行消息發送與消息消費,前一個有效狀態為NewPartition/OfflinePartition。
- OfflinePartition分區處于離線時狀態,表示選舉出來的Leader失效了,例如Leader所在的Broker宕機,前一個有效狀態為NewPartition/OnlinePartition。
關于分區的狀態變如下所示:
2、Kafka分區狀態機
接下來本文的行為思路,將會通過源碼閱讀的方式,深入PartitionStateMachine的實現細節,從而提煉出分區變更實現要點,幫助我們更好的運維kafka。
2.1 狀態機啟動流程
狀態機的啟動流程定義在PartitionStateMachine的startup方法,該方法的調用時機:一個新的Broker通過控制器選舉成為新的Controller時會被調用。
該方法的聲明如下:
狀態機的啟動主要包括兩個步驟:
- 初始化分區的狀態
- 觸發分區狀態向OnlinePartition轉換
接下來將詳細探討實現細節。
2.1.1 分區狀態初始化
首先我們來看一下分區的初始化流程,具體代碼如下所示:
該方法的實現要點:
- 在KafkaController中使用來ControllerContext用來在內存中存儲與控制器相關的數據結構,其中Map[String, mutable.Map[Int, Seq[Int]]] partitionReplicaAssignmentUnderlying存儲了當前集群中所有的分區信息(主題名稱、分區編號,副本數情況),既然是控制器重新選舉,故需要重新初始化所有的分區。
- 然后根據 Map[TopicPartition, LeaderIsrAndControllerEpoch] partitionLeadershipInfo中存儲各個分區當前的運行時狀態,這里分成三種情況:
如果partitionLeadershipInfo中并不存在主題分區的Leaer和ISR信息,驅動狀態從NonExistentPartition轉換為NewPartition。
如果partitionLeadershipInfo中存在主題分區的leader信息,但對應的Broker已經為下線狀態,則驅動狀態從NonExistentPartition轉換為OfflinePartition。
如果partitionLeadershipInfo中存在主題分區的leader信息,但對應的Broker已經為下線狀態,則將狀態從NonExistentPartition先轉換為OfflinePartition。
值得注意的是,調用changeStateTo方法改變分區的狀態,僅僅只是在內存中更新狀態,其具體實現如圖所示:
具體的做好是將需要更新的狀態存儲到Map[TopicPartition, PartitionState] 中。
2.1.2 分區狀態運轉機制
在內存中根據當前維護的LeaderAndISR信息后將狀態存儲到本地內存后,接下來就是將分區狀態向Online狀態轉換,具體的代碼實現見PartitionStateMachine的triggerOnlinePartitionStateChange方法,代碼如下所示:
該方法的實現要點是在內存緩存中(Map[TopicPartition, PartitionState] )挑選出狀態處于OfflinePartition與NewPartition并且未被刪除的分區,驅動狀態機,調用handleStateChanges方法嘗試向OnlinePartition分區轉化。
該方法主要做如下兩件事情:
- 調用PartitionStateMachine的doHandleStateChanges的方法,驅動分區狀態機的轉換。
- 然后調用ControllerBrokerRequestBatch的sendRequestsToBrokers方法,實現元信息在其他Broker上的同步
要想清晰而全面的了解分區狀態的變更,我還給出了Kafka中所有調用handleStateChanges的調用入口,在后續深入研究Kafka相關機制時會再次一一提及,調用鏈如下圖所示:
由于篇幅的問題,分區信息在其他Broker中的狀態同步將在下一篇文章中介紹。
PartitionStateMachine的doHandleStateChanges方法在上一篇中已經詳細介紹,尷尬,在Kafka生產實踐中又出問題了 中詳細介紹過,在這里我稍微總結提煉一下:
目標狀態為NewPartition、OfflinePartition、NonExistentPartition 這三個狀態并沒有什么復雜的實現邏輯,只是更新內存中的狀態,并在state-change.log文件中將輸出狀態變更日志,只有目標狀態為OnlinePartition時才會詳細的處理邏輯。
但或許你有一個疑問,狀態變更為NewPartition,什么時候會向OnlinePartition狀態轉換呢?其實通過調用doHandleStateChanges將目標方法設置為NewPartition后,會緊接著調用triggerOnlinePartitionStateChange等方法,將狀態進一步向OnlinePartition狀態轉化。
由于在尷尬,在Kafka生產實踐中又出問題了 這篇文章中詳細介紹了OfflinePartition向OnlinePartition的轉化流程,故本篇文章就將重點放在了NewPartition狀態向OnlinePartition的轉化處理邏輯,其實也就是分區創建的流程,這塊的代碼入口如下所示:
由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比較長,接下來將分步講解。
2.1.3 分區初始化流程
接下來我們詳細探討PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。
Step1:首先獲取所有分區對應的在線副本,Seq< Map< TopicPartition, Seq< Int>> > liveReplicasPerPartition 來表示,類比Java的數據結構為List< Map< TopicPartition, List< Interger> >,代碼如下所示:
在Kafka中創建一個主題時,kafka首先會根據集群節點的負載情況,根據主題的分區數、副本數,物理機架等信息,生成靜態負載情況,存儲在/brokers/topics/{topicName},其數據如下圖所示:
而liveReplicasPerPartition是在這個數據結構的基礎上篩選出在線的broker,例如如果id為4的broker已下線,那么liveReplicasPerPartition中的值就可能如下所示:
["0":[0,1,2],"1":[1,2],"2":[2,0],"3":[0,1],"4":[0,2],"5":[1,0],"6":[0,2,1],"7":[1,0,2]]
Step2:如果一個分區所有預分配的分片都不在線,則打印錯誤日志,代碼如下所示:
Step3:為分區創建leaderIsrAndControllerEpoch信息,代碼如下所示:
這里的實現比較簡單,值得注意的是初始化時分區的Leader則為ISR列表中的第一個分區。
Step4:將分區的狀態信息 leaderIsrAndControllerEpoch(leader,isr,LeaderEpoch、ControllerEpoch)寫入到zookeeper中,具體代碼如下;
具體就是在zookeeper中創建/broker/topics/{topicName}/partitions/{分區序號}/state,并將leaderIsrAndControllerEpoch寫入到上述節點,具體效果如下圖所示:
Step5:對zookeeper寫入結果進行處理,對應的代碼如下所示:
如果在zookeeper中創建成功,將leaderIsrAndControllerEpoch信息緩存到內存中(Map< TopicPartition, leaderIsrAndControllerEpoch>)中,并將信息放入到controllerBrokerRequestBatch,Kafka Broker控制將信息同步到集群的其他Broker上,同時會在state-change.log日志文件中記錄狀態成功變更日志;如果創建失敗,則在state-change.log中輸出對應的錯誤日志。
當然:為了盡量保證上述過程成功創建,Zookeeper的寫入過程引入來重試機制來保證最終執行成功,除非一些類似AUTH_FAILED等不可恢復的異常。
分區的信息寫入到zookeeper的/broker/topics/{topicName}/partitions/{分區序號}/state文件路徑后,會再次調用changeTo方法,在內存中將分區的狀態變更為OnlineParttion。
那在什么時候觸發真正創建分區相關的文件夾呢?
原來在將分區信息寫入到zookeeper指定文件后,由于Kafka Controller訂閱了/broker/topics/{topicName}相關節點,故節點的創建會實時告知Kafka Controller,從而執行分區的選擇,具體的代碼如下所示:
通過Zookeeper的事件監聽機制,kafka就這樣巧妙的實現了分區狀態機的切換。
3、總結
通過上面的學習,我們對分區的理解應該更加深刻了,從這里我們至少能得出如下結論:
分區的狀態主要包括NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition四個狀態,只有分區狀態為OnlinePartition才能對外提供讀與寫。
Kafka啟動時,在選舉好集群的控制器(Kafka Controller)后會啟動分區狀態機(PartitionStateMachine),Kafka會根據/brokers/topics/{topicName}/partitions/{partition_no}/state中的信息,驅動分區狀態向OnlineParttion轉換。
當新創建主題時,Kafka會根據當前集群的負載情況,主題需要創建的分區數量、副本數量,機架信息等,進行負載均衡,生成分區的意向leader,已經分區副本的分布情況,寫入到/brokers/topics/{topicName}節點上,此時會觸發PartitionModifications,從而觸發分區創建流程,即從NewPartition向OnlineParttion轉換。