成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink 在 B 站的多元化探索與實(shí)踐

大數(shù)據(jù)
在過(guò)去的一年里,B站圍繞 Flink 主要做了三個(gè)方面的工作:平臺(tái)建設(shè)、增量化和 AI on Flink。實(shí)時(shí)平臺(tái)是實(shí)時(shí)業(yè)務(wù)的技術(shù)底座,也是 Flink 面向用戶的窗口,需要堅(jiān)持持續(xù)迭代優(yōu)化,不斷增強(qiáng)功能,提升用戶效率。

本文整理自嗶哩嗶哩基礎(chǔ)架構(gòu)部資深研發(fā)工程師張楊在 Flink Forward Asia 2021 平臺(tái)建設(shè)專場(chǎng)的演講。主要內(nèi)容包括:

  • 平臺(tái)建設(shè)
  • 增量化
  • AI On Flink

在過(guò)去的一年里,B站圍繞 Flink 主要做了三個(gè)方面的工作:平臺(tái)建設(shè)、增量化和 AI on Flink。實(shí)時(shí)平臺(tái)是實(shí)時(shí)業(yè)務(wù)的技術(shù)底座,也是 Flink 面向用戶的窗口,需要堅(jiān)持持續(xù)迭代優(yōu)化,不斷增強(qiáng)功能,提升用戶效率。增量化是我們?cè)谠隽炕瘮?shù)倉(cāng)和流批一體上的嘗試,在實(shí)時(shí)和離線之間找到一個(gè)更好的平衡,加速數(shù)倉(cāng)效率,解決計(jì)算口徑問(wèn)題。AI 方向,我們也正在結(jié)合業(yè)務(wù)做進(jìn)一步的探索,與 AIFlow 社區(qū)進(jìn)行合作,完善優(yōu)化機(jī)器學(xué)習(xí)工作流。

一、 平臺(tái)建設(shè)

1.1 基礎(chǔ)功能完善

在平臺(tái)的基礎(chǔ)功能方面,我們做了很多新的功能和優(yōu)化。其中兩個(gè)重點(diǎn)的是支持 Kafka 的動(dòng)態(tài) sink 和任務(wù)提交引擎的優(yōu)化。

我們遇到了大量這樣的 ETL 場(chǎng)景,業(yè)務(wù)的原始實(shí)時(shí)數(shù)據(jù)流是一條較大的混合數(shù)據(jù)流,包含了數(shù)個(gè)子業(yè)務(wù)數(shù)據(jù)。數(shù)據(jù)通過(guò) Kafka 傳輸,末端的每個(gè)子業(yè)務(wù)都對(duì)應(yīng)單獨(dú)的處理邏輯,每個(gè)子業(yè)務(wù)都去消費(fèi)全量數(shù)據(jù),再進(jìn)行過(guò)濾,這樣的資源消耗對(duì)業(yè)務(wù)來(lái)說(shuō)是難以接受的,Kafka 的 IO 壓力也很大。因此我們會(huì)開(kāi)發(fā)一個(gè) Flink 任務(wù),對(duì)混合數(shù)據(jù)流按照子業(yè)務(wù)進(jìn)行拆分,寫到子業(yè)務(wù)對(duì)應(yīng)的 topic 里,讓業(yè)務(wù)使用。

技術(shù)實(shí)現(xiàn)上,早期 Flink SQL 的寫法就是寫一個(gè) source 再寫多個(gè) sink,每個(gè) sink 對(duì)應(yīng)一個(gè)業(yè)務(wù)的 topic,這確實(shí)可以滿足短期的業(yè)務(wù)訴求,但存在的問(wèn)題也較多:

第一是數(shù)據(jù)的傾斜,不同的子業(yè)務(wù)數(shù)據(jù)量不同,數(shù)據(jù)拆分后,不同 sink 之處理的數(shù)據(jù)量也存在較大差別,而且 sink 都是獨(dú)立的 Kafka producer,高峰期間會(huì)造成 sink 之間資源的爭(zhēng)搶,對(duì)性能會(huì)有明顯的影響;

第二是無(wú)法動(dòng)態(tài)增減 sink,需要改變 Flink SQL 代碼,然后重啟任務(wù)才能完成增減 sink。過(guò)程中,不僅所有下游任務(wù)都會(huì)抖動(dòng),還有一個(gè)嚴(yán)重的問(wèn)題就是無(wú)法從 savepoint 恢復(fù),也就意味著數(shù)據(jù)的一致性無(wú)法保證;

第三是維護(hù)成本高,部分業(yè)務(wù)存在上百個(gè)子分流需求,會(huì)導(dǎo)致 SQL 太長(zhǎng),維護(hù)成本極高。

