Broker的實現邏輯-kafka知識體系(三)
上篇文章分享了kafka 生產端的邏輯,以及消息發送到緩存后由sender線程發送到Broker,那么Broker 是怎么進行數據接收和持久化的呢?下面我們從Broker 的網絡設計聊起。
Broker 網絡設計
kafka的網絡設計和Kafka的調優有關,這也是為什么它能支持高并發的原因。
Kafka的網絡三層架構
首先客戶端發送請求全部會先發送給一個Acceptor,broker里面會存在3個線程(默認是3個),這3個線程都是叫做processor,
Acceptor不會對客戶端的請求做任何的處理,直接封裝成一個個socketChannel發送給這些processor形成一個隊列,發送的方式是輪詢,就是先給第一個processor發送,然后再給第二個,第三個,然后又回到第一個。
消費者線程去消費這些socketChannel時,會獲取一個個request請求,這些request請求中就會伴隨著數據。
線程池里面默認有8個線程,這些線程是用來處理request的,解析請求,如果request是寫請求,就寫到磁盤里。讀的話返回結果。processor會從response中讀取響應數據,然后再返回給客戶端。這就是Kafka的網絡三層架構。

調優點1
所以如果我們需要對kafka進行增強調優,增加processor并增加線程池里面的處理線程,就可以達到效果。request和response那一塊部分其實就是起到了一個緩存的效果,是考慮到processor們生成請求太快,線程數不夠不能及時處理的問題。所以這就是一個加強版的reactor網絡線程模型。
Broker數據存儲設計
【partition 的數據文件】
我們知道topic 是邏輯上的概念,partition是topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
例如創建2個topic名稱分別為report_push、launch_info, partitions數量都為partitions=4 存儲路徑和目錄規則為:xxx/message-folder
- |--report_push-0
- |--report_push-1
- |--report_push-2
- |--report_push-3
- |--launch_info-0
- |--launch_info-1
- |--launch_info-2
- |--launch_info-3
而partition物理上由多個segment組成。
【segment】log
每個segment 大小相等,順序讀寫.
每個segment數據文件以該段中最小的offset 命名,文件擴展名為.log
日志回滾受log.segment.bytes控制,默認1G;
這樣在查找指定offset 的Message 的時候,用二分查找(跳表)就可以定位到該Message 在哪個segment 數據文件中.
在磁盤上,一個partition就是一個目錄,然后每個segment由一個index文件和一個log文件組成。如下:
- $ tree kafka | head -n 6
- kafka
- ├── events-1
- │ ├── 00000000003064504069.index
- │ ├── 00000000003064504069.log
- │ ├── 00000000003065011416.index
- │ ├── 00000000003065011416.log
Segment下的log文件就是存儲消息的地方
每個消息都會包含消息體、offset、timestamp、key、size、壓縮編碼器、校驗和、消息版本號等。
在磁盤上的數據格式和producer發送到broker的數據格式一模一樣,也和consumer收到的數據格式一模一樣。由于磁盤格式與consumer以及producer的數據格式一模一樣,這樣就使得Kafka可以通過零拷貝(zero-copy)技術來提高傳輸效率。
【segment】index
索引文件是內存映射(memory mapped)的。
索引文件,一個稀疏格式的索引,受參數log.index.interval.bytes控制,默認4KB。即不是每條數據都會寫索引,默認每寫4KB數據才會寫一條索引。
Kafka 為每個分段后的數據文件建立了索引文件,文件名與數據文件的名字是一樣的,只是文件擴展名為.index.
index 文件中并沒有為數據文件中的每條 Message 建立索引,而是采用了稀疏存儲的方式,每隔一定字節的數據建立一條索引.
這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內存中。
有關內存映射:
- 即便是順序寫入硬盤,硬盤的訪問速度還是不可能追上內存。所以Kafka的數據并不是實時的寫入硬盤,它充分利用了現代操作系統分頁存儲來利用內存提高I/O效率。Memory Mapped Files(后面簡稱mmap)也被翻譯成內存映射文件,它的工作原理是直接利用操作系統的Page來實現文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上(操作系統在適當的時候)。通過mmap,進程像讀寫硬盤一樣讀寫內存,也不必關心內存的大小有虛擬內存為我們兜底。mmap其實是Linux中的一個用來實現內存映射的函數,在Java NIO中可用MappedByteBuffer來實現內存映射。
【Kafka中通過offset查詢消息內容的整個流程】
Kafka 中存在一個 ConcurrentSkipListMap 來保存在每個日志分段。
offset-->concurrentSkipListMap-->找到baseOffset對應的日志分段-->讀取索引文件.index-->找打不大于offset-baseoffset的最大索引項-->讀取分段文件(.log)-->從日志分段文件(.log)中順序查找
當前索引文件的文件名即為 baseOffset 的值。
【日志留存策略】
Kafka 會定期檢查是否要刪除舊消息,見參數
log.retention.check.interval.ms,默認5分鐘。當前有三種日志留存策略:
基于空間:log.retention.bytes,默認未開啟;
基于時間:log.retention.hours(mintues/ms),默認7天;
基于起始位移:Kafka 0.11.0.0版本引入,解決流處理場景中已處理的中間消息刪除問題。
目前基于時間的日志留存策略最常使用。
調優點2
即盡力保持客戶端版本和 Broker 端版本一致
即盡力保持客戶端版本和 Broker 端版本一致。不要小看版本間的不一致問題,它會令 Kafka 喪失很多性能收益,比如 Zero Copy。

