成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink Task調度部署機制

開發 前端
Flink開源社區較活躍,Task側的部署鏈路也一直在演進中,持續跟進并深入了解內部實現邏輯能更好的支持我們解決Flink個性化調度策略上的一些問題。

1背景

在日常Flink使用過程中,我們經常遇到Flink任務中某些Slot或者TM負載過重的問題,對日常的資源調配、運維以及降本都帶來了很大的影響,所以我們對Flink的task部署機制進行了梳理和調研,準備在后續的工作中進行優化。由于jobGraph的生成以及任務提交流程因任務部署方式而不同,對我們后續的分析也沒有影響,這里忽略前置流程,直接從Dispatcher出發,重點關注submit后executionGraph構建以及后續的任務部署過程。

2Flink Scheduling Components 構成

2.1   SchedulerNG

在Dispatcher收到submit請求后,先是啟動了JobManagerRunner,再啟動JobMaster,在初始化jobMaster的過程中,我們注意到這里開始了整個作業的Scheduling第一步,創建SchedulerNG。

this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);

我們看下SchedulerNG的職責,可以看到調度的發起,作業狀態的跟蹤以及我們熟悉的cp,sp的trigger都是在這里:

圖片

我們這次主要跟蹤構建executionGraph,然后根據Scheduling策略發起的整個部署過程。

2.2   ExecutionGraph

現階段(1.13)SchedulerNG默認實現是DefaultScheduler,初始化過程中就會開始構建我們的ExecutionGraph,ExecutionGraph中有幾個重要元素

  1. ExecutionJobVertex: 代表jobGraph中的一個JobVertex,是所有并行Task的集合
  2. ExecutionVertex: 代表ExecutionJobVertex中并行task中的一個,一個ExecutionJobVertex可能同時有很多并行運行的ExecutionVertex
  3. Execution: 代表ExecutionVertex的一次部署/執行,一個ExecutionVertex可能會有很多次Execution

這里executionGraph通過jobGraph的拓撲圖構建了自己的核心結構,看下從JobVertex到ExecutionJobVertex 的轉換流程:

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology){
1. executionGraph第一步拿到了jobGraph中的有序JobVertex列表
2. 接著一對一創建ExecutionJobVertex
3. 根據producer并行度生成producedDataSets(IntermediateDataSet)
4. 再根據自身并行度生成所屬的ExecutionVertex[]
5. 構建stateBackend信息和checkpointStorage信息等
6. 最后完成executionGraph的拓撲構建executionTopology
}

2.3   執行層拓撲結構

我們知道Flink引擎在不停的致力于批流一體建設,調度層的統一也是其中核心的一層。為了提高failover后recovery速度,減少對Flink任務的影響,現在Flink對于批、流的任務task調度都是以pipeline region為基礎。

Pipeline region的構建內嵌在executionGraph的初始化過程中,我們知道Flink中各個節點之間的鏈接都會有IntermediateDataSet這一種邏輯結構,用來表示JobVertex的輸出,即該JobVertex中包含的算子會產生的數據集。這個數據集的ResultPartitionType有幾種類型:

BLOCKING:都上游處理完數據后,再交給下游處理。這個數據分區可以被消費多次,也可以并發消費。這個分區并不會被自動銷毀,而是交給調度器判斷。
BLOCKING_PERSISTENT:類似于Blocking,但是其生命周期由用戶端指定。調用JobMaster或者ResourceManager的API來銷毀,而不是由調度器控制。
PIPELINED:流交換模式??梢杂糜谟薪绾蜔o界流。這種分區類型的數據只能被每個消費者消費一次。且這種分區可以保留任意數據。
PIPELINED_BOUNDED:該策略在PIPELINED的基礎上保留有限制的buffer,避免對barrier造成阻塞。
PIPELINED_APPROXIMATE:和PIPELINED_BOUNDED類似,可以支持下游task重啟后繼續消費,用來支持task failover后的Approximate Local-Recovery策略。

接下來我們看看executionGraph的核心拓撲結構ExecutionTopology是如何構建的:

第一步 先根據executionTopology構建rawPipelinedRegions,多個vertex能否組合成一個pipeline region的關鍵在于這個vertex的consumedResult.getResultType().isReconnectable(),如果支持重連,那么兩個vertex之間就會進行拆分,劃到不同的region。這里的isReconnectable就和我們的ResultPartitionType類型有關,流處理中的PIPELINED和PIPELINED_BOUNDED都是默認的false,在這種情況下所有的vertex其實都會放入同一個region。故我們日常的flink作業其實都只會生成一個pipeline region。
第二步 根據不同的pipeline region構建自己的resultPartition信息,這個是為了構建后續的PartitionReleaseStrategy,決定一個resultPartition何時finish以及被release
第三步 對vertex的coLocation情況進行校驗,保證co-located tasks必須在同一個pipeline Region里。這里是因為后續的scheduling strategy里會保證不同pipeline region的調度部署是階段隔離的,可能無法滿足colocation-constraint

2.4   Scheduling 策略