基于以上原因,我們開(kāi)發(fā)了一套 Kafka 動(dòng)態(tài) sink 的功能,支持在一個(gè) Kafka sink 里面動(dòng)態(tài)地寫多個(gè) topic 數(shù)據(jù),架構(gòu)如上圖。我們對(duì) Kafka 表的 DDL 定義進(jìn)行了擴(kuò)展,在 topic 屬性里支持了 UDF 功能,它會(huì)根據(jù)入倉(cāng)的數(shù)據(jù)計(jì)算出這條數(shù)據(jù)應(yīng)該寫入哪個(gè) Kafka 集群和 topic。sink 收到數(shù)據(jù)后會(huì)先調(diào)用 UDF 進(jìn)行計(jì)算,拿到結(jié)果后再進(jìn)行目標(biāo)集群和 topic 數(shù)據(jù)的寫入,這樣業(yè)務(wù)就不需要在 SQL 里編寫多個(gè) sink,代碼很干凈,也易于維護(hù),并且這個(gè) sink 被所有 topic 共用,不會(huì)產(chǎn)生傾斜問(wèn)題。UDF 直接面向業(yè)務(wù)系統(tǒng),分流規(guī)則也會(huì)平臺(tái)化,業(yè)務(wù)方配置好規(guī)則后,分流實(shí)施自動(dòng)生效,任務(wù)不需要做重啟。而且為了避免 UDF 的性能問(wèn)題,避免用戶自己去開(kāi)發(fā) UDF,我們提供了一套標(biāo)準(zhǔn)的分流,做了大量的緩存優(yōu)化,只要按照規(guī)范定義好分流,規(guī)則的業(yè)務(wù)表就可以直接使用 UDF。

目前內(nèi)部幾個(gè)千億級(jí)別的分流場(chǎng)景,都在這套方案下高效運(yùn)行中。

基礎(chǔ)功能上做的第二個(gè)優(yōu)化就是任務(wù)的提交引擎優(yōu)化。做提交器的優(yōu)化主要是因?yàn)榇嬖谝韵聨讉€(gè)問(wèn)題:

第一,本地編譯問(wèn)題。Flink SQL 任務(wù)在 Yarn 上的部署有三種模式:per-job、application 和 yarn-session。早前我們一直沿用 per-job 模式,但是隨著任務(wù)規(guī)模變大,這個(gè)模式出現(xiàn)了很多的問(wèn)題。per-job 模式下,任務(wù)的編譯是在本地進(jìn)行再提交到遠(yuǎn)程 app master,編譯消耗提交引擎的服務(wù)性能,在短時(shí)批量操作時(shí)很容易導(dǎo)致性能不足;

第二,多版本的支持問(wèn)題。我們支持多個(gè) Flink 版本,因此在版本與提交引擎耦合的情況下,需要維護(hù)多個(gè)不同代碼版本的提交引擎,維護(hù)成本高;

第三,UDF 的加載。我們一直使用 Flink 命令里的 -c 命令進(jìn)行 UDF 傳遞,UDF 代碼包存在 UDFS 上,通過(guò) Hadoop 的 web HDFS 協(xié)議進(jìn)行 cluster 加載,一些大的任務(wù)啟動(dòng)時(shí),web HDFS 的 HTTP 端口壓力會(huì)瞬間增大,存在很大的穩(wěn)定隱患;

第四,代碼包的傳輸效率。用戶代碼包或者 Flink 引擎代碼包都要做多次的上傳下載操作,遇到 HDFS 反應(yīng)較慢的場(chǎng)景,耗時(shí)較長(zhǎng),而實(shí)時(shí)任務(wù)希望做到極致的快速上下線。

因此我們做了提交器的優(yōu)化:

首先引入了 1.11 版本以上支持的 application 模式,這個(gè)模式與 per-job 最大的區(qū)別就是 Flink 任務(wù)的編譯全部移到了 APP master 里做,這樣就解決了提交引擎的瓶頸問(wèn)題;

在多版本的支持上面,我們對(duì)提交引擎也做了改造,把提交器與 Flink 的代碼徹底解耦,所有依賴 Flink 代碼的操作全部抽象了標(biāo)準(zhǔn)的接口放到了 Flink 源碼側(cè),并在 Flink 源碼側(cè)增加了一個(gè)模塊,這個(gè)模塊會(huì)隨著 Flink 的版本一起升級(jí)提交引擎,對(duì)通用接口的調(diào)用全部進(jìn)行反射和緩存,在性能上也是可接受的;

而且 Flink 的多版本源碼全部按照 maled 模式進(jìn)行管理,存放在 HDFS。按照業(yè)務(wù)指定的任務(wù)版本,提交引擎會(huì)從遠(yuǎn)程下載 Flink 相關(guān)的版本包緩存到本地,所以只需要維護(hù)一套提交器的引擎。Flink 任何變更完全和引擎無(wú)關(guān),升級(jí)版本提交引擎也不需要參與;

完成 application 模式升級(jí)后,我們對(duì) UDF 和其他資源包的上傳下載機(jī)制也進(jìn)行了修改,通過(guò) HDFS 遠(yuǎn)程直接分發(fā)到 GM/TM 上,減少了上傳下載次數(shù),同時(shí)也避免了 cluster 的遠(yuǎn)程加載。

1.2 新任務(wù)構(gòu)建模式

