Kafka源碼系列之以kafka為例講解分布式存儲(chǔ)系統(tǒng)
Kafka源碼系列,kafka 0.8.2.2為例給大家講解。目標(biāo)是大家讀完kafka源碼系列能徹底了解kafka,***能設(shè)計(jì)處自己的消息隊(duì)列或者存儲(chǔ)系統(tǒng)。
一,分布式系統(tǒng)的CAP理論
1,理論首先把分布式系統(tǒng)中的三個(gè)特性進(jìn)行了如下歸納:
一致性(C):在分布式系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻是否同樣的值。(等同于所有節(jié)點(diǎn)訪問同一份***的數(shù)據(jù)副本)
可用性(A):在集群中一部分節(jié)點(diǎn)故障后,集群整體是否還能響應(yīng)客戶端的讀寫請(qǐng)求。(對(duì)數(shù)據(jù)更新具備高可用性)
分區(qū)容錯(cuò)性(P):多副本進(jìn)行容錯(cuò)。以實(shí)際效果而言,分區(qū)相當(dāng)于對(duì)通信的時(shí)限要求。系統(tǒng)如果不能在時(shí)限內(nèi)達(dá)成數(shù)據(jù)一致性,就意味著發(fā)生了分區(qū)的情況,必須就當(dāng)前操作在C和A之間做出選擇。
2,CAP理論實(shí)踐中的妥協(xié)
由于CAP理論在分布式存儲(chǔ)系統(tǒng)中,做多只能實(shí)現(xiàn)上面兩點(diǎn)。而現(xiàn)實(shí)環(huán)境是很復(fù)雜的,比如網(wǎng)絡(luò)抖動(dòng)及故障,硬件故障等問題,分區(qū)容錯(cuò)是我們必須要實(shí)現(xiàn)的。所以我們只能在一致性和可用性之間權(quán)衡。
A),CP系統(tǒng)-一致性優(yōu)先原則
要實(shí)現(xiàn)強(qiáng)一致性的原則有很多方式,最簡(jiǎn)單的方式就是一個(gè)master節(jié)點(diǎn)和任意數(shù)目的包含冗余備份的附屬節(jié)點(diǎn)。數(shù)據(jù)永遠(yuǎn)從master寫入和讀取。但是這個(gè)是存在單點(diǎn)故障的,由于master的故障會(huì)導(dǎo)致系統(tǒng)不可用,就此而言是放棄了可用性。但是一般情況下我們都有容錯(cuò)機(jī)制,讓從屬節(jié)點(diǎn)變?yōu)閙aster,錯(cuò)誤處理完成系統(tǒng)即可用了。
B),AP系統(tǒng)可用性優(yōu)先原則
選擇支持可用性和分區(qū)容錯(cuò)性,并犧牲一致性的系統(tǒng)被稱為具有”最終一致性”。特點(diǎn)是,我們可以從任意的一個(gè)節(jié)點(diǎn)寫入,該節(jié)點(diǎn)負(fù)責(zé)將數(shù)據(jù)同步到其它節(jié)點(diǎn)。讀取的時(shí)候只需訪問數(shù)據(jù)存在的一個(gè)節(jié)點(diǎn)就夠了,但是可用會(huì)存在從某個(gè)節(jié)點(diǎn)讀取的數(shù)據(jù)不是***的,也即系統(tǒng)不具備一致性。
C),靈活的一致性程度
三個(gè)特性之間權(quán)衡并不是非黑即白,其實(shí)可以平緩過度,達(dá)到***系統(tǒng)性能要求。
比如,在AP系統(tǒng)中,假如數(shù)據(jù)有三個(gè)冗余副本,我們可以通過調(diào)節(jié)請(qǐng)求數(shù)據(jù)的節(jié)點(diǎn)數(shù)目來達(dá)到高的一致性,比如我們同時(shí)向三個(gè)副本請(qǐng)求數(shù)據(jù),那么我們就滿足了強(qiáng)一致性,但是代價(jià)是喪失了容錯(cuò)性。通常,我們可以要求特定數(shù)目的節(jié)點(diǎn)或者大多數(shù)節(jié)點(diǎn)可用并且能返回一致性結(jié)果,是在一致性和容錯(cuò)性中進(jìn)行權(quán)衡的一個(gè)不錯(cuò)的方法。
同樣的在CP系統(tǒng)中,我們可以運(yùn)行從附屬節(jié)點(diǎn)中讀取數(shù)據(jù),犧牲一部分一致性來達(dá)到高的可用性。如果保持仍然只能想master寫數(shù)據(jù),那么我們還是高的一致性的寫入操作,但是允許讀取操作最終一致性。
我們可以根據(jù)具體的用例,調(diào)整CAP各種特性的強(qiáng)度,使之最適合用例的需要。甚至可以對(duì)同一個(gè)應(yīng)用程序、同一個(gè)數(shù)據(jù)庫中的不同類型的數(shù)據(jù)混合使用這些策略。
二,設(shè)計(jì)自己的分布式存儲(chǔ)系統(tǒng)
設(shè)計(jì)一個(gè)分布式存儲(chǔ)系統(tǒng),并不難,難在如何保證系統(tǒng)的健壯性或者叫魯棒性。至于原因,在這里浪尖只想說一句話那就是網(wǎng)絡(luò)是不可靠的。
目前典型的分布式存儲(chǔ)系統(tǒng)的結(jié)構(gòu)為:
元數(shù)據(jù)服務(wù)器,數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn),客戶端。
數(shù)據(jù)的存取過程:
客戶端會(huì)先獲取元數(shù)據(jù)信息,然后根據(jù)元數(shù)據(jù)信息去特定的節(jié)點(diǎn)讀寫數(shù)據(jù)。元數(shù)據(jù)維護(hù)了數(shù)據(jù)在所有節(jié)點(diǎn)的存儲(chǔ)情況,副本情況等等。數(shù)據(jù)存儲(chǔ)節(jié)點(diǎn)會(huì)完成副本數(shù)據(jù)同步的過程。
與此同時(shí),我們會(huì)要求分布式數(shù)據(jù)存儲(chǔ)系統(tǒng)包含以下三個(gè)特性:
1,數(shù)據(jù)備份機(jī)制,順利的處理某個(gè)節(jié)點(diǎn)無法訪問的情況。
2,提供備份一致性的機(jī)制-----當(dāng)用戶請(qǐng)求數(shù)據(jù)的時(shí)候能獲得最近更新的數(shù)據(jù)(一致性)。
3,線性擴(kuò)展機(jī)制-----20個(gè)節(jié)點(diǎn)的吞吐量是10個(gè)節(jié)點(diǎn)的兩倍。
三,剖析kafka存儲(chǔ)系統(tǒng)
不要抬杠說kafka是消息隊(duì)列不是存儲(chǔ)系統(tǒng)。
1,Kafka的系統(tǒng)整個(gè)體系的角色:
1),zookeeper
記錄的有kafka的Broker,kafka 的Broker controller選舉,topic發(fā)布,配置更新,分區(qū)新增等都是客戶端通過zookeeper發(fā)布到Crontroller的。
2),producer
負(fù)責(zé)發(fā)送數(shù)據(jù)到kafka
3),consumer
負(fù)責(zé)從kafka取數(shù)據(jù)
4),broker
負(fù)責(zé)數(shù)據(jù)接收、存儲(chǔ)、管理等
5),topic 和 partition
Topic是代表一個(gè)種類的數(shù)據(jù)。Partition是對(duì)topic進(jìn)行細(xì)分,保證吞吐量和處理并發(fā)度的關(guān)鍵,并且是集群數(shù)據(jù)備份的單元。
2,從常見的分布式存儲(chǔ)系統(tǒng)的角色來看:
客戶端:producer,consumer,zookeeper(topic,partition,Broker等相關(guān)的變更都是通過zookeeper通知到集群的controller的,要是覺得牽強(qiáng)可以將其歸屬到元數(shù)據(jù)集群)。
存儲(chǔ)系統(tǒng):Broker集群
元數(shù)據(jù)集群:zookeeper集群。其實(shí)每個(gè)Broker都會(huì)存儲(chǔ)一份元數(shù)據(jù)(client-->zookeeper-->
Controller-->普通Broker)。
3,kafka的分布式存儲(chǔ)特性
1),數(shù)據(jù)備份,故障恢復(fù)
分兩個(gè)部分:
A),Broker故障恢復(fù).Broker注冊(cè)到zookeeper,臨時(shí)zknode,/brokers/ids/[0...N],臨時(shí)節(jié)點(diǎn)保存的是advertisedHost:advertisedPort,并會(huì)初始化SessionExpireListener該listener會(huì)監(jiān)聽Broker自己的臨時(shí)節(jié)點(diǎn)(會(huì)話超時(shí)重新注冊(cè))。Crontroller就可以監(jiān)聽這個(gè)目錄下的臨時(shí)節(jié)點(diǎn),會(huì)得知Brokers是否已經(jīng)宕機(jī),或者是否有新的Broker加入到節(jié)點(diǎn).
Brokers集群通過向zookeeper注冊(cè)臨時(shí)節(jié)點(diǎn)/controller,來選舉Crontroller,并且每個(gè)Broker都會(huì)監(jiān)聽該臨時(shí)節(jié)點(diǎn),通過臨時(shí)節(jié)點(diǎn)的變動(dòng)來決定是否進(jìn)行Crontroller的選舉。Crontroller宕機(jī),觸發(fā)其它Broker進(jìn)行Crontroller重新選舉。來進(jìn)行容錯(cuò)。
B),topic表示一類消息,topic劃分為若干partition,對(duì)每個(gè)partition進(jìn)行數(shù)據(jù)的讀寫操作。Partition會(huì)有若干副本,副本會(huì)選舉一個(gè)leader,然后數(shù)據(jù)的寫入和讀取都是通過leader來實(shí)現(xiàn),這就實(shí)現(xiàn)了強(qiáng)一致性CP。與此同時(shí)引入了一個(gè)單點(diǎn)故障的問題,故障恢復(fù)機(jī)制是從isr列表里重新選舉出leader。假如數(shù)據(jù)在flower從leader同步數(shù)據(jù)存在滯后的話,會(huì)導(dǎo)致數(shù)據(jù)丟失,那么此時(shí),我們可以通過下面的配置,我們可以保證故障轉(zhuǎn)移之后會(huì)不會(huì)有數(shù)據(jù)丟失。
屬性的詳細(xì)介紹:
Request.required.acks:有三個(gè)取值分別的含義是:
0-不等待Broker應(yīng)答,立即返回。
1-等待leader數(shù)據(jù)提交成功的應(yīng)答。
-1-等待min.insync.replicas數(shù)目個(gè)副本都接收到數(shù)據(jù)才會(huì)視為寫入成功。
該參數(shù)要結(jié)合min.insync.replicas來使用,當(dāng)request.required.acks設(shè)置為-1時(shí),isr列表里min.insync.replicas數(shù)目個(gè)副本數(shù)據(jù)寫入完畢,才算消息生產(chǎn)成功。
min.insync.replicas數(shù)值不能超過副本數(shù)總數(shù),假如相等的話,有一個(gè)副本不可用即會(huì)導(dǎo)致集群癱瘓。一般是replication.factor = min.insync.replicas +1即可。
2),數(shù)據(jù)備份一致性的機(jī)制
副本會(huì)選舉出leader,其余follower。數(shù)據(jù)的寫入和讀取都是經(jīng)過leader,以此來實(shí)現(xiàn)數(shù)據(jù)備份的一致性。所以,數(shù)據(jù)備份的一致性是CP,強(qiáng)一致性。Leader存在故障恢復(fù)機(jī)制:leader宕機(jī),從isr列表里選舉出新的leader。
3),線性擴(kuò)展機(jī)制
該機(jī)制對(duì)于kafka來說也是分兩個(gè)部分:
A),Broker的線性擴(kuò)展
新的Broker加入集群,Crontroller會(huì)感知到變化。但是已有的分區(qū)或者數(shù)據(jù)不會(huì)重新分布到新的Broker上去,假如沒有新增topic或者不進(jìn)行人工遷移等操作的話新的Broker不會(huì)有數(shù)據(jù)。增加集群假如是非異構(gòu)機(jī)器的話集群性能應(yīng)該是線性增加的。
B),給某個(gè)topic擴(kuò)大分區(qū)數(shù),也會(huì)增加topic的并發(fā)度,前提是磁盤數(shù)目要合適。該增加也是會(huì)增加topic吞吐量。
4,client與kafka集群之間通訊的機(jī)制
這篇文章之后講大致過程,后面會(huì)陸續(xù)出文章講細(xì)節(jié)部分。
A),Command---->zookeeper---->broker Controller---->Brokers
這個(gè)相當(dāng)于基于zookeeper做了一個(gè)訂閱發(fā)布系統(tǒng)。Topic創(chuàng)建配置更新等都是通過這種方式傳達(dá)給所有Brokers Controller,然后由Broker Crontroller傳遞給所有的Brokers。
B),producer/SampleConsumer---->Brokers partitions leader----->follower
這個(gè)在<Kafka源碼系列之通過源碼分析Producer性能瓶頸>那講已經(jīng)說過,請(qǐng)求分兩步:
1),***步隨機(jī)選一個(gè)Broker,然后獲取topic相關(guān)的元數(shù)據(jù),如leader的位置等。
2),構(gòu)建鏈接到所有l(wèi)eader所在Broker的連接池,進(jìn)行數(shù)據(jù)的讀寫。
假如是生產(chǎn)消息的話follower會(huì)主動(dòng)從leader上獲取滯后的消息。
C),high consumer--->zookeeper---->brokers leader----->brokers follower
這個(gè)獲取數(shù)據(jù)的方式比上個(gè)步驟多了個(gè)環(huán)節(jié),就是從zookeeper上獲取Broker的ip和port,而上個(gè)方式是直接在配置里寫明了Broker的ip和port。
在上個(gè)基礎(chǔ)上又基于zookeeper做了一些優(yōu)化,增加了三個(gè)重要的zookeeper的listener:
1),ZKRebalancerListener
該listener監(jiān)聽的是/consumers/group/ids目錄,當(dāng)該目錄下的有子節(jié)點(diǎn)增刪的時(shí)候會(huì)觸發(fā),rebalance。假如嘗試4次數(shù)后不能成功就會(huì)拋出一下異常
- throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
2),ZKSessionExpireListener
監(jiān)聽的是每個(gè)consumer自己臨時(shí)節(jié)點(diǎn)(/consumers/group/ids/consumerID)的刪除與注冊(cè)(無動(dòng)作),臨時(shí)節(jié)點(diǎn)刪除時(shí)handleNewSession在處理函數(shù)里需要重新想zookeeper注冊(cè)該節(jié)點(diǎn)。也會(huì)觸發(fā)rebalance。
3),ZKTopicPartitionChangeListener
該listener,監(jiān)控的是/brokers/topics/topicName節(jié)點(diǎn)的數(shù)據(jù)變動(dòng),也即是分區(qū)的變動(dòng),假如有新的分區(qū)增加也會(huì)觸發(fā)rebalance。
四,總結(jié)
本文主要是想幫助大家理解設(shè)計(jì)一套分布式存儲(chǔ)系統(tǒng),首先介紹了CAP理論,接著講了分布式存儲(chǔ)系統(tǒng)的幾個(gè)要素,***以kafka為例,講解了kafka這個(gè)分布式消息隊(duì)列或者分布式存儲(chǔ)系統(tǒng)的結(jié)構(gòu)。希望能幫助到大家。***,提筆做個(gè)埋點(diǎn),關(guān)于zookeeper的理論及使用我們后面會(huì)跟大家娓娓道來。