一文講清 Kafka 工作流程和存儲機制
一、Kafka 文件存儲機制
topic構成
Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,都是面向 topic 的。
在 Kafka 中,一個 topic 可以分為多個 partition,一個 partition 分為多個 segment,每個 segment 對應兩個文件:.index 和 .log 文件
topic 是邏輯上的概念,而 patition 是物理上的概念,每個 patition 對應一個 log 文件,而 log 文件中存儲的就是 producer 生產的數據,patition 生產的數據會被不斷的添加到 log 文件的末端,且每條數據都有自己的 offset。
消費組中的每個消費者,都是實時記錄自己消費到哪個 offset,以便出錯恢復,從上次的位置繼續消費。
消息存儲原理
由于生產者生產的消息會不斷追加到 log 文件末尾,為防止 log 文件過大導致數據定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應兩個文件——.index文件和 .log文件。這些文件位于一個文件夾下,該文件夾的命名規則為:topic名稱+分區序號。
如下,我們創建一個只有一個分區一個副本的 topic
- > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic starfish
然后可以在 kafka-logs 目錄(server.properties 默認配置)下看到會有個名為 starfish-0 的文件夾。如果,starfish 這個 topic 有三個分區,則其對應的文件夾為 starfish-0,starfish-1,starfish-2。
這些文件的含義如下:
類別 | 作用 |
---|---|
.index | 偏移量索引文件,存儲數據對應的偏移量 |
.timestamp | 時間戳索引文件 |
.log | 日志文件,存儲生產者生產的數據 |
.snaphot | 快照文件 |
leader-epoch-checkpoint | 保存了每一任leader開始寫入消息時的offset,會定時更新。follower被選為leader時會根據這個確定哪些消息可用 |
index 和 log 文件以當前 segment 的第一條消息的 offset 命名。偏移量 offset 是一個 64 位的長整形數,固定是20 位數字,長度未達到,用 0 進行填補,索引文件和日志文件都由此作為文件名命名規則。所以從上圖可以看出,我們的偏移量是從 0 開始的,.index 和 .log 文件名稱都為 00000000000000000000。
接著往 topic 中發送一些消息,并啟動消費者消費
- > bin /kafka-console-producer.sh --bootstrap-server localhost:9092 --topic starfishone
- > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic starfish --from-beginningone
查看 .log 文件下是否有數據 one
內容存在一些”亂碼“,因為數據是經過序列化壓縮的。
那么數據文件 .log 大小有限制嗎,能保存多久時間?這些我們都可以通過 Kafka 目錄下 conf/server.properties 配置文件修改:
- # log文件存儲時間,單位為小時,這里設置為1周
- log.retention.hours=168
- # log文件大小的最大值,這里為1g,超過這個值,則會創建新的segment(也就是新的.index和.log文件)
- log.segment.bytes=1073741824
比如,當生產者生產數據量較多,一個 segment 存儲不下觸發分片時,在日志 topic 目錄下你會看到類似如下所示的文件:
- 00000000000000000000.index
- 00000000000000000000.log
- 00000000000000170410.index
- 00000000000000170410.log
- 00000000000000239430.index
- 00000000000000239430.log
下圖展示了Kafka查找數據的過程:
.index文件 存儲大量的索引信息,.log文件 存儲大量的數據,索引文件中的元數據指向對應數據文件中 message 的物理偏移地址。
比如現在要查找偏移量 offset 為 3 的消息,根據 .index 文件命名我們可以知道,offset 為 3 的索引應該從00000000000000000000.index 里查找。根據上圖所示,其對應的索引地址為 756-911,所以 Kafka 將讀取00000000000000000000.log 756~911區間的數據。
二、Kafka 生產過程
Kafka 生產者用于生產消息。通過前面的內容我們知道,Kafka 的 topic 可以有多個分區,那么生產者如何將這些數據可靠地發送到這些分區?生產者發送數據的不同的分區的依據是什么?針對這兩個疑問,這節簡單記錄下。
3.2.1 寫入流程
producer 寫入消息流程如下:
- producer 先從 zookeeper 的 "/brokers/.../state"節點找到該 partition 的 leader
- producer 將消息發送給該 leader
- leader 將消息寫入本地 log
- followers 從 leader pull 消息,寫入本地 log 后向 leader 發送 ACK
- leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 發送 ACK
2.1 寫入方式
producer 采用推(push) 模式將消息發布到 broker,每條消息都被追加(append) 到分區(patition) 中,屬于順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障 kafka 吞吐率)。
2.2 分區(Partition)
消息發送時都被發送到一個 topic,其本質就是一個目錄,而 topic 是由一些 Partition Logs(分區日志)組成
分區的原因:
方便在集群中擴展,每個 Partition 可以通過調整以適應它所在的機器,而一個 topic 又可以有多個 Partition 組成,因此整個集群就可以適應任意大小的數據了;
可以提高并發,因為可以以 Partition 為單位讀寫了。
分區的原則:
我們需要將 producer 發送的數據封裝成一個 ProducerRecord 對象。
- public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
- public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value)
- public ProducerRecord (String topic, Integer partition, K key, V value, Iterable<Header> headers)
- public ProducerRecord (String topic, Integer partition, K key, V value)
- public ProducerRecord (String topic, K key, V value)
- public ProducerRecord (String topic, V value)
2.3 副本(Replication) 同一個 partition 可能會有多個 replication( 對應 server.properties 配置中的 default.replication.factor=N)。沒有 replication 的情況下,一旦 broker 宕機,其上所有 patition 的數據都不可被消費,同時 producer 也不能再將數據存于其上的 patition。引入 replication 之后,同一個 partition 可能會有多個 replication,而這時需要在這些 replication 之間選出一 個 leader, producer 和 consumer 只與這個 leader 交互,其它 replication 作為 follower 從 leader 中復制數據。 2.4 數據可靠性保證 為保證 producer 發送的數據,能可靠的發送到指定的 topic,topic 的每個 partition 收到 producer 數據后,都需要向 producer 發送 ack(acknowledgement確認收到),如果 producer 收到 ack,就會進行下一輪的發送,否則重新發送數據。 a) 副本數據同步策略主要有如下兩種 Kafka 選擇了第二種方案,原因如下: b) ISR 采用第二種方案之后,設想一下情景:leader 收到數據,所有 follower 都開始同步數據,但有一個 follower 掛了,遲遲不能與 leader 保持同步,那 leader 就要一直等下去,直到它完成同步,才能發送 ack,這個問題怎么解決呢? leader 維護了一個動態的 in-sync replica set(ISR),意為和 leader 保持同步的 follower 集合。當 ISR 中的follower 完成數據的同步之后,leader 就會給 follower 發送 ack。如果 follower 長時間未向 leader 同步數據,則該 follower 將會被踢出 ISR,該時間閾值由 replica.lag.time.max.ms 參數設定。leader 發生故障之后,就會從 ISR 中選舉新的 leader。(之前還有另一個參數,0.9 版本之后 replica.lag.max.messages 參數被移除了) c) ack應答機制 對于某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等 ISR 中的follower全部接收成功。 所以Kafka為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡,選擇以下的acks 參數配置 0:producer 不等待 broker 的 ack,這一操作提供了一個最低的延遲,broker 一接收到還沒有寫入磁盤就已經返回,當 broker 故障時有可能丟失數據; 1:producer 等待 broker 的 ack,partition 的 leader 落盤成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么將會丟失數據; -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功后才返回 ack。但是 如果在 follower 同步完成后,broker 發送 ack 之前,leader 發生故障,那么就會造成數據重復。 d) 故障處理 由于我們并不能保證 Kafka 集群中每時每刻 follower 的長度都和 leader 一致(即數據同步是有時延的),那么當leader 掛掉選舉某個 follower 為新的 leader 的時候(原先掛掉的 leader 恢復了成為了 follower),可能會出現leader 的數據比 follower 還少的情況。為了解決這種數據量不一致帶來的混亂情況,Kafka 提出了以下概念: 消費者和 leader 通信時,只能消費 HW 之前的數據,HW 之后的數據對消費者不可見。 針對這個規則: 所以數據一致性并不能保證數據不丟失或者不重復,這是由 ack 控制的。HW 規則只能保證副本之間的數據一致性! 2.5 Exactly Once語義 將服務器的 ACK 級別設置為 -1,可以保證 Producer 到 Server 之間不會丟失數據,即 At Least Once 語義。相對的,將服務器 ACK 級別設置為 0,可以保證生產者每條消息只會被發送一次,即 At Most Once語義。 At Least Once 可以保證數據不丟失,但是不能保證數據不重復。相對的,At Most Once 可以保證數據不重復,但是不能保證數據不丟失。但是,對于一些非常重要的信息,比如說交易數據,下游數據消費者要求數據既不重復也不丟失,即 Exactly Once 語義。在 0.11 版本以前的 Kafka,對此是無能為力的,只能保證數據不丟失,再在下游消費者對數據做全局去重。對于多個下游應用的情況,每個都需要單獨做全局去重,這就對性能造成了很大的影響。 0.11 版本的 Kafka,引入了一項重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復數據。Server 端都會只持久化一條,冪等性結合 At Least Once 語義,就構成了 Kafka 的 Exactily Once 語義,即:At Least Once + 冪等性 = Exactly Once 要啟用冪等性,只需要將 Producer 的參數中 enable.idompotence 設置為 true 即可。Kafka 的冪等性實現其實就是將原來下游需要做的去重放在了數據上游。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID,發往同一 Partition 的消息會附帶 Sequence Number。而 Broker 端會對 但是 PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區會話的 Exactly Once。 三、Broker 保存消息 3.1 存儲方式 物理上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件)。 3.2 存儲策略 無論消息是否被消費, kafka 都會保留所有消息。有兩種策略可以刪除舊數據: 基于時間:log.retention.hours=168 基于大小:log.retention.bytes=1073741824 需要注意的是,因為 Kafka 讀取特定消息的時間復雜度為 O(1),即與文件大小無關, 所以這里刪除過期文件與提高 Kafka 性能無關。 四、Kafka 消費過程 Kafka 消費者采用 pull 拉模式從 broker 中消費數據。與之相對的 push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。 pull 模式不足之處是,如果 kafka 沒有數據,消費者可能會陷入循環中,一直返回空數據。為了避免這種情況,我們在我們的拉請求中有參數,允許消費者請求在等待數據到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節數,以確保大的傳輸大小,或者傳入等待超時時間)。 4.1 消費者組 消費者是以 consumer group 消費者組的方式工作,由一個或者多個消費者組成一個組, 共同消費一個 topic。每個分區在同一時間只能由 group 中的一個消費者讀取,但是多個 group 可以同時消費這個 partition。在圖中,有一個由三個消費者組成的 group,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的 group 成員會自動負載均衡讀取之前失敗的消費者讀取的分區。 消費者組最為重要的一個功能是實現廣播與單播的功能。一個消費者組可以確保其所訂閱的 Topic 的每個分區只能被從屬于該消費者組中的唯一一個消費者所消費;如果不同的消費者組訂閱了同一個 Topic,那么這些消費者組之間是彼此獨立的,不會受到相互的干擾。 如果我們希望一條消息可以被多個消費者所消費,那么可以將這些消費者放到不同的消費者組中,這實際上就是廣播的效果;如果希望一條消息只能被一個消費者所消費,那么可以將這些消費者放到同一個消費者組中,這實際上就是單播的效果。 4.2 分區分配策略 一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 consumer 來消費。 Kafka 有兩種分配策略,一是 RoundRobin,一是 Range。 RoundRobin RoundRobin 即輪詢的意思,比如現在有一個三個消費者 ConsumerA、ConsumerB 和 ConsumerC 組成的消費者組,同時消費 TopicA 主題消息,TopicA 分為 7 個分區,如果采用 RoundRobin 分配策略,過程如下所示: 圖片:mrbird.cc 這種輪詢的方式應該很好理解。但如果消費者組消費多個主題的多個分區,會發生什么情況呢?比如現在有一個兩個消費者 ConsumerA 和 ConsumerB 組成的消費者組,同時消費 TopicA 和 TopicB 主題消息,如果采用RoundRobin 分配策略,過程如下所示: 注:TAP0 表示 TopicA Partition0 分區數據,以此類推。 這種情況下,采用 RoundRobin 算法分配,多個主題會被當做一個整體來看,這個整體包含了各自的 Partition,比如在 Kafka-clients 依賴中,與之對應的對象為 TopicPartition。接著將這些 TopicPartition 根據其哈希值進行排序,排序后采用輪詢的方式分配給消費者。 但這會帶來一個問題:假如上圖中的消費者組中,ConsumerA 只訂閱了 TopicA 主題,ConsumerB 只訂閱了TopicB 主題,采用 RoundRobin 輪詢算法后,可能會出現 ConsumerA 消費了 TopicB 主題分區里的消息,ConsumerB 消費了 TopicA 主題分區里的消息。 綜上所述,RoundRobin 算法只適用于消費者組中消費者訂閱的主題相同的情況。同時會發現,采用 RoundRobin 算法,消費者組里的消費者之間消費的消息個數最多相差 1 個。 Range Kafka 默認采用 Range 分配策略,Range 顧名思義就是按范圍劃分的意思。 比如現在有一個三個消費者 ConsumerA、ConsumerB 和 ConsumerC 組成的消費者組,同時消費 TopicA 主題消息,TopicA分為7個分區,如果采用 Range 分配策略,過程如下所示: 假如現在有一個兩個消費者 ConsumerA 和 ConsumerB 組成的消費者組,同時消費 TopicA 和 TopicB 主題消息,如果采用 Range 分配策略,過程如下所示: Range 算法并不會把多個主題分區當成一個整體。 從上面的例子我們可以總結出Range算法的一個弊端:那就是同一個消費者組內的消費者消費的消息數量相差可能較大。 4.3 offset 的維護 由于 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費。 Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,consumer 默認將 offset保存在 Kafka 一個內置的 topic 中,該 topic 為 _consumer_offsets。 消費 topic 后,查看 kafka-logs 目錄,會發現多出 50 個分區。 默認情況下__consumer_offsets 有 50 個分區,如果你的系統中 consumer group 也很多的話,那么這個命令的輸出結果會很多 五、Kafka事務 Kafka 從 0.11 版本開始引入了事務支持。事務可以保證 Kafka 在 Exactly Once 語義的基礎上,生產和消費可以跨分區和會話,要么全部成功,要么全部失敗。 5.1 Producer事務 為了了實現跨分區跨會話的事務,需要引入一個全局唯一的 TransactionID,并將 Producer 獲得的 PID 和Transaction ID 綁定。這樣當 Producer 重啟后就可以通過正在進行的 TransactionID 獲得原來的 PID。 為了管理 Transaction,Kafka 引入了一個新的組件 Transaction Coordinator。Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應的任務狀態。Transaction Coordinator 還負責將事務所有寫入 Kafka 的一個內部 Topic,這樣即使整個服務重啟,由于事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行。 5.2 Consumer事務 對 Consumer 而言,事務的保證就會相對較弱,尤其是無法保證 Commit 的消息被準確消費。這是由于Consumer 可以通過 offset 訪問任意信息,而且不同的 SegmentFile 生命周期不同,同一事務的消息可能會出現重啟后被刪除的情況。 參考: 尚硅谷Kafka教學 部分圖片來源:mrbird.cc https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html
方案
優點
缺點
半數以上完成同步,就發送ack
延遲低
選舉新的 leader 時,容忍n臺節點的故障,需要2n+1個副本
全部完成同步,才發送ack
選舉新的 leader 時,容忍n臺節點的故障,需要 n+1 個副本
延遲高