平臺(tái)之前支持 Flink 的構(gòu)建模式主要有兩種, SQL 和 JAR 包。兩者的優(yōu)劣勢(shì)都很明顯,SQL 簡(jiǎn)單易用門檻低,但是不夠靈活,比如一些定時(shí)操作在 SQL 里面無(wú)法進(jìn)行。JAR 包功能完善也靈活,但是門檻高,需要學(xué)習(xí) Flink datastream 一整套 API 的概念,非開(kāi)發(fā)人員難以掌握,而我們大量的用戶是數(shù)倉(cāng),這種JAR包的任務(wù)難以標(biāo)準(zhǔn)化管理。業(yè)務(wù)方大多希望使用 SQL,避免使用 JAR 包。

我們調(diào)研了平臺(tái)已有的 Datastream JAR 包任務(wù),發(fā)現(xiàn)大部分的 JAR 包任務(wù)還是以 Table API 為主,只有少量過(guò)程用 Datastream 做了一些數(shù)據(jù)的轉(zhuǎn)換,完成之后還是注冊(cè)成了 Table 進(jìn)行 Table 操作。如果平臺(tái)可以支持在 SQL 里面做一些復(fù)雜的自定義轉(zhuǎn)換,業(yè)務(wù)其實(shí)完全不需要編寫代碼。

因此我們支持了一種新的任務(wù)構(gòu)建模式——算子化,模塊化地構(gòu)建一個(gè) Flink 任務(wù),混合 JAR 包與 SQL,在進(jìn)行任務(wù)構(gòu)建時(shí),先定義一段 SQL,再定義一個(gè) JAR 包,再接一段 SQL,每段都稱為算子,算子之間相互串聯(lián),構(gòu)成一個(gè)完整的任務(wù)。

采用 Flink 標(biāo)準(zhǔn)的 SQL 語(yǔ)法,對(duì) JAR 包進(jìn)行了接口的限制,必須繼承平臺(tái)的接口定義進(jìn)行開(kāi)發(fā)。輸入輸出都是定義好的 Datastream。它比 UDF 的擴(kuò)展性更強(qiáng),靈活性也更好。而且整個(gè)任務(wù)的輸入輸出基本可以做到和 SQL 同級(jí)別的管控力,算子的開(kāi)發(fā)也比純 JAR 包簡(jiǎn)單得多,不需要學(xué)習(xí)太多 Flink API 的操作,只需要對(duì) Datastream 進(jìn)行變換。而且對(duì)于一些常用的公共算子,平臺(tái)可以統(tǒng)一開(kāi)發(fā)提供,擁有更專業(yè)的性能優(yōu)化,業(yè)務(wù)方只要引用即可。

目前在實(shí)時(shí)數(shù)倉(cāng)等一些偏固定業(yè)務(wù)的場(chǎng)景,我們都在嘗試進(jìn)行標(biāo)準(zhǔn)化算子的推廣和使用。

1.3 智能診斷

平臺(tái)建設(shè)的第三點(diǎn)是流任務(wù)的智能診斷。目前實(shí)時(shí)支持的業(yè)務(wù)場(chǎng)景包括 ETL、AI、數(shù)據(jù)集成等,且任務(wù)規(guī)模增長(zhǎng)速度很快。越來(lái)越大的規(guī)模對(duì)平臺(tái)的服務(wù)能力也提出了更高的要求。

此前,平臺(tái)人員需要花費(fèi)很多的時(shí)間在協(xié)助業(yè)務(wù)解決資源或各種業(yè)務(wù)問(wèn)題上,主要存在以下幾個(gè)方面的問(wèn)題:

  • 資源配置:初始資源確認(rèn)困難,碎片化嚴(yán)重,使用資源周期性變化;
  • 性能調(diào)優(yōu):數(shù)據(jù)傾斜,網(wǎng)絡(luò)資源優(yōu)化,state 性能調(diào)優(yōu),gc 性能調(diào)優(yōu);
  • 錯(cuò)誤診斷:任務(wù)失敗原因分析,修復(fù)建議。

這些問(wèn)題日常都靠平臺(tái)人員兜底,規(guī)模小的時(shí)候大家勉強(qiáng)可以負(fù)擔(dān),但是規(guī)模快速變大后已經(jīng)完全無(wú)力消化,需要一套自動(dòng)化的系統(tǒng)來(lái)解決這些問(wèn)題。

因此我們做了一套流任務(wù)的智能診斷系統(tǒng),架構(gòu)如上圖。

系統(tǒng)會(huì)持續(xù)抓取任務(wù)運(yùn)行時(shí)的 metrics 進(jìn)行性能分析,分析完成后推給用戶,讓用戶自己執(zhí)行具體的優(yōu)化改進(jìn)操作;也會(huì)實(shí)時(shí)抓取任務(wù)失敗的日志,并與詞庫(kù)進(jìn)行匹配,將錯(cuò)誤進(jìn)行翻譯,使用戶更容易理解,同時(shí)也會(huì)給出更好理解的解決方案,讓用戶自行進(jìn)行故障處理;同時(shí)還會(huì)根據(jù)任務(wù)的歷史運(yùn)行資源進(jìn)行自動(dòng)化縮容處理,解決資源浪費(fèi)和資源不足的問(wèn)題。

