Flink Task調度部署機制
1背景
在日常Flink使用過程中,我們經常遇到Flink任務中某些Slot或者TM負載過重的問題,對日常的資源調配、運維以及降本都帶來了很大的影響,所以我們對Flink的task部署機制進行了梳理和調研,準備在后續的工作中進行優化。由于jobGraph的生成以及任務提交流程因任務部署方式而不同,對我們后續的分析也沒有影響,這里忽略前置流程,直接從Dispatcher出發,重點關注submit后executionGraph構建以及后續的任務部署過程。
2Flink Scheduling Components 構成
2.1 SchedulerNG
在Dispatcher收到submit請求后,先是啟動了JobManagerRunner,再啟動JobMaster,在初始化jobMaster的過程中,我們注意到這里開始了整個作業的Scheduling第一步,創建SchedulerNG。
我們看下SchedulerNG的職責,可以看到調度的發起,作業狀態的跟蹤以及我們熟悉的cp,sp的trigger都是在這里:
我們這次主要跟蹤構建executionGraph,然后根據Scheduling策略發起的整個部署過程。
2.2 ExecutionGraph
現階段(1.13)SchedulerNG默認實現是DefaultScheduler,初始化過程中就會開始構建我們的ExecutionGraph,ExecutionGraph中有幾個重要元素
- ExecutionJobVertex: 代表jobGraph中的一個JobVertex,是所有并行Task的集合
- ExecutionVertex: 代表ExecutionJobVertex中并行task中的一個,一個ExecutionJobVertex可能同時有很多并行運行的ExecutionVertex
- Execution: 代表ExecutionVertex的一次部署/執行,一個ExecutionVertex可能會有很多次Execution
這里executionGraph通過jobGraph的拓撲圖構建了自己的核心結構,看下從JobVertex到ExecutionJobVertex 的轉換流程:
2.3 執行層拓撲結構
我們知道Flink引擎在不停的致力于批流一體建設,調度層的統一也是其中核心的一層。為了提高failover后recovery速度,減少對Flink任務的影響,現在Flink對于批、流的任務task調度都是以pipeline region為基礎。
Pipeline region的構建內嵌在executionGraph的初始化過程中,我們知道Flink中各個節點之間的鏈接都會有IntermediateDataSet這一種邏輯結構,用來表示JobVertex的輸出,即該JobVertex中包含的算子會產生的數據集。這個數據集的ResultPartitionType有幾種類型:
接下來我們看看executionGraph的核心拓撲結構ExecutionTopology是如何構建的:
2.4 Scheduling 策略
SchedulerNG Scheduling策略默認為PipelinedRegionSchedulingStrategy,在executionGraph完成之后,就可以根據生成的剛剛executionTopology來初步構建初步的Scheduling策略了。這里看下startScheduling代碼,可以看到Scheduling過程就是我們常說的基于pipeline region的Scheduling。
2.5 Execution Slot 分配器
默認實現是SlotSharingExecutionSlotAllocator,在schedulerNG完成executionGraph構建完成后,需要進一步構建Execution Slot 分配器。用于將physical shared slots分配到我們的logical slots 上,并將logical slot 分配給我們executionGraph中的execution(task)。通過代碼我們可以看到ExecutionSlotAllocator的職責非常簡單,只有簡單的allocate和cancel。
但在實現上這里有幾個重要元素需要了解:
LocalInputPreferredSlotSharingStrategy :在Flink內部,所有的slot分配都是基于sharingslot來操作的,在滿足co-location的基礎上,Flink期望將producer和consumeNode task盡可能的分布在一起,以減少數據傳輸成本。
SlotProfile:slot的資源信息,對task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的資源信息,slot的物理資源信息,傾向的location(TaskManagerLocation),傾向的allocation以及整個executionGraph之前分配過的allocation(用于黑名單,重啟后盡量避免分配在之前的slot里)。
ResourceProfileRetriever: 用于獲取executionVertex的實際資源信息。默認是unknown,如果有明細配置會用于后續的executionSlotSharingGroup資源構建。
ExecutionSlotSharingGroup:Flink task資源申請的最終邏輯載體,用于將sharing到一起的task(execution group)組合成一個group用于生成資源,后續部署也會綁定對應的task。
3Scheduling 主要過程
在JobMaster完成自身構建之后,就委托SchedulerNG來開始了整個job的Scheduling:
可以看到這里是由schedulingStrategy來負責整個調度過程的,也就是我們的PipelinedRegionSchedulingStrategy,
one by one將pipeline region進行部署
遍歷region中的ExecutionVertex依次進行部署
將vertexDeployment交給SlotSharingExecutionSlotAllocator處理
通過SlotSharingStrategy拿到每個execution對應的ExecutionSlotSharingGroup
- 先從 corresponding co-location constraint 去mapping中尋找是否有存在的slot sharing group
- 接著從producer 的角度來逐一檢查是否可以合并到同一個slot sharing group.
- 最后嘗試所有剩下的slot sharing group看是否符合execution 的要求(如同屬于一個job vertex的task不能分配到同一個 slot sharing group).
- 如果以上都沒有滿足條件的就創建一個新的slot sharing group
- 檢查ExecutionSlotSharingGroup是否已經有了對應的sharedSlot
- 遍歷尚未得到分配的ExecutionSlotSharingGroup
- 計算對應的SlotProfile
- 向PhysicalSlotProvider申請新的physical slot
rm側會先檢查是否已經有滿足條件的excess slot
如果沒有嘗試會申請新的woker以提供資源
由sharedSlotProfileRetriever來創建對應的slotProfile并構建PhysicalSlotRequest
PhysicalSlotProvider向slotPool申請新的slot
slotPool會向rm側申請新的slot
利用physical slot future提前創建sharedSlotFutrue
將sharedSlotFutrue 分配給所有相關的executions
最后生成所有的SlotExecutionVertexAssignments
在完成所有的SlotExecutionVertexAssignment之后,生成對應的DeploymentHandle并等待所有的assignedSlot創建完畢,正式開始部署對應的任務。?
4問題思考
我們對整個Flink task的部署過程完成梳理后,重新對我們一開始的問題進行思考:
4.1 為什么會出現slot負載過重的情況?如何避免?
問題的產生在于大量的task集中分配到了統一個sharedSlot,這個我們可以發現其實是在ExecutionSlotSharingGroup的構建過程中產生的。我們看下源碼,可以很直接的看到整個group的分配是一個roundRobin過程,而executionVertices來自于有序拓撲結構,中間傳遞過程也保證了有序性,所以最終會導致大量的task分配的index靠前的group中,最后落到了同一個slot。
為了避免這種情況,我們的做法其實有比較多,一種是在保證各種constraint的同時添加隨機性,以打散各個不均勻的task;還有一種就是構建基于load-balance的分配過程,以盡可能的將task分布均勻。
附Flink部分源碼:
這個問題主要是在于說有一些過重的task對應的slot都分配在了同一個tm上,導致整個tm壓力過大,資源難以協調。在整個過程中其實我們有看到tm信息的交互,在co-location constraint上。我們看下該hint職責:
The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.
也就是說其實是為了解決算子間相同index的task數據傳遞之類的問題,但對于task的均衡負載無法介入。對此我們嘗試去做的事情:
在當前不使用細粒度資源配置的情況下,考慮task-slot之間均衡分布的同事,task-tm也能做到一定的負載均衡。這種情況可以通過tm單slot來解決,也可以在保證task-slotSharingGroup足夠隨機性的同時,保證slotSharingGroup-tm的足夠隨機性。
在后續使用使用細粒度資源配置的情況下,不使用slotsharing,且將相同jobVertex對應的task盡量分布在同一個task當中。這個我們后續準備在slotProfile中加入jobVertex相關的tag,SlotAllocator做slot matching的時候加入jobVertex constraint來保證task的位置分配。
5寫在最后
Flink開源社區較活躍,Task側的部署鏈路也一直在演進中,持續跟進并深入了解內部實現邏輯能更好的支持我們解決Flink個性化調度策略上的一些問題。后續我們也準備進一步完善Flink在operator級別的細粒度資源配置能力,降低資源使用率的同時進一步提高Flink作業穩定性。