攜程基于Kafka的數(shù)據(jù)校驗代理在FinOps領域的應用
一、現(xiàn)狀與問題
1.1 現(xiàn)狀
圖1-1
如圖1-1所示,攜程目前使用了混合多云的模式,同時也以自建PaaS服務為主,因此計費系統(tǒng)除了需要從云商獲取賬單等信息,還需要接入幾十種自建PaaS服務的用量信息。其主要結構如下:
1)TripCostAllocationProtocol: 為了計費接入的擴展性,計費系統(tǒng)設計了計費協(xié)議,兼容混合多云模式,支持自建及原生PaaS/SaaS服務。
2)計費數(shù)據(jù)接入:云原生服務統(tǒng)一由計費系統(tǒng)處理,自建服務由各團隊按TripCostAllocationProtocol定期打點的方式,將用量信息投遞到Kafka中。
3)計費處理:計費系統(tǒng)根據(jù)接入的賬單、用量以及服務間的關系,進行遞歸結算,并將結果落到內(nèi)部數(shù)倉。
1.2 問題描述
計費系統(tǒng)上線后,成功為管理層、運營以及研發(fā)等角色提供了計費及成本分析的能力。在數(shù)據(jù)質(zhì)量上,系統(tǒng)在數(shù)倉結果表上創(chuàng)建了對應的檢測規(guī)則,針對明顯違反業(yè)務邏輯的數(shù)據(jù)可以觸發(fā)告警。但是,隨著系統(tǒng)的推廣,數(shù)據(jù)質(zhì)量的問題仍然居高不下,具體表現(xiàn)如下:
問題發(fā)現(xiàn):
a. 覆蓋率低:針對數(shù)據(jù)錯誤程度在一定范圍內(nèi),但仍然符合業(yè)務邏輯的問題,無法被檢測到。
b. 及時性差:由于檢測基于結果表,告警只能針對計費結果,告警結果有滯后性。
問題定位:
a. 效率低:檢測是運行在計費系統(tǒng)內(nèi)部的結果表上,需要多個數(shù)據(jù)接入方團隊及計費系統(tǒng)開發(fā)人員共同排查,確定問題發(fā)生的源頭及原因。
問題治理:
a. 責任不明:由于質(zhì)量檢測基于結果表,但造成問題的源頭多樣。無法通過對結果的檢測直接將問題歸屬到對應團隊,問題無法獲得及時的關注與處理。
數(shù)據(jù)質(zhì)量是計費系統(tǒng)的生命線,原有系統(tǒng)在質(zhì)量上的問題導致計費系統(tǒng)開發(fā)團隊大量的時間被消耗在對質(zhì)量問題的響應、排查與修復,無法集中精力投入在產(chǎn)品迭代上,也無法應對更多服務的接入。
1.3 解決方案
針對以上問題,我們決定重新構建數(shù)據(jù)質(zhì)量治理的能力。目標如下:
問題發(fā)現(xiàn)
a. 全問題覆蓋:從數(shù)據(jù)源頭出發(fā),所有不符合校驗規(guī)則的初始數(shù)據(jù)都可以被發(fā)現(xiàn)。
b. 及時性提高:數(shù)據(jù)異常在進入計費鏈路之前就可以被發(fā)現(xiàn),而非通過計費結果告警。
問題定位:
a. 提高效率:無需多方團隊協(xié)調(diào)排查,自動捕獲問題源頭及問題發(fā)生的原因。
問題治理:
a. 責任明確:問題產(chǎn)生的當下就告知相關責任人,明確問題治理的對應團隊。
通過分析數(shù)據(jù)質(zhì)量的案例,發(fā)現(xiàn)絕大部分數(shù)據(jù)質(zhì)量的問題來自于幾十個自建服務數(shù)據(jù)接入方。根據(jù)以上目標的梳理的問題的分析,我們決定引入Kafka Gatekeeper組件,重點解決自建服務接入的質(zhì)量問題,如圖1-2所示。該組件提供以下能力:
1)校驗前置:打點數(shù)據(jù)在進入計費邏輯前,先進行規(guī)則校驗,保證問題發(fā)現(xiàn)及時性。
2)規(guī)則可配置:校驗規(guī)則可隨時配置、隨時更新,保證規(guī)則檢測的全覆蓋。
3)自助排查:提供自助查詢看板,包括數(shù)據(jù)錯誤條數(shù),問題發(fā)生原因等信息,研發(fā)可自助查詢對應團隊的相關信息,提高問題定位效率。
4)自動告警:檢測發(fā)現(xiàn)不合規(guī)數(shù)據(jù)(如字段缺失、數(shù)據(jù)類型錯誤等)時,向數(shù)據(jù)來源的團隊發(fā)送告警,明確問題治理責任。
圖1-2
二、設計與核心實現(xiàn)
2.1 Kafka的相關背景知識
為了實現(xiàn)Kafka代理服務的數(shù)據(jù)校驗功能,需要解決以下兩個問題:
1)如何根據(jù)Kafka協(xié)議對消息進行解碼。
2)如何處理Kafka客戶端,服務端和代理之間的連接關系。
2.1.1 通訊協(xié)議
圖2-1
如圖2-1所示,Kafka請求只能由Client主動發(fā)到Broker,Broker針對每個請求回復響應給Client。
Kafka使用基于TCP的自定義二進制協(xié)議。它定義了客戶端和服務器之間的消息格式、消息傳遞方式和處理邏輯。所有消息都是通過長度來分隔,并且由基本類型組成。請求由請求類型(ApiKey),版本號(ApiVersion),相關性標識(CorrelationId),客戶端標識(ClientId)和請求消息(RequestMessage)組成。響應由相關性標識(CorrelationId)和響應消息組成(ResponseMessage)組成。
ApiKey用于確認Request的類型,以通過不同類型的數(shù)據(jù)格式解析請求。Request和Response通過CorrleationId來一一對應。
由于發(fā)送生產(chǎn)消息,僅包含兩種API--元數(shù)據(jù)(Metadata)和生產(chǎn)(Produce),本文僅關注這兩種API的請求和響應,協(xié)議格式見圖2-2。
圖2-2
Metadata是用于獲取元數(shù)據(jù)的API。元數(shù)據(jù)請求在攜帶topic_name時會返回topic相關的數(shù)據(jù),如果為空則返回所有主題。元數(shù)據(jù)響應返回的數(shù)據(jù)包括一串broker的數(shù)據(jù)信息,以及topic名、分區(qū)信息等。圖中省略部分內(nèi)容,僅展示和本文相關的部分。
Produce是用于將消息集發(fā)送到服務器的API。生產(chǎn)請求將攜帶目標topic,以及分區(qū)信息,其中分區(qū)信息中包含所要發(fā)送的具體消息記錄集合。生產(chǎn)響應返回的數(shù)據(jù)包括具體的請求結果。圖中省略部分內(nèi)容,僅展示和本文相關的部分。
通過了解以上兩種API的格式,可以基于協(xié)議格式進行解碼。
2.1.2 交互流程
處理連接關系,還需要了解Metadata、Produce協(xié)議的交互流程。
元數(shù)據(jù)請求可以發(fā)往任意broker。Kafka集群會提供Bootstrap地址,由此地址負載均衡到某一服務器并返回。客戶端提供一組topic,服務端返回元數(shù)據(jù)響應,包含所有的broker信息和相關的topic信息。broker信息中包括節(jié)點的IP地址,即客戶端真正發(fā)送生產(chǎn)信息的服務器地址。
生產(chǎn)請求將會發(fā)送到元數(shù)據(jù)請求中返回的某一服務器上,服務器端將會返回請求結果。
圖2-3
如圖2-3所示,將集群簡化為一個Broker,Produce的具體流程:
1)Client向Bootstrap地址發(fā)送元數(shù)據(jù)請求,查詢集群當前Broker列表。
2)Bootstrap真實響應的Server其實是(某一個)Broker,Broker返回了所有的信息包含在元數(shù)據(jù)響應中。
3)Client向真實的Broker地址發(fā)送生產(chǎn)請求。
4)Broker處理請求,并回復響應。
通過了解Kafka生產(chǎn)的基本流程,可以實現(xiàn)代理,接管并處理其中的連接關系。
2.2 Kafka Gatekeeper的設計和實現(xiàn)
Gatekeeper作為Kafka客戶端和服務端之間的代理,接受客戶端的請求對于指定內(nèi)容做數(shù)據(jù)校驗,并轉發(fā)給服務器,同時將服務器的響應返回給客戶端。
對于客戶端來說,僅需要將原本配置的Boostrap地址改成Gatekeeper的地址。
對于Gatekeeper來說,需要做到:
1)設計解碼器和解碼方案:解碼Kafka消息,從而進一步進行處理。
2)設計校驗器和校驗規(guī)則:進行數(shù)據(jù)校驗,提高數(shù)據(jù)質(zhì)量。
3)維護Boostrap地址和Gatekeeper地址之間的映射關系:處理客戶、Gatekeeper、服務間的連接。
Gatekeeper設計架構如圖2-4所示。
圖2-4
解碼器用于在處理請求時,根據(jù)Kafka協(xié)議和自定義的解碼方案解碼。當解析元數(shù)據(jù)請求時,根據(jù)自定義的映射關系修改返回的元數(shù)據(jù)響應。
校驗器當解析的是生產(chǎn)請求時,會根據(jù)自定義的校驗規(guī)則進行校驗。
映射關系被維護在Gatekeeper中處理連接關系。
Gatekeeper維護的映射關系中,由于Kafka的默認端口號是9092,"Gatekeeper的IP地址+9092端口"的連接將與"Bootstrap的IP+9092端口"做映射。"Gatekeeper的IP+port1"的連接將與"Broker1的IP+9092端口"做映射,"Gatekeeper的IP+port2"的連接將與"Broker2的IP+9092端口"做映射,依此類推。Gatekeeper就可以根據(jù)這個映射關系,處理來自客戶的請求,發(fā)送給相應服務端,并同樣處理來自服務端的響應。
總而言之,Kafka Gatekeeper監(jiān)聽了客戶端發(fā)來的請求,根據(jù)配置轉發(fā)給服務端,一方面解析了客戶端的生產(chǎn)信息做數(shù)據(jù)校驗,另一方面修改了服務端的元數(shù)據(jù)響應信息給用戶,以保證用戶的生產(chǎn)信息總是通過Gatekeeper進行轉發(fā)。
2.2.1 利用通訊協(xié)議進行解析
通過前文可知,每個請求和響應都有固定格式的header和具體的請求包。而由于Kafka每一種協(xié)議也都有固定的格式,Kafka協(xié)議中可使用的數(shù)據(jù)類型是固定的,且是按順序存儲的。
因此,只需給每種數(shù)據(jù)類型實現(xiàn)一個特定的編解碼方案,并通過header中攜帶的ApiKey和ApiVersion,確定某一個解碼格式,就可以根據(jù)收到的包序列化數(shù)據(jù)。
綜上所述,Gatekeeper的解碼器需要完成兩個任務:實現(xiàn)不同數(shù)據(jù)類型的序列化功能,以及根據(jù)版本確定協(xié)議格式。
以version-1的生產(chǎn)消息為例:
從編解碼角度來說,每個協(xié)議包都是由4字節(jié)的size開頭,后面再跟相應字節(jié)的請求包或響應包。解碼器首先會通過序列化功能解析了這個4字節(jié)的size,計算出請求包的大小。
同樣的,解碼器計算出2個字節(jié)的ApiKey和ApiVersion(和本文無關的其他字段暫時略過)。解碼器計算出生產(chǎn)請求的的ApiKey為0,ApiVersion為1。這樣解碼器的確認版本功能就能確定一個協(xié)議格式,再根據(jù)這個格式的數(shù)據(jù)類型去逐個做解析。
至此,Gatekeeper可以就根據(jù)不同類型,不同版本的的客戶端請求,完成解析。根據(jù)即時的解碼內(nèi)容,針對需要再進一步處理,可以保證問題發(fā)現(xiàn)的及時性。
2.2.2 利用交互流程進行連接處理
Gatekeeper的工作原理是在本地機器上打開tcp套接字,并在使用套接字時,代理連接到相關的Kafka服務,它將本地端口與真實的服務地址進行映射。
圖2-5
如圖2-5所示,用戶加入Gatekeeper服務后,Produce的具體流程:
1)當客戶端發(fā)起第一條元數(shù)據(jù)請求時,發(fā)送地址是Gatekeeper地址,請求將會被Gatekeeper監(jiān)聽到。
2)Gatekeeper發(fā)起一條到服務端的連接,把監(jiān)聽到的連接發(fā)送給Boostrap地址,同時存儲一份Boostrap地址和本地地址的映射。
3)元數(shù)據(jù)響應會返回一串Broker的相關信息,Gatekeeper接收到相關信息后,會解析內(nèi)容將Metadata數(shù)據(jù)中原本的節(jié)點IP信息,替換成Gatekeeper的地址。
4)當客戶端發(fā)送Produce請求時,通過接收到響應里的地址和Gatekeeper建立連接的。
綜上所述,客戶端的Produce流程都經(jīng)過Gatekeeper,Gatekeeper可以對所有的Kafka消息進行校驗。保證打點數(shù)據(jù)進入計費鏈條前先進入校驗邏輯,實現(xiàn)校驗檢測全覆蓋。
2.2.3 可配置化校驗與自助異常定位
Gatekeeper的目標,是提供一個針對Kafka消息的前置數(shù)據(jù)校驗代理,解決接入服務的數(shù)據(jù)質(zhì)量問題。從數(shù)據(jù)源頭入手,配置校驗規(guī)則檢查每個topic的數(shù)據(jù)是否合規(guī),定位異常數(shù)據(jù)來源,向相關團隊告警,并提供自助排查看板。
提供可自定義配置的校驗規(guī)則,可以隨時更新、訂正,并且提供根據(jù)解碼內(nèi)容明確責任人的功能。Gatekeeper的校驗器會根據(jù)配置的規(guī)則,對比分析解碼內(nèi)容做校驗。提供包括判斷字段類型、檢查字段缺失、以及符合CEL語法的校驗規(guī)則等功能。
以如下schema為例,TripCostAllocationProtocol約定某topic必須包括,不為空的字符串Name字段,和可選且大于零的整形Timestamp字段。
schema{
Name: "", //required
TimeStamp: 0, //optional
...
}
相應的,Gatekeeper配置如下,針對該topic的TripCostAllocationProtocol約定,配置校驗規(guī)則。在數(shù)據(jù)流經(jīng)代理時,根據(jù)規(guī)則全面檢測。
"Topics": [
{
"Name": "fake.topic",
"Owner": [
"Key":"Name"
],
"SchemaRules": [
{
"Name": "Name",
"Type": "string",
"Optional": false
},
{
"Name": "Timestamp",
"Type": int,
"Optional": true
"Rule": "TimeStamp>0"
}
]
}
]
以Timestamp字段非法零值為例,當Gatekeeper檢驗發(fā)現(xiàn),此條消息不符合配置的規(guī)則 "Timestamp>0",會根據(jù)配置的Owner,鎖定數(shù)據(jù)來源“Service A”并告警反饋給該團隊,明確問題責任。
{
Name:"Service A",
TimeStamp: 0,
}
此外,Gatekeeper也提供了自動告警功能和自助查詢功能。如圖2-6所示,處理用戶生產(chǎn)數(shù)據(jù)的流程以實線表示,是“用戶-代理-服務”的線性流程,全覆蓋所有數(shù)據(jù)一進行校驗。同時Gatekeeper向日志系統(tǒng)和監(jiān)控系統(tǒng)分別發(fā)送了校驗失敗指標和詳細信息日志。
圖2-6
校驗到不合規(guī)數(shù)據(jù)時,用戶接收監(jiān)控告警,通過監(jiān)控系統(tǒng)可以查看包括數(shù)據(jù)錯誤條數(shù),校驗通過率等內(nèi)容。
以上述錯誤為例,自建PaaS服務Service A收到fake.topic中生產(chǎn)了校驗不合法數(shù)據(jù)的告警。Service A的研發(fā)團隊通過檢查告警系統(tǒng)檢查告警信息。研發(fā)團隊可以跳轉到對應的日志系統(tǒng),以檢查錯誤日志以及校驗不通過字段的規(guī)則,根據(jù)錯誤日志自助修復數(shù)據(jù)。
通過自動告警,提高了問題的定位效率,明確了問題責任方。通過提供自助查詢看板,可視化展示校驗結果和異常來源,方便研發(fā)團隊自助修復數(shù)據(jù),閉環(huán)治理流程。
2.2.4 高可用部署
根據(jù)攜程的可用性最佳實踐,實現(xiàn)跨AZ高可用和數(shù)據(jù)校驗就近處理。如圖2-7所示,服務給用戶提供統(tǒng)一入口,并在AZ內(nèi)部署多個實例提供服務。
圖2-7
如圖2-8所示,(1) 是原始的Kafka客戶端和服務端交互過程示意圖,(2) 是單AZ內(nèi),增加了Kafka Gatekeeper做代理后的交互過程示意。
圖2-8
此前,Gatekeeper的地址需要承擔的三項責任--監(jiān)聽,廣播和提供入口:
1)監(jiān)聽來自客戶的元數(shù)據(jù)和生產(chǎn)請求。
2)返回元數(shù)據(jù)響應時,提供給用戶新的生產(chǎn)請求發(fā)送的廣播地址。
3)提供客戶端,用于替換原本Bootstrap的入口地址。
在高可用的部署架構下,Gatekeeper的地址不再承擔提供入口的責任,客戶端使用總的入口地址代替原本需要配置的Bootstrap地址。
2.2.5 技術挑戰(zhàn)
在某次重啟服務后,盡管服務看起來正常運行,并且新增接入的客戶端也符合預期,但部分已接入的Java客戶端總是連接失敗,直到客戶端自行重啟服務。經(jīng)過測試發(fā)現(xiàn),這種情況與客戶端刷新元數(shù)據(jù)的行為有關。
Kafka生產(chǎn)請求不可達時,客戶端會做一次元數(shù)據(jù)的刷新。響應這個刷新元數(shù)據(jù)請求的Broker會根據(jù)注冊中心返回當前可達的Broker節(jié)點列表。在第一次發(fā)送元數(shù)據(jù)請求時,客戶端連接的是Bootstrap地址(加入Gatekeeper的流程中,連接的是Gatekeeper的IP地址)。然而,在刷新元數(shù)據(jù)時,客戶端的行為有一些區(qū)別。
圖2-9
如圖2-9(1)所示,測試時使用的客戶端刷新MetaData的請求,和初始發(fā)送元數(shù)據(jù)請求一樣,發(fā)往Bootstrap地址。而Java客戶端只有第一次啟動時把元數(shù)據(jù)請求發(fā)給Boostrap地址,如圖(2)所示,Java客戶端的刷新MetaData的請是直接打給具體Broker地址的。
在加入Gatekeeper服務之后,如圖2-10(1)所示,監(jiān)聽請求的地址和返回的廣播地址是Gatekeeper自己的IP地址,廣播地址用于代替元數(shù)據(jù)響應中的的Broker地址信息。也就是說,Java客戶端在刷新元數(shù)據(jù)信息時也會將請求發(fā)往Gatekeeper的IP地址。而由于服務重啟,Gatekeeper實例的IP地址發(fā)生了變化。因此,客戶端也無法通過刷新元數(shù)據(jù)得到目前可達的服務地址,導致連接失敗。
圖2-10
總之,不同客戶端在刷新元數(shù)據(jù)的方式上存在差異。Java客戶端會緩存Gatekeeper的IP地址,當這個地址變得無效時,連接就會失敗。而當客戶端的連接總是發(fā)送到一個有效的負載均衡地址,因此不會出現(xiàn)這樣的問題。
因此,只需讓客戶端緩存的地址,也就是在元數(shù)據(jù)響應中返回的地址,總是有效的,就可以避免以上問題。
原本Gatekeeper的IP地址承擔了監(jiān)聽和廣播兩個功能,其中廣播地址與Broker地址一一對應。如圖2-10(2)所示,為了解決固定IP的問題,服務使用掛載的固定負載均衡地址替換原有廣播地址。同時,使用固定范圍的端口號代替隨機的端口號,且LoadBalancer上與Gatekeeper上映射的端口是完全一致的,就可以解決固定端口的問題。
這樣,即使Gatekeeper不像Kafka一樣使用注冊中心來注冊所有可達的地址,仍然可以確保客戶端始終能夠找到服務,而不會丟失連接。
三、總結
Gatekeeper是一個提供對Kafka數(shù)據(jù)進行校驗的工具,并實現(xiàn)校驗規(guī)則的可配置化。同時Gatekeeper還提供了可視化展示校驗結果和異常來源的監(jiān)控看板,并提供自助查詢錯誤日志的功能。在源頭實現(xiàn)定位異常,保證了規(guī)則檢測全覆蓋,并提供自動異常發(fā)現(xiàn)和自助異常定位服務,從而完成了治理閉環(huán),提升了數(shù)據(jù)質(zhì)量。
目前Gatekeeper的適用范圍僅限于FinOps計費系統(tǒng),但整體架構是針對Kafka消息設計的,因此它可以作為一個可復用的數(shù)據(jù)校驗代理。未來,Gatekeeper希望能提供一個通用的數(shù)據(jù)系統(tǒng)校驗處理能力,以解決更廣泛的數(shù)據(jù)質(zhì)量問題。