目前此功能已經(jīng)節(jié)省了整個(gè)隊(duì)列 10% 的資源左右,分擔(dān)了相當(dāng)一部分平臺(tái)的運(yùn)維壓力,在未來(lái)我們會(huì)持續(xù)進(jìn)行優(yōu)化迭代,更進(jìn)一步提高這套系統(tǒng)在自動(dòng)化運(yùn)維上面的能力以及覆蓋度。

未來(lái),在提交引擎方面,我們希望融合 Yarn session 模式與 application 模式做 session 的復(fù)用,解決任務(wù)上線的資源申請(qǐng)效率問(wèn)題。同時(shí)希望大 state 任務(wù)也能夠在 session 的基礎(chǔ)上復(fù)用本地的 state,啟動(dòng)時(shí)無(wú)需重新下載 state。

智能診斷方面,我們希望實(shí)現(xiàn)更多自動(dòng)化的操作,實(shí)現(xiàn)自動(dòng)進(jìn)行優(yōu)化改進(jìn),而不需要用戶手動(dòng)操作,做到用戶低感知;擴(kuò)容縮容也會(huì)持續(xù)提速,目前縮容的頻率只在天級(jí),擴(kuò)容還未實(shí)現(xiàn)自動(dòng)化。未來(lái)我們希望整個(gè)操作的周期和頻率做到分鐘級(jí)的自動(dòng)化。

算子方面,我們希望能統(tǒng)一目前的 SQL 和 JAR 包兩種模式,統(tǒng)一任務(wù)構(gòu)建方式,讓用戶以更低的成本更多復(fù)雜的操作,平臺(tái)也更方便管理。

二、增量化

上圖是我們?cè)缙诘臄?shù)據(jù)架構(gòu),是典型的 Lambda 架構(gòu)。實(shí)時(shí)和離線從源頭上就完全分離、互不干涉,實(shí)時(shí)占較低,離線數(shù)倉(cāng)是核心的數(shù)倉(cāng)模型,占主要的比例,但它存在幾個(gè)明顯的問(wèn)題。

第一,時(shí)效性。數(shù)倉(cāng)模型是分層架構(gòu),層與層之間的轉(zhuǎn)換靠調(diào)度系統(tǒng)驅(qū)動(dòng),而調(diào)度系統(tǒng)是有周期的,常見(jiàn)的基本都是天或小時(shí)。源頭生產(chǎn)的數(shù)據(jù),數(shù)倉(cāng)各層基本需要隔一天或幾個(gè)小時(shí)才可見(jiàn),無(wú)法滿足實(shí)時(shí)性要求稍高的場(chǎng)景;

第二,數(shù)據(jù)的使用效率低。ETL 和 adhoc 的數(shù)據(jù)使用完全一樣,沒(méi)有針對(duì)性的讀寫優(yōu)化,也沒(méi)有按照用戶的查詢習(xí)慣進(jìn)行重新組織,缺乏數(shù)據(jù)布局優(yōu)化的能力。

針對(duì)第一個(gè)問(wèn)題,是否全部實(shí)時(shí)化即可?但是實(shí)時(shí)數(shù)倉(cāng)的成本高,而且不太好做大規(guī)模的數(shù)據(jù)回溯。大部分業(yè)務(wù)也不需要做到 Kafka 的秒級(jí)時(shí)效。第二個(gè)問(wèn)題也不好解決,流式寫入為了追求效率,對(duì)數(shù)據(jù)的布局能力較弱,不具備數(shù)據(jù)的重新組織能力。因此我們?cè)趯?shí)時(shí)和離線之間找到了一個(gè)平衡——做分鐘級(jí)的增量化。

我們采用 Flink 作為計(jì)算引擎,它的 checkpoint 是一個(gè)天然的增量化機(jī)制,實(shí)時(shí)任務(wù)進(jìn)行一次 checkpoint,產(chǎn)出一批增量數(shù)據(jù)進(jìn)行增量化處理。數(shù)倉(cāng)來(lái)源主要有日志數(shù)據(jù)和 binlog 數(shù)據(jù),日志數(shù)據(jù)使用 Append 傳統(tǒng)的 HDFS 存儲(chǔ)即可做到增量化的生產(chǎn);binlog 數(shù)據(jù)是 update 模式,但 HDFS 對(duì) update 的支持并不好,因此我們引入了 Hudi 存儲(chǔ),它能夠支持 update 操作,并且具備一定的數(shù)據(jù)布局能力,同時(shí)它也可以做 Append 存儲(chǔ),并且能夠解決 HDFS 的一些小文件問(wèn)題。因此日志數(shù)據(jù)也選擇了 Hudi 存儲(chǔ),采用 Append 模式。

最終我們的增量化方案由 Flink 計(jì)算引擎 + Hudi 存儲(chǔ)引擎構(gòu)成。