SchedulerNG Scheduling策略默認為PipelinedRegionSchedulingStrategy,在executionGraph完成之后,就可以根據生成的剛剛executionTopology來初步構建初步的Scheduling策略了。這里看下startScheduling代碼,可以看到Scheduling過程就是我們常說的基于pipeline region的Scheduling。

@Override
public void startScheduling() {
final Set<SchedulingPipelinedRegion> sourceRegions =
IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
.filter(this::isSourceRegion)
.collect(Collectors.toSet());
maybeScheduleRegions(sourceRegions);
}

2.5   Execution Slot 分配器

默認實現是SlotSharingExecutionSlotAllocator,在schedulerNG完成executionGraph構建完成后,需要進一步構建Execution Slot 分配器。用于將physical shared slots分配到我們的logical slots 上,并將logical slot 分配給我們executionGraph中的execution(task)。通過代碼我們可以看到ExecutionSlotAllocator的職責非常簡單,只有簡單的allocate和cancel。

圖片

但在實現上這里有幾個重要元素需要了解:

LocalInputPreferredSlotSharingStrategy :在Flink內部,所有的slot分配都是基于sharingslot來操作的,在滿足co-location的基礎上,Flink期望將producer和consumeNode task盡可能的分布在一起,以減少數據傳輸成本。

SlotProfile:slot的資源信息,對task -> logical slot -> physical slot的mapping有非常重要的作用,包含了task的資源信息,slot的物理資源信息,傾向的location(TaskManagerLocation),傾向的allocation以及整個executionGraph之前分配過的allocation(用于黑名單,重啟后盡量避免分配在之前的slot里)。

ResourceProfileRetriever: 用于獲取executionVertex的實際資源信息。默認是unknown,如果有明細配置會用于后續的executionSlotSharingGroup資源構建。

ExecutionSlotSharingGroup:Flink task資源申請的最終邏輯載體,用于將sharing到一起的task(execution group)組合成一個group用于生成資源,后續部署也會綁定對應的task。

3Scheduling 主要過程

在JobMaster完成自身構建之后,就委托SchedulerNG來開始了整個job的Scheduling:

@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
schedulingStrategy.startScheduling();
}

可以看到這里是由schedulingStrategy來負責整個調度過程的,也就是我們的PipelinedRegionSchedulingStrategy,

one by one將pipeline region進行部署

private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
final List<SchedulingPipelinedRegion> regionsSorted =
SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(
schedulingTopology, regions);


final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache = new HashMap<>();
for (SchedulingPipelinedRegion region : regionsSorted) {
maybeScheduleRegion(region, consumableStatusCache);
}
}

遍歷region中的ExecutionVertex依次進行部署

final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);

將vertexDeployment交給SlotSharingExecutionSlotAllocator處理

private List<SlotExecutionVertexAssignment> allocateSlots(
final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
return executionSlotAllocator.allocateSlotsFor(
executionVertexDeploymentOptions.stream()
.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
.collect(Collectors.toList()));
}
接下來整個allocate的主要過程如下(忽略physical fail等情況)

通過SlotSharingStrategy拿到每個execution對應的ExecutionSlotSharingGroup

  1. 先從 corresponding co-location constraint 去mapping中尋找是否有存在的slot sharing group
  2. 接著從producer 的角度來逐一檢查是否可以合并到同一個slot sharing group.
  3. 最后嘗試所有剩下的slot sharing group看是否符合execution 的要求(如同屬于一個job vertex的task不能分配到同一個 slot sharing group).
  4. 如果以上都沒有滿足條件的就創建一個新的slot sharing group
  1. 檢查ExecutionSlotSharingGroup是否已經有了對應的sharedSlot
  2. 遍歷尚未得到分配的ExecutionSlotSharingGroup
  3. 計算對應的SlotProfile
  4. 向PhysicalSlotProvider申請新的physical slot
  1. rm側會先檢查是否已經有滿足條件的excess slot

  2. 如果沒有嘗試會申請新的woker以提供資源

  3. 由sharedSlotProfileRetriever來創建對應的slotProfile并構建PhysicalSlotRequest

  4. PhysicalSlotProvider向slotPool申請新的slot

  5. slotPool會向rm側申請新的slot

  1. 利用physical slot  future提前創建sharedSlotFutrue

  2. 將sharedSlotFutrue 分配給所有相關的executions

  3. 最后生成所有的SlotExecutionVertexAssignments

在完成所有的SlotExecutionVertexAssignment之后,生成對應的DeploymentHandle并等待所有的assignedSlot創建完畢,正式開始部署對應的任務。?

4問題思考

我們對整個Flink task的部署過程完成梳理后,重新對我們一開始的問題進行思考:

4.1   為什么會出現slot負載過重的情況?如何避免?

問題的產生在于大量的task集中分配到了統一個sharedSlot,這個我們可以發現其實是在ExecutionSlotSharingGroup的構建過程中產生的。我們看下源碼,可以很直接的看到整個group的分配是一個roundRobin過程,而executionVertices來自于有序拓撲結構,中間傳遞過程也保證了有序性,所以最終會導致大量的task分配的index靠前的group中,最后落到了同一個slot。

