數據編排技術在聯通的應用
首先,做一下自我介紹。我是聯通軟件研究院大數據工程師張策,同時在Alluxio社區擔任PMC member,也是Presto Contributor,對開源大數據比較感興趣,希望平時與大家多多交流。
全文將圍繞以下內容展開:
- 使用場景
- 在緩存加速方面的應用
- 在存算分離方面的應用
- 在混合負載領域的應用
- 輕量級分析相關探索
01 使用場景
首先介紹Alluxio作為分布式緩存加速大數據計算的使用場景。我們事業部存在多個數據處理與數據分析業務,這些業務分布在不同的平臺。使用GreenPlum數據庫的數據處理業務基于批處理存儲過程完成數據加工,對計算時間有嚴格的要求,有明確的deadline。這些業務遇到的主要問題是GreenPlum集群規模達到幾十臺時遇到擴容瓶頸,進一步擴展對業務運維工程師帶來很大挑戰;還有一部分業務基于hive運行T+1批處理作業,遇到的主要問題是內存、CPU等資源消耗比較大,并行度不高,跑的速度比較慢;還有一些傳統統計業務是跑在Oracle上,Oracle單機跑的效果不是特別好,并且Oracle比較貴,一臺小型機可能就要兩千多萬,業務方期望能有其他數據分析平臺替代Oracle。
當時,我們的課題是是基于Spark、Hadoop這個開源體系,去做一個統一的計算平臺,把上述三個引擎承接的業務全部接過來。我們直接用Spark + HDFS遇到了一些問題:
首先它的性能是沒有辦法滿足GreenPlum承載的生產業務,因為GreenPlum是MPP數據庫,同等體量下它會比Spark要快很多;其次GreenPlum承載的業務中存在伴生的交互式查詢的場景,同樣性能也是沒有辦法去滿足的。接著,對于一些批處理的業務來說,Spark + HDFS比較欠缺穩定性,批處理的業務每一次任務周期會有幾千、幾萬個SQL去做迭代計算,任務如果經常失敗的話,沒有辦法很好地在那之前保證去完成。
所以,為了加速Spark計算,我們引入了Alluxio來加速數據的讀寫性能,提高數據寫入的穩定性,以及接管Spark Cache后提升Spark作業的穩定性。這是我們一開始采用的架構,數據編排在我們的整個架構中的定位是緩存加速層。
02 在緩存加速方面的應用
具體看一下使用案例,如果拿加速迭代計算來說,我們會把Sink直接寫到Alluxio上,然后下一個SQL的Source去讀這個Sink,把Source和Sink之間的串聯交給Alluxio的緩存去做,這樣可以比較顯著提升整個pipeline的穩定性。因為不需要再和磁盤交互,可以極大降低磁盤壓力。而通過內存,pipeline的穩定性得到了比較好的改善,通過直接讀取內存會提高整個pipeline的運行速度。
第二個使用案例,是我們通過Alluxio去共享整個Job之間的數據,而避免使用Spark Cache。通過Spark Cache方式對會對整個Executor JVM造成比較大的壓力。性能測試結果表明在數據量達到一定規模的情況下,Spark Cache的性能是要比Alluxio差一些。而且Alluxio隨著Spark中間數據的增大,它對性能的影響是可以預測的,是線性而不是指數性上漲的。所以說,我們選擇用Alluxio做中間數據的共享層。
為了增些數據讀取的性能,可以顯示的為每個路徑去配置副本浮動范圍。而且Alluxio還支持了數據預加載的功能--distributedLoad.這個在2021年的時候,社區也是花了很多的精力在上面,目前是做的十分完善的工具了。我們可以在執行一些固定任務之前,提前先把數據加載到內存。這樣可以提高整個pipeline的運行速度。此外,Alluxio還支持一些內存操作工具,比如pin和free,這樣的話能方便我們更好地去管理內存。就是按照我們既定的一個pipeline可以把整個內存布局得到最好的優化。
03 在存算分離方面的應用
接下來是跟Alluxio數據編排Policy有關的例子。比如我們一個Spark任務去讀一個數據,它可能會在每個worker上都拉起一些副本,比如重復使用的一些數據。這種情況可能會迅速把緩存占滿。對于這種情況,Alluxio可以提供一個叫確定性哈希的策略。這個策略可以使我們在固定的某幾個worker上去讀。這樣可以把整個集群的副本數加起來,能更有效地去使用空間。除此之外,針對大量場景,它還支持一些跟負載均衡相關的策略。比如最高可控策略,它會選擇整個集群內空間剩余量最大的一個worker去讀。這樣可以使所有worker的使用比較均衡。但是,它也有一些問題,當大量Alluxio Client 在Spark Executor這邊如果大量去擠兌一個Alluxio Worker的話,那么可能會導致這個worker的使用量瞬間打滿,出現性能降級的情況。針對這種情況,我們可以去選擇一個輪循策略,在若干有剩余量的work之間去輪循使用,這樣可以比較好地去實現集群的負載均衡。
從總的使用效果來看,Alluxio還帶來了比較大的性能與穩定性的提升。這里是一個比較典型的Spark Task統計,Alluxio對整個GC的時間優化,還有整個任務總耗時的優化的收益是比較顯著的。它最大的優點是可以顯著降低每批次Job的重算概率,從而使整個過程更加穩定,更加可預測。
最終效果是我們從之前GreenPlum遷移過來的核心業務的規模已經有接近70倍的增長,并且承接的用戶數相比之前提高了100%,業務總體提速了26個小時。交互式查詢業務目前日常數據已經達到60TB,如使用Spark單表2.5tb數據,查詢耗時也是分鐘級別。原先Oracle業務遷移到Spark后可以做用戶級的計算,這對于他們來講是具有很大的價值。
接下來是我們針對集群繼續發展的特點,做了在存算分離方面的一些建設。存算分離的背景大概是什么樣的?這主要是跟整個集群的發展有一定的關系。首先在我們這邊的平臺建設是跟著業務走。隨著業務快速增長,會出現資源碎片化的情況。如圖所示,第一年新建一個機房,我們占用了一部分機架,其他業務或部門也會占用一些機架。到第二年的時候,第一年的機房就滿了,第二年新建的機房大家可以各自去申請一些。這就導致整個機器不具有連續性,可能會跨機房,甚至可能會跨樓,有時候甚至還會跨數據中心。與之伴隨的就是業務快速增長,基本上處于逐年倍增的情況,而且資源申請周期非常長,至少需要一年的時間才能交付,最后導致的情況就是集群呈現碎片化。
更為糟糕的是在業務增長過程中,其實它的資源需求是不平衡的,主要是存儲與計算之間的不平衡。首先,一個客觀事實是歷史數據的存儲需求是逐年遞增的。之前業務只需要保留1至2個月的數據。但是現在因為一些歷史的趨勢分析或是各方面測算,一些主要業務的數據需要保存12至24個月。業務數據每個周期之間的環比漲幅大概是10%,漲幅大時甚至有5~10倍的情況,主要是跟業務邏輯變化相關。其次,存儲規模的漲幅大概是計算規模漲幅的5到6倍,這也是存儲計算發展不平衡的情況。
我們使用了存算分離的技術來解決這些問題。
首先解決計算資源的問題。我們向其他的業務租借了一個現有的集群,利用該集群空閑的夜間完成數據加工作業。我們在上面部署了一套Spark,以及Alluxio和我們集群的HDFS構成一套存算分離的數據分析系統。為什么不在該集群部署Hadoop?原因是租借的業務集群已經搭建了一套Hadoop,我們不被允許二次搭建Hadoop,也不允許去使用他們的Hadoop。所以,我們就用該業務集群的Alluxio去掛載我們平臺的HDFS。掛載HDFS之后就跟我們自己平臺保持一樣的命名空間,這樣看起來都是一樣的,基本上做到用戶無感知。
更詳細的如此圖所示,就是對于2個Alluxio來說,看到的Path都是一樣的,都是映射到HDFS的一個Path,所以用戶認為這兩個Path都是一樣的。我們在讀寫不同Path的時候,會讓不同的Alluxio分別執行讀寫操作。比如遠程Alluxio對Path1是只讀的,它會去寫Path2,本地Alluxio會寫Path1,但只會去讀Path2,這樣就避免了兩個Alluxio相互之間沖突。
在具體落實方案上,我們還使用了一些自己的設計,來避免存算分離會遇到的一些問題。
首先,我們在遠程業務集群這塊,是基于RocksDB+Raft HA 的方式來解決沒有本地HDFS時Alluxio HA元數據操作性能的問題。因為我們的HDFS在我們的集群,中間有一定的網絡消耗。如果我們直接還是采用原來的zookeeper ha,首先我們需要在他們的集群搭打一套zookeeper ha,以及把元數據放在我們自己集群的HDFS上,跨網絡元數據交互可能會帶來很多的不確定性。比如帶寬打滿了,或者是因為其他網絡波動的因素,會導致Alluxio本身的性能抖動,甚至可能出現因為數據寫入或者讀取超時導致整個Alluxio掛掉的情況。所以,我們選擇了Alluxio 2.x之后不斷去建設去完善的RocksDB+Raft HA的方式來解決這個問題。
其次,我們在業務集群這邊因為要滿足所有中間數據的存儲,提升整個計算性能,所以我們使用HDD作為存儲介質。Spark作業的中間過程數據直接存儲在緩存磁盤里,不會與UFS有任何交互,所以對于用戶來說,這種模式的性能是不受影響的。
第三,最終結果還可以持久化至集群。因為最終結果的數量也不是特別大,所以它的耗時還是可以接受的。最后對于用戶來說,他可能需要跨集群部署任務,我們是在租借的業務集群之內搭建了一個Dolphin Scheduler Worker,通過Dolphin的調度策略,幫助用戶把他的特定任務起在這個Worker上面。通過Worker的選擇來控制它提交到不同的集群。對于用戶來說只是配置上做了變更,作業提交入口以及管理入口都是相同的,解決了跨集群作業管理的問題。
實現計算混合部署之后,我們又接到了大量的數據存儲需求。但是我們的集群短時間內沒有辦法擴容了,所以我們申請了一批大容量存儲,然后把大容量存儲mount到Alluxio,將歷史數據自動化降級到大容量存儲上,查詢的時候就經由Alluxio透明訪問。我們會把前2個月至前12個月的歷史數據降級到大容量存儲上,本地集群只保留最近幾個月會頻繁參與計算的數據。對于用戶來說,訪問的路徑跟之前還是一樣的,我們通過mount方式屏蔽了歷史數據分層管理的差異性。對于我們的好處是單位服務器的存儲容量顯著提高了,大容量存儲可以獨立擴容,對于我們來說緩解了很大的存儲壓力。
以下是我們做完存算分離以后的實際效果。
首先,某核心用戶租借算力占平臺分配算力的82%,這個是比較大的提升。承接新業務使用租借算力占比達到50%,Alluxio管理ETL過程數據達到148TB,算是非常龐大的數字了。因為Alluxio其實作為緩存來講并不會去管理特別大量的數據。管理100至200TB這個數據量,我們跟社區一起做了很多工作,包括worker啟動超時等優化,以滿足中間數據存儲的需求。單獨擴容的大容量存儲給我們帶來的好處是單臺服務器的存儲容量提升5倍,計算集群采用的計算型服務器存儲容量比較小,大容量存儲單臺機器就能存儲90TB數據,服務器臺數相同的情況下,容量提升比較明顯,所以歷史數據存儲顯著降低,擴容成本是降低了83%。
04 在混合負載領域的應用
當集群向后繼續發展的時候,我們引入了更多的計算引擎來適配更多的業務場景以得到更好的效果。其中比較典型的場景是我們在一個平臺上同時提供Spark和Presto,Spark主要用于ETL,Presto提供即席查詢服務。它們共用Alluxio時會遇到一些問題:首先,Alluxio System Cache不支持Quota,沒有辦法隔離Spark跟Presto之間的用量,由于Spark ETL作業寫入的數據較大,數據直接寫入Alluxio作為pipeline下一個節點的讀取加速。這種情況下內存沖刷比較頻繁,會一次性占用較多的內存。這樣Presto剛完成數據加載,想查數據的時候發現緩存中沒有了,Presto查詢的緩存利用率不是特別高。
第二個情況是一些用戶使用TensorFlow做自然語言處理或者時間序列分析的AI計算。在AI計算之前,用戶首先基于Spark做分布式ETL,然后想把ETL結果直接拿給TensorFlow使用,在這種情況下,用戶很難把分布式的數據和本地數據聯系到一起。第三方框架比如TensorFlow on Spark一般都會封裝一些標準模型。用戶使用的模型不是主流模型,但實際效果較好,這種情況沒有辦法套用現用的第三方框架,直接將Spark DataFrame轉換成TensorFlow的數據模型直接使用。這樣TensorFlow還是一種讀取本地文件的模式,就是不是很好的能把Spark與TensorFlow聯動起來。
針對使用案例剛才講到的一些問題,我們做了一些改進。首先,Presto這邊我們放棄了使用Alluxio System Cache,而是使用Alluxio Local Cache緩存數據,這樣跟Spark使用的Alluxio System Cache進行隔離。我們會為每個Presto Worker去開辟一個獨立的緩存。這個緩存是放在Ramdisk上面,官方推薦放到SSD里邊也是可以的。緩存不占用Presto Worker的JVM空間,不會對GC造成額外負擔。除此之外,因為我們有一些磁盤場景,所以跟社區一起完善了基于RockDB的緩存實現。以上是我們在緩存隔離上做的一些設計。
在AI這邊,我們利用Alluxio Fuse打通了ETL與AI訓練和推理。Alluxio Fuse首先做了本地文件跟分布式系統掛載,這樣就可以把分布式文件映射到本地文件。然后用戶可以直接使用讀寫本地文件的方式操作一個分布式系統中的文件,與筆記本上開發的讀寫本地代碼的邏輯完全相同。這樣就成功地打通了Spark ETL跟TensorFlow的計算。Alluxio目前是提供兩種Fuse掛在模式,首先提供為每個Worker創建一個Fuse的掛載目錄。這種方式可以更好地提高數據訪問性能,我們目前使用的是這種模式。還有一種是單獨起一個Fuse進程,這樣就不需要在本地區部署Alluxio集群,這種方式有助于靈活部署。大家可以按照自己需要去選擇不同的部署模式。
目前使用的效果,Presto因為引入了Cache,查詢性能提升了50%左右,相比OS Cache更加穩定。如果我們都用OS Cache性能是最快的,但因為集群支撐的是數據密集型的負載,所以OS Cache不是特別穩定,容易被刷下去。相比沒有Cache加速時,它的查詢性能有不錯的提升。大數據AI集成方面,用戶可以用TensorFlow去訓練推理的代碼不需要做任何改動,就可以跟Spark ETL做集成,目前首批業務是完成了60萬用戶的接入,整個過程基于Dolphin Scheduler實現大數據+AI全流程自動化調度,用戶的運維復雜度非常低。這里幫Alluxio的邱璐做個廣告,她目前是Alluxio Fuse的負責人。Alluxio Fuse已經在微軟、Boss直聘、嗶哩嗶哩、陌陌等公司的生產訓練中完成了部署。
05 輕量級分析相關探索
現狀一,目前聯通在大力推動數字化轉型,之前很多跟數據分析沒有關系的業務,如一些傳統業務,也想做數據分析。利用之前沉淀的數據,想做預測或者是原因定位。目標用戶更多是業務工程師,更喜歡關系型數據庫以及SQL。
現狀二,不同的業務之間同時需要訪問平臺運營的公有數據,以及訪問、管理業務私有數據。這些私有數據的業務口徑由業務方制定,并按照自己的開放策略向其他業務方提供。因此共享私有數據需要數據提供方授權,公有數據只需要平臺授權就可以。
現狀三,服務器資源增量目前低于業務的需求量,因為數據分析需求上漲很厲害,而且不可預測,很多需求不在總體規劃里。與此同時,業務自有服務器在空閑時間段負載比較低,一般主要是白天比較忙。這樣的話,他們的現有資源其實是能夠去支撐新增的一些數據分析業務的。
所以,考慮到平臺在擴容的時候是沒有辦法跟上業務的發展腳步的。為了能盡快滿足業務數字化建設需求,我們計劃為這些業務做有化部署,基于Spark做私有化部署的時候,我們遇到一些問題。
首先,如果按照我們自己標準的模式為每個用戶去獨立部署一個Spark on Yarn的話,管理復雜度就太高了。我們可能會管很多個集群的Spark on Yarn,這樣的管理成本是無法支撐的。另外,部分用戶服務器已經部署了HBase on Hadoop,這些集群不允許我們部署Hadoop也不允許我們在上面部署Hbase,而且不能使用已有的Hadoop。這樣就給Spark體系增加了很多難度。并且,業務用戶希望使用這種純SQL分析數據,如果是這樣,我們會需要添加類似于Kyuubi的工具,我們需要為了滿足一個需求添加很多組件,這樣的話多個集群的管理復雜度會非常高。還有一種方式就是我們不采用on Yarn的方式,采用Spark Standalone模式,其實部署起來相對還好,只使用Spark + Alluxio還不錯。
但這種方式的問題是,如果我們采用Spark per job作業模式,它的并發度是有限的。如果我們給一個通用值,它并發度是一個查詢起一個job,并發度是比較低的,可能同時只能并發幾個。而且Spark配置復雜度偏高,對于SQL用戶來說不是很友好,還需要配置內存大小,executor數目,或者Dynamic allocate限制參數。如果不采用Spark per job采用單Session的模式,又沒有辦法在多任務間做優先級隔離,無法對一個業務的多個SQL進行編排。
我們的思路是希望使用Presto+Alluxio做一個輕量級的數據分析,基于Presto Iceberg native connector,直接去操作存儲在Alluxio上面的Hadoop catelog表,只需要加Presto + Alluxio兩個組件,我們就可以完成整個ETL的過程。我們用了兩套mount系統:第一套是比較傳統的Alluxio直接去mount平臺的HDFS;第二套是Alluxio structure data service(結構化數據服務)mount hive表,對于用戶集群來說,他看到的hive表跟平臺這邊的hive表是一樣的。Mount的好處是可以將數據透明地映射到Alluxio命名空間之內,通過Hive Table load提前把數據加載至Alluxio緩存中,增加數據讀取性能。所有的中間過程只需要Presto以must cashe的方式寫入Alluxio本地,加工完之后,就可以把這個數據最后的結果數據寫回平臺的hive。整個架構比較簡單的,只需要兩個組件,每臺機只要起兩個進程,沒有額外的負擔。
而且用戶可以通過JDBC或者是Command line兩種形式訪問Presto,并且用戶可以通過選擇性的persist Iceberg Hadoop table來授權給其他集群用戶訪問中間數據。通過Alluxio的統一命名空間設計,用戶看到的hive table、Iceberg table是完全一致的,實現了多個集群之間的透明管理。
在這個輕量級數據分析棧內,我們只需要用在戶集群搭建兩個組件就可以滿足需求。因為用戶基于Presto以純SQL的方式完成數據分析,體驗比較好。目前我們用戶喜歡用JDBC的方式去連接,所以暫時不需要提供更多可視化工具。如果需要,只要在我們現有的可視化工具里配置鏈接就行。Alluxio mount平臺HDFS用于私有化數據共享,SDS mount平臺Hive用于共有數據訪問及性能加,基于Presto Iceberg connector的hadoop catalog mode這種模式,可以采用只寫緩存的方式在本地完成ETL。Alluxio全部使用的掛載磁盤的方式來保證緩存空間足夠容納數據分析中間數據,最終數據持久化至平臺HDFS。Alluxio在寫入的時候會使用三副本的策略,保證緩存數據的可用性。
今天的分享就到這里,謝謝大家。