增量化場(chǎng)景的落地上,考慮到落地的復(fù)雜性,我們先選取了業(yè)務(wù)邏輯相對(duì)簡(jiǎn)單、沒(méi)有復(fù)雜聚合邏輯的 ODS 和 DWD 層進(jìn)行落地。目前的數(shù)據(jù)是由 Flink 直接寫到 Hive 的 ODS 層,我們對(duì)此進(jìn)行了針對(duì)性的適配,支持了 Hive 表的增量化讀取,開(kāi)發(fā)了 HDFSStreamingSource,同時(shí)為了避免對(duì) HDFS 路徑頻繁掃描的壓力,ODS 層寫入時(shí)會(huì)進(jìn)行索引創(chuàng)建,記錄寫入的文件路徑和時(shí)間,只需要追蹤索引文件即可。

source 也是分層架構(gòu),有文件分發(fā)層和讀取層,文件分發(fā)層進(jìn)行協(xié)調(diào),分配讀取文件數(shù),防止讀取層某個(gè)文件讀取過(guò)慢堆積過(guò)多文件,中間的轉(zhuǎn)換能夠支持 FlinkSQL 操作,具備完整的實(shí)時(shí)數(shù)倉(cāng)的能力。

sink 側(cè)我們引入了 Hudi connector,支持?jǐn)?shù)據(jù) Append 寫入 Hudi,我們還對(duì) Hudi 的 compaction 機(jī)制進(jìn)行了一些擴(kuò)展,主要有三個(gè):DQC 檢測(cè)、數(shù)據(jù)布局的優(yōu)化以及映射到 Hive 表的分區(qū)目錄。目前數(shù)據(jù)的布局依舊還很弱,主要依賴 Hudi 本身的 min、max 和 bloom 的優(yōu)化。

完成所有上述操作后,ODS 到 DWD 的數(shù)據(jù)時(shí)效性有了明顯提升。

從數(shù)據(jù)生產(chǎn)到 DWD 可見(jiàn),提高到了分鐘級(jí)別;DWD 層的生產(chǎn)完成時(shí)間也從傳統(tǒng)的 2:00~5:00 提前到了凌晨 1 點(diǎn)之前。此外,采用 Hudi 存儲(chǔ)也為日后的湖倉(cāng)一體打下了以一個(gè)好的基礎(chǔ)。

除了日志數(shù)據(jù),我們對(duì) CDC 也采用這套方案進(jìn)行加速。基于 Flink 的 CDC 能力,針對(duì) MySQL 的數(shù)據(jù)同步實(shí)現(xiàn)了全增量一體化操作。依賴 Hudi 的 update 能力,單任務(wù)完成了 MySQL 的數(shù)據(jù)同步工作,并且數(shù)據(jù)只延遲了一個(gè) checkpoint 周期。CDC 暫時(shí)不支持全量拉取,需要額外進(jìn)行一次全量的初始化操作,其他的流程則完全一致。

Hudi 本身的模型和離線的分區(qū)全量有較大的區(qū)別,為了兼容離線調(diào)度需要的分區(qū)全量數(shù)據(jù),我們也修改了 Hudi 的 compaction 機(jī)制。在做劃分區(qū)的 compaction 時(shí)會(huì)做一次數(shù)據(jù)的全量拷貝,生成全量的歷史數(shù)據(jù)分區(qū),映射到 Hive 表的對(duì)應(yīng)分區(qū)。同時(shí)對(duì)于 CDC 場(chǎng)景下的數(shù)據(jù)質(zhì)量,我們也做了很多的保障工作。

為了保證 CDC 數(shù)據(jù)的一致性,我們從以下 4 個(gè)方面進(jìn)行了完善和優(yōu)化:

第一,binlog 條數(shù)的一致性。按照時(shí)間窗口進(jìn)行 binlog 生產(chǎn)側(cè)和消費(fèi)側(cè)的條數(shù)校驗(yàn),避免中間件丟數(shù)據(jù);

第二,數(shù)據(jù)內(nèi)容抽樣檢測(cè)。考慮到成本,我們?cè)?DB 端和源端、Hudi 存儲(chǔ)端抽樣增量數(shù)據(jù)進(jìn)行內(nèi)容的精確比較,避免 update 出錯(cuò);

第三,全鏈路的黑盒測(cè)試。測(cè)試庫(kù)表模擬了線上情況,進(jìn)行 7×24 小時(shí)不間斷的 Kafka 生產(chǎn) MySQL 數(shù)據(jù),然后串通整套流程防止鏈路故障;

第四,定期的全量對(duì)比。業(yè)務(wù)的庫(kù)表一般比較大,歷史數(shù)據(jù)會(huì)低頻地定期進(jìn)行全量比對(duì),防止抽樣觀測(cè)漏掉的錯(cuò)誤。

剛開(kāi)始使用 Hudi 的時(shí)候,Hudi on Flink 還是處于初級(jí)的階段,因此存在大量問(wèn)題,我們也一起和 Hudi 社區(qū)做了大量?jī)?yōu)化工作,主要有 4 個(gè)方面:Hudi 表的冷啟動(dòng)優(yōu)化、checkpoint 一致性問(wèn)題解決、Append 效率低的優(yōu)化以及 get list 的性能問(wèn)題。

首先是冷啟動(dòng)的問(wèn)題。Hudi 的索引存儲(chǔ)在 Flink state 里,一張存在的 Hudi 表如果要通過(guò) Flink 進(jìn)行增量化更新寫入,就必然面臨一個(gè)問(wèn)題:如何把 Hudi 表已有的信息寫入到 Flink state 里。

MySQL 可以借助 Flink CDC 完成全量 + 增量的過(guò)程構(gòu)建,可以繞開(kāi)從已有 Hudi 表冷啟動(dòng)的過(guò)程,但是 TiDB 不行,它的存量表在借助別的手段構(gòu)建完之后,想要增量化就會(huì)面臨如何從 FlinkSQL 冷啟動(dòng)的問(wèn)題。

社區(qū)有個(gè)原始方案,在記錄所有的算子 BucketAssigner 里面讀取全部的 Hudi 表數(shù)據(jù),然后進(jìn)行 state 構(gòu)建,從功能上是可行的,但是在性能上根本無(wú)法接受,尤其是大表,由于 Flink 的 key state 機(jī)制原理,BucketAssigner 每個(gè)并發(fā)度都要讀取全表數(shù)據(jù),然后挑選出屬于當(dāng)前這個(gè)并發(fā)的數(shù)據(jù)存儲(chǔ)到自己的 state 里面,每個(gè)并方案都要去讀全量的表,這在性能上難以滿足。

業(yè)務(wù)能啟動(dòng)的時(shí)間太長(zhǎng)了,很多百億級(jí)別的表能啟動(dòng)的時(shí)間可能是在幾個(gè)小時(shí),而且讀取的數(shù)據(jù)太多,很容易失敗。

和社區(qū)進(jìn)行了溝通交流后,他們提供了一套全新的方案,新增了獨(dú)立的 Bootstrap 機(jī)制,專門負(fù)責(zé)冷啟動(dòng)過(guò)程。Bootstrap 由 coordinator 和 IndexBootstrap 兩個(gè)算子組成,IndexBootstrap 負(fù)責(zé)讀取工作,coordinator 負(fù)責(zé)協(xié)調(diào)分配文件讀取,防止單個(gè) IndexBootstrap 讀取速度慢而降低整個(gè)初始化流程的效率。

IndexBootstrap 算子讀取到數(shù)據(jù)后,會(huì)按照與業(yè)務(wù)數(shù)據(jù)一樣的 Keyby 規(guī)則,Keyby 到對(duì)應(yīng)的 BucketAssigner 算子上,并在數(shù)據(jù)上面打標(biāo),告知 BucketAssigner 這條數(shù)據(jù)是有 Bootstrap 的,不需要往下游 writer 發(fā)送。整個(gè)流程里,原始數(shù)據(jù)只需讀取一遍,而且是多并發(fā)一起讀,效率獲得了極大的提升。而且 BucketAssigner 只需要處理自己應(yīng)該處理的數(shù)據(jù),不再需要處理全表的數(shù)據(jù)。

其次是 Hudi 的 checkpoint 一致性問(wèn)題。Hudi on checkpoint 在每次 checkpoint 完成的時(shí)候會(huì)進(jìn)行一次 commit 操作,具體流程是 writer 算子在 checkpoint 的時(shí)候 flush 內(nèi)存數(shù)據(jù),然后給 writer coordinator 算子匯報(bào)匯總信息,writer coordinor 算子收到匯報(bào)信息時(shí)會(huì)將其緩存起來(lái),checkpoin 完成后,收到 notification 信息時(shí)會(huì)進(jìn)行一次 commit 操作。

但是在 Flink 的 checkpoint 機(jī)制里,notification 無(wú)法保證一定成功,因?yàn)樗⒉辉?checkpoint 的生命周期里,而是一個(gè)回調(diào)操作,是在 checkpoin 成功后執(zhí)行。checkpoin 成功后,如果這個(gè)接口還沒(méi)有執(zhí)行完成,commit 操作就會(huì)丟失,也就意味著 checkpoint 周期內(nèi)的數(shù)據(jù)會(huì)丟失。

針對(duì)上述問(wèn)題,我們進(jìn)行了重構(gòu)。Writer 算子在 cehckpoint 時(shí),會(huì)對(duì)匯報(bào)的 writer coordinator 的信息進(jìn)行 state 持久化,任務(wù)重啟后重新匯報(bào)給 writer coordinator 算子。writer coordinator 算子再收集所有 writer 算子信息并做一次 commit 判斷,確保對(duì)應(yīng)的 commit 已經(jīng)完成。此時(shí),Writer 算子也會(huì)保持阻塞,確保上次持久化的 commit 完成之后才會(huì)處理最新的數(shù)據(jù),這樣就對(duì)齊了 Hudi 與 Flink 的 checkpoint 機(jī)制,保證了邊界場(chǎng)景數(shù)據(jù)的一致性。

第三是針對(duì) Hudi 在 Append 寫入場(chǎng)景下的優(yōu)化。

由于 Append 模式是復(fù)用 update 模式的代碼,所以在沒(méi)有重復(fù) key 的 Append 場(chǎng)景下,很多操作是可以簡(jiǎn)化的,因?yàn)?update 為了處理重復(fù),需要做很多額外的操作。如果能夠簡(jiǎn)化這些操作,吞吐能力可以有較大的提升。