為了避免這種情況,我們的做法其實有比較多,一種是在保證各種constraint的同時添加隨機性,以打散各個不均勻的task;還有一種就是構建基于load-balance的分配過程,以盡可能的將task分布均勻。

附Flink部分源碼:

private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
final List<SchedulingExecutionVertex> executionVertices) {


for (SchedulingExecutionVertex executionVertex : executionVertices) {
final SlotSharingGroup slotSharingGroup =
getSlotSharingGroup(executionVertex.getId());
final List<ExecutionSlotSharingGroup> groups =
executionSlotSharingGroups.computeIfAbsent(
slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());


ExecutionSlotSharingGroup group = null;
for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
if (isGroupAvailableForVertex(
executionSlotSharingGroup, executionVertex.getId())) {
group = executionSlotSharingGroup;
break;
}
}


if (group == null) {
group = new ExecutionSlotSharingGroup();
group.setResourceProfile(slotSharingGroup.getResourceProfile());
groups.add(group);
}


addVertexToExecutionSlotSharingGroup(executionVertex, group);
}
}
4.2   如何避免tm級別的負載過重?

這個問題主要是在于說有一些過重的task對應的slot都分配在了同一個tm上,導致整個tm壓力過大,資源難以協調。在整個過程中其實我們有看到tm信息的交互,在co-location constraint上。我們看下該hint職責:

The co-location group is used to make sure that the i-th subtasks for iteration head and iteration tail are scheduled on the same TaskManager.

也就是說其實是為了解決算子間相同index的task數據傳遞之類的問題,但對于task的均衡負載無法介入。對此我們嘗試去做的事情:

在當前不使用細粒度資源配置的情況下,考慮task-slot之間均衡分布的同事,task-tm也能做到一定的負載均衡。這種情況可以通過tm單slot來解決,也可以在保證task-slotSharingGroup足夠隨機性的同時,保證slotSharingGroup-tm的足夠隨機性。

在后續使用使用細粒度資源配置的情況下,不使用slotsharing,且將相同jobVertex對應的task盡量分布在同一個task當中。這個我們后續準備在slotProfile中加入jobVertex相關的tag,SlotAllocator做slot matching的時候加入jobVertex constraint來保證task的位置分配。

5寫在最后

Flink開源社區較活躍,Task側的部署鏈路也一直在演進中,持續跟進并深入了解內部實現邏輯能更好的支持我們解決Flink個性化調度策略上的一些問題。后續我們也準備進一步完善Flink在operator級別的細粒度資源配置能力,降低資源使用率的同時進一步提高Flink作業穩定性。

責任編輯:武曉燕 來源: 得物技術
相關推薦

2024-02-27 08:05:32

Flink分區機制數據傳輸

2014-01-06 17:09:10

ApacheMesos

2022-01-14 07:56:38

Checkpoint機制Flink

2025-01-15 09:13:53

2021-11-02 06:58:55

FlinkWindow機制

2024-06-04 15:56:48

Task?.NET異步編程

2022-12-20 10:22:16

計算函數

2015-03-24 16:29:55

默認線程池java

2020-10-10 14:21:49

CDH6.3.2flink部署

2013-08-05 17:09:57

2023-06-20 07:32:04

2021-07-30 19:44:51

AndroidJava線程

2020-03-03 08:29:07

時延敏感網絡TSN網絡

2022-06-20 06:38:50

Flink批作業算子

2025-06-03 07:00:00

大數據Flink并行度

2009-03-02 14:19:33

CiscoWi-Fi通話調度

2021-02-01 11:30:13

React前端調度

2021-11-29 08:48:00

K8S KubernetesAirflow

2022-05-19 08:47:30

Flinkwatermark窗口計算

2019-06-04 13:44:53

架構運維技術
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产免费一级片 | 国产 日韩 欧美 在线 | 欧美一区二区三区 | 国产精品99久久久精品免费观看 | 在线啊v| 国产精品一区在线 | 在线一区| 亚洲精品久久久久久一区二区 | 免费毛片在线 | 久草在线青青草 | 国产高清在线精品一区二区三区 | 国产欧美视频一区 | 亚洲美女一区 | 久久99视频精品 | 亚洲精品二区 | av入口 | 一区二区三区精品视频 | 亚洲在线一区 | 中文字幕亚洲视频 | 亚洲视频免费一区 | 久久久久一区二区三区四区 | 成人深夜福利网站 | 操操日| 久久久久国产一级毛片 | 精品福利在线视频 | 精品国产一区二区三区四区在线 | 成人在线视频观看 | 国产 欧美 日韩 一区 | 欧美高清视频一区 | 欧美一区二区在线 | 国产高清一区二区 | 亚洲美女在线一区 | 日日夜夜影院 | 欧美一级在线观看 | 久久99精品久久久久久青青日本 | 国产精品福利久久久 | 日韩欧美一区二区三区四区 | 免费成人高清在线视频 | 久久99精品国产99久久6男男 | 亚洲电影免费 | 国产成人一区二区三区精 |