kafka高性能原理分析,你看懂了嗎?
一、消費者消費消息offset存儲
kafka的所有消息都是持久化存儲在broker上的,消費者每次消費消息是如何知道獲取哪一條呢?kafka提供一個專門的tipic存儲每個consumer group的消費消息的offset,offset保證消息在分區內部有序,所以每次消費者都可以知道自己要從哪一條消息開始消費。__consumer_offsets_* 的一個topic ,把 offset 信 息 寫 入 到 這 個 topic 中。__consumer_offsets 默認有50 個分區。broker按照以下規則,存儲消費者組的消費offset到對應的 __consumer_offsets分區文件中。
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 默 認 情 況 下groupMetadataTopicPartitionCount 有 50 個分區,假如groupid=”KafkaConsumerDemo”,計算得到的結果為:35, 意味著當前的consumer_group 的位移信息保存在__consumer_offsets 的第 35 個分區,可以用命令格式化查看分區數據
kafka-simple-consumer-shell.sh –topic __consumer_offsets –partition 35 –broker-list 192.168.0.15:9092 –formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter”
或者直接使用ui工具查看分區數據。
消費消息的offset保存,是按照整個消費者group來分配保存的,同一個group的消費者offset保存在同一個__consumer_offsets分區。
二、消息持久化存儲
首先在kafka里面,消息都是需要持久化存儲的,不會分持久化和非持久化消息。存儲的方式是基于索引文件+內容文件的方式來進行存儲。下面看一下有關存儲的相關內容。
消息存儲的路徑
首先我們知道,一個topic可以有多個分區,然后多個分區按照取模算法分配到集群中的多個broker中。其次一個topic的每一個分區的消息都是分開存儲的,例如一個topic test,有三個分區。就會創建三個文件夾 test_0,test_1,test_2,去存儲消息,消息的結構上面說了,就是index+內容的組合。例如有一個test3p的topic,在單個broker集群環境下,可以看到在dataDir的目錄下面生成了如下三個文件夾。
總的來說消息按照不同分區來進行存儲。
消息存儲機制詳細解析
在對應的分區文件夾內部是如何存儲消息的呢?
log.segment.bytes?
log.segment.bytes是配置文件里面的一個重要配置,當內容文件達到這個配置的字節數大小時,消息存儲的內容文件就會分隔,新增一個內容文件來存儲內容,新內容文件的命名是上一個內容文件存儲的最后一個offset命令。
上面這圖是我設置log.segment.bytes=10000,然后不停發送消息測試結果,我發送的消息內容大小是固定的,可以看到大約是在經過26000個offset左右就會新加一個log文件,同時會成對新增index,timindex文件。這個就是kafka的logSegment,消息文件分片,控制文件大小可以提高io性能。
每種存儲文件的作用
00000000000000000000.index?
這個就是一個索引文件,里面存儲對消息內容文件的物理索引,可以快速定位消息內容所在,內容類似下面格式。
執行命令查看。
kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafkalogs/test3p-0/00000000000000000000.index --print-datalog
offset: 48 position: 4128
offset: 96 position: 8256
offset: 144 position: 12373
上面就是查看結果,offset就是消息在分區內部的offset,partition就是一個物理地址,用于索引內容,可以看出這里的索引是屬于稀疏索引,并不是每個offset都存儲消息的物理地址。
00000000000000000000.log?
這個就是內容文件,同樣可以使用上面使用的命令查看內容,截取部分結果如下。?
producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 0 payload: isAsyncSend48
offset: 151 position: 12968 CreateTime: 1534321675701 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: - payload: isAsyncSend45
offset: 152 position: 13053 CreateTime: 1534321675705 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: * payload: isAsyncSend42
offset: 153 position: 13138 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ' payload: isAsyncSend39
offset: 154 position: 13223 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: $ payload: isAsyncSend36
offset: 155 position: 13308 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ! payload: isAsyncSend33
offset: 156 position: 13393 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend30
offset: 157 position: 13478 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ayload: isAsyncSend27
offset: 158 position: 13563 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend24
offset: 159 position: 13648 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend21
offset: 160 position: 13733 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend18
offset: 161 position: 13818 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend15
offset: 162 position: 13903 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:
payload: isAsyncSend12
offset: 163 position: 13988 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend9
offset: 164 position: 14072 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend6
offset: 165 position: 14156 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: payload: isAsyncSend3
?
可以看出消息內容文件存儲了offset、position和payload等內容,通過索引就可以快速定位到position位置,找到消息內容。
實際的查找算法過程?
1.索引文件命名是有序的,因此使用二分查找的方式,可以快速查詢到消息對應的索引文件
2.在對應的索引文件中,由于使用的是稀疏索引,所以利用offset查找符合offset范圍的position。
3.得到position之后自然可以快速從position位置開始查找對應offset的消息,而不必從頭搜索
三、消息日志的清理與壓縮
消息清理?
消息日志的能夠分段存儲,一方面能夠減少單個文件 內容的大小,另一方面,方便kafka進行日志清理。日志的 清理策略有兩個分別是按消息時間和topic消息大小來清理。
1. 根據消息的保留時間,當消息在 kafka 中保存的時間超 過了指定的時間,就會觸發清理過程
2. 根據topic存儲的數據大小,當topic所占的日志文件大 小大于一定的閥值,則可以開始刪除最舊的消息。kafka 會啟動一個后臺線程,定期檢查是否存在可以刪除的消 息 通過 log.retention.bytes 和 log.retention.hours 這兩個參 數來設置,當其中任意一個達到要求,都會執行刪除。默認的保留時間是:7天
消息壓縮?
Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過這個功能可以有效的減少日志文件的大小,緩解磁盤緊 張的情況,在很多實際場景中,消息的 key 和 value 的值 之間的對應關系是不斷變化的,就像數據庫中的數據會不 斷被修改一樣,消費者只關心key對應的最新的value。因 此,我們可以開啟 kafka 的日志壓縮功能,服務端會在后 臺啟動啟動Cleaner線程池,定期將相同的key進行合并, 只保留最新的value值。
四、kafka高性能io
機械結構的磁盤,如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是 尋址,也就是定位到數據所在的物理地址,在磁盤上就要 找到對應的柱面、磁頭以及對應的扇區;這個過程相對內 存來說會消耗大量時間,為了規避隨機讀寫帶來的時間消 耗,kafka采用順序寫的方式存儲數據來避免這個過程。
但是 頻繁的 I/O 操作仍然會造成磁盤的性能瓶頸,所以 kafka 還有一個重要的性能策略,零拷貝。
如果不使用零拷貝技術,要把數據從磁盤讀出并且發送到網卡需要進行以下步驟:
- 操作系統將數據從磁盤讀入到內核空間的頁緩存
- 應用程序將數據從內核空間讀入到用戶空間緩存中
- 應用程序將數據寫回到內核空間到socket緩存中
- 操作系統將數據從socket緩沖區復制到網卡緩沖區,最后將數據經網絡發出
這個過程涉及到4次上下文切換以及4次數據復制,并且有兩次復制操作是由 CPU 完成。但是這個過程中,數據完全沒有 進行變化,僅僅是從磁盤復制到網卡緩沖區。
如果是零拷貝技術的話,,可以去掉這些沒必要的數據復制操作, 同時也會減少上下文切換次數;現代的unix操作系統提供 一個優化的代碼路徑,用于將數據直接從頁緩存傳輸到socket;在 Linux 中通過 sendfile 系統調用來完成的。Java 提 供了訪問這個系統調用的方法,FileChannel.transferTo API ,這樣就可以直接跳過數據復制到用戶空間然后又從用戶控制復制到socket的過程。