第一個(gè)操作是小文件的查找,每次 checkpoint 后,update 都會(huì)重新 list 文件,然后從文件中找到大小不達(dá)標(biāo)的文件繼續(xù) open 并寫入。update 場(chǎng)景存在傾斜,會(huì)造成很多文件大小不均勻,但是 Append 場(chǎng)景不存在這種問(wèn)題,它所有的文件大小都很均勻;

第二個(gè)是 keyby。在 update 的模式下面,單個(gè) key 只能被一個(gè)節(jié)點(diǎn)處理,因此上游需要按照 Hudi key 進(jìn)行 keyby 操作。但是 Append 場(chǎng)景沒(méi)有重復(fù) key,可以直接用 chain 代替 keyby,大大減少了節(jié)點(diǎn)之間序列化傳輸?shù)拈_(kāi)銷。同時(shí) Append 場(chǎng)景下不存在內(nèi)存合并,整體效率也會(huì)更高。

最后一個(gè)是 GetListing 的優(yōu)化。Hudi 表與底層 HDFS 文件的映射是通過(guò) ViewManager 來(lái)做的,Hudi table 對(duì)象和 TimelineService 都會(huì)自己去初始化一個(gè) ViewManager,每個(gè) ViewManager 在初始化的時(shí)候都會(huì)進(jìn)行 HDFS 目錄的 list 操作,由于每個(gè)并發(fā)都持有多個(gè) Hudi table 或 TimelineService,會(huì)造成大并發(fā)任務(wù)啟動(dòng)時(shí) HDFS 的壓力很大。我們對(duì) TimelineService 進(jìn)行了單例化的優(yōu)化,保證每個(gè)進(jìn)程只有一 TimelineService,能夠數(shù)倍地降低 HDFS list 的壓力。后續(xù)我們還會(huì)基于 Flink 的 coordinator 機(jī)制做任務(wù)級(jí)別的單例化。

未來(lái),我們會(huì)繼續(xù)挖掘增量的能力,給業(yè)務(wù)帶來(lái)更多的價(jià)值。

三、AI on Flink

傳統(tǒng)的機(jī)器學(xué)習(xí)鏈路里數(shù)據(jù)的傳輸、特征的計(jì)算以及模型的訓(xùn)練,都是離線處理的,存在兩個(gè)大的問(wèn)題。

第一個(gè)是時(shí)效性低,模型和特征的更新周期基本是 t+1 天或者 t+1 小時(shí),在追求時(shí)效性的場(chǎng)景下體驗(yàn)并不好。第二個(gè)是計(jì)算訓(xùn)練的效率很低,必須等天或小時(shí)的分區(qū)數(shù)據(jù)全部準(zhǔn)備好之后才能開(kāi)始特征計(jì)算和訓(xùn)練。全量分區(qū)數(shù)據(jù)導(dǎo)致計(jì)算和訓(xùn)練的壓力大。

在實(shí)時(shí)技術(shù)成熟后,大部分模型訓(xùn)練流程都切換到實(shí)時(shí)架構(gòu)上,數(shù)據(jù)傳輸、特征計(jì)算和訓(xùn)練都可以做到幾乎實(shí)時(shí),從全量變成了短時(shí)的小批量增量進(jìn)行,訓(xùn)練的壓力也大大減輕。同時(shí)由于實(shí)時(shí)對(duì)離線的兼容性,在很多場(chǎng)景比如特征回補(bǔ)上,也可以嘗試使用 Flink 的流批一體進(jìn)行落地。

上圖是我們典型的機(jī)器學(xué)習(xí)鏈路圖。從圖上可以看出,樣本數(shù)據(jù)生產(chǎn)特征的計(jì)算、模型的訓(xùn)練和效果的評(píng)估都大量實(shí)時(shí)化,中間也夾雜著少量離線過(guò)程,比如一些超長(zhǎng)周期的特征計(jì)算。

同時(shí)也可以看出,完整的業(yè)務(wù)的模型訓(xùn)練鏈路長(zhǎng),需要管理和維護(hù)大量的實(shí)時(shí)任務(wù)和離線任務(wù)。出現(xiàn)故障的時(shí)候,具體問(wèn)題的定位也異常艱難。如何在整個(gè)機(jī)器學(xué)習(xí)的鏈路中同時(shí)管理號(hào)這么多實(shí)時(shí)和離線任務(wù),并且讓任務(wù)之間的協(xié)同和調(diào)度有序進(jìn)行、高效運(yùn)維,是我們一直在思考的問(wèn)題。

因此我們引入了 Flink 生態(tài)下 AIFlow 系統(tǒng)。AIFlow 本身的定位就是做機(jī)器學(xué)習(xí)鏈路的管理,核心的機(jī)器計(jì)算引擎是 Flink,這和我們的訴求不謀而合。這套系統(tǒng)有三個(gè)主要的特性符合我們的業(yè)務(wù)需求。