圖中藍色的 Producer、Consumer 和 Broker 的版本是相同的,它們之間的通信可以享受 Zero Copy 的快速通道;相反,一個低版本的 Consumer 程序想要與 Producer、Broker 交互的話,就只能依靠 JVM 堆中轉一下,丟掉了快捷通道,就只能走慢速通道了。因此,在優化 Broker 這一層時,你只要保持服務器端和客戶端版本的一致,就能獲得很多性能收益了。
Broker 副本機制
分區副本默認1,見參數
default.replication.factor。
【副本作用(并不提供讀寫分離)】
1、實現冗余,提高消息可靠性
2、實現高可用,參與leader選舉,在leader不可用時提高可用性。
3、leader處理partition的所有讀寫請求;follower會被動定期地去復制leader上的數據
【leader副本選舉】
1、由控制器負責
2、選舉機制或策略
所有的副本(replicas)統稱為Assigned Replicas,即AR
副本同步隊列(ISR)
SR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中。AR=ISR+OSR
基本策略是從AR中找第一個存活的副本,且該副本在ISR中。
3、leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發現ISR變化,則會將新的ISR的信息返回到Zookeeper的相關節點中。
【副本機制的好處】
通常來講副本機制的好處:
1、提供數據冗余。即使系統部分組件失效,系統依然能夠繼續運轉,因而增加了整體可用性以及數據持久性。
2、提供高伸縮性。支持橫向擴展,能夠通過增加機器的方式來提升讀性能,進而提高讀操作吞吐量。
3、改善數據局部性。允許將數據放入與用戶地理位置相近的地方,從而降低系統延時。
對于 Apache Kafka 而言,目前只能享受到副本機制帶來的第 1 個好處,也就是提供數據冗余實現高可用性和高持久性。
對于客戶端用戶而言,Kafka 的追隨者副本沒有任何作用,它既不能像 MySQL 那樣幫助領導者副本“抗讀”,也不能實現將某些副本放到離客戶端近的地方來改善數據局部性。
Broker 高水位機制
【概念】
HW即高水位,是Kafka副本對象的重要屬性,分區的高水位由leader副本的高水位表示,含義是被follower副本同步之后的位置。
對于leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費
【作用】
定義消息可見性,只有分區高水位以下的消息才能被消費;
幫助kafka完成副本同步,kafka是基于高水位實現的異步的副本同步機制。
【LEO的概念】
含義是日志末端位移(Log End Offset),下一條消息寫入的位移。
總結為什么MySQL的索引不采用kafka的索引機制?
既然kafka那么優秀那么快,為什么MySQL的索引不采用kafka的索引機制?
我們還要考慮一個問題:InnoDB中維護索引的代價比Kafka中的要高。Kafka中當有新的索引文件建立的時候ConcurrentSkipListMap才會更新,而不是每次有數據寫入時就會更新,這塊的維護量基本可以忽略,B+樹中數據有插入、更新、刪除的時候都需要更新索引,還會引來“頁分裂”等相對耗時的操作。Kafka中的索引文件也是順序追加文件的操作,和B+樹比起來工作量要小很多。
其實說到底還是應用場景不同所決定的。MySQL中需要頻繁地執行CRUD的操作,CRUD是MySQL的主要工作內容,而為了支撐這個操作需要使用維護量大很多的B+樹去支撐。Kafka中的消息一般都是順序寫入磁盤,再到從磁盤順序讀出(不深入探討page cache等),他的主要工作內容就是:寫入+讀取,很少有檢索查詢的操作,換句話說,檢索查詢只是Kafka的一個輔助功能,不需要為了這個功能而去花費特別太的代價去維護一個高level的索引。前面也說過,Kafka中的這種方式是在磁盤空間、內存空間、查找時間等多方面之間的一個折中。