第一,流批的混合調(diào)度。在我們實(shí)際的業(yè)務(wù)生產(chǎn)上,一套完整的實(shí)時(shí)鏈路都會(huì)夾雜著實(shí)時(shí)和離線兩種類型的任務(wù)。AIFlow 支持流批的混合調(diào)度,支持?jǐn)?shù)據(jù)依賴與控制依賴,能夠很好地支持我們現(xiàn)有的業(yè)務(wù)形態(tài)。并且未來(lái)在 Flink 流批一體方面也會(huì)有更多的發(fā)揮空間;

第二,元數(shù)據(jù)的管理,AIFlow 對(duì)所有數(shù)據(jù)和模型都支持版本管理。有了版本管理,各種實(shí)驗(yàn)效果和實(shí)驗(yàn)參數(shù)就都可追溯;

第三,開(kāi)放的 notification 機(jī)制。整個(gè)鏈路中存在很多的外部系統(tǒng)節(jié)點(diǎn),難以歸納到平臺(tái)內(nèi)部,但是通過(guò) notification 機(jī)制,可以打通 AIFlow 內(nèi)部節(jié)點(diǎn)與外部節(jié)點(diǎn)的依賴。整套系統(tǒng)的部署分為三部分,notification service、 meta service 以及 scheduler,擴(kuò)展性也很好,我們?cè)趦?nèi)部化的過(guò)程中實(shí)現(xiàn)了很多自己的擴(kuò)展。

實(shí)時(shí)平臺(tái)在今年引入 AIFlow 的之后已經(jīng)經(jīng)歷了兩個(gè)版本的迭代,V2 版本是社區(qū) release 之前的一個(gè)內(nèi)部版本,我們進(jìn)行了分裝提供試用。V3 版本是今年 7 月社區(qū)正式 release 之后,我們進(jìn)行了版本的對(duì)接。

AIFlow 的構(gòu)建使用 Python 進(jìn)行描述,運(yùn)行時(shí)會(huì)有可視化的節(jié)點(diǎn)展示,可以很方便地追蹤各個(gè)節(jié)點(diǎn)的狀態(tài),運(yùn)維也可以做到節(jié)點(diǎn)級(jí)的管理,不需要做整個(gè)鏈路級(jí)別的運(yùn)維。

未來(lái)我們會(huì)對(duì)這套系統(tǒng)在流批一體、特征管理以及模型訓(xùn)練三個(gè)方向進(jìn)行重點(diǎn)的迭代與開(kāi)發(fā),更好地發(fā)揮它的價(jià)值。

責(zé)任編輯:未麗燕 來(lái)源: Apache Flink
相關(guān)推薦

2021-05-20 09:55:23

Apache Flin阿里云大數(shù)據(jù)

2024-05-13 10:44:22

云計(jì)算

2023-11-03 12:54:00

KAFKA探索中間件

2024-07-08 14:41:51

2011-05-05 14:52:10

無(wú)縫拼接拼接大屏幕

2023-02-28 12:12:21

語(yǔ)音識(shí)別技術(shù)解碼器

2022-04-24 11:27:05

邊緣計(jì)算數(shù)據(jù)自動(dòng)駕駛

2025-02-26 01:17:57

2021-03-17 07:59:36

邊緣計(jì)算遠(yuǎn)程辦公數(shù)字化轉(zhuǎn)型

2019-04-30 09:00:33

SQL數(shù)據(jù)庫(kù)Apache Flin

2023-04-04 12:38:50

GPT機(jī)器人LLM

2022-09-15 15:18:23

計(jì)算實(shí)踐

2014-01-15 16:46:07

多元化

2022-10-08 15:41:08

分布式存儲(chǔ)

2015-05-28 17:34:50

順豐田民IT驅(qū)動(dòng)

2015-12-14 17:36:16

5G無(wú)線網(wǎng)絡(luò)

2010-05-13 23:34:39

統(tǒng)一通信環(huán)境

2023-07-19 08:58:00

數(shù)據(jù)管理數(shù)據(jù)分析

2017-05-18 11:43:41

Android模塊化軟件

2024-04-26 12:13:45

NameNodeHDFS核心
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 成人精品毛片国产亚洲av十九禁 | 色婷婷综合久久久中字幕精品久久 | 欧美日韩一本 | 国产精品永久久久久久久www | 成人精品福利 | 日韩爱爱网站 | 欧美精品一区在线发布 | 国产91av视频在线观看 | 91亚洲精 | 国产精品久久久久久久久久久久冷 | 国产精品精品 | 黑人巨大精品欧美一区二区免费 | 成人久久| 日韩免费一区二区 | 毛片国产| 亚洲精品一区二区另类图片 | 日韩欧美精品一区 | 日本 欧美 国产 | 日日操日日干 | 国产免费一区二区三区 | 欧美性久久久 | 九七午夜剧场福利写真 | 国产精品久久久久久久久久妇女 | 久久精品国产久精国产 | 欧美精品国产精品 | 欧美精品久久久 | 在线视频一区二区三区 | 天堂在线中文 | 国产福利观看 | 欧美aⅴ| 日本不卡一区二区三区 | 亚洲视频区 | 在线一区| 一区二区三区播放 | 超碰超碰 | 九九免费| 久久av影院 | 97精品国产手机 | 国产一级片精品 | 国产精品国产a级 | 日韩欧美一级 |