Flink 原理與實現(xiàn):架構(gòu)和拓撲概覽
要了解一個系統(tǒng),一般都是從架構(gòu)開始。我們關(guān)心的問題是:系統(tǒng)部署成功后各個節(jié)點都啟動了哪些服務,各個服務之間又是怎么交互和協(xié)調(diào)的。下方是 Flink 集群啟動后架構(gòu)圖。
當 Flink 集群啟動后,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調(diào)度任務到各個 TaskManager 去執(zhí)行,然后 TaskManager 將心跳和統(tǒng)計信息匯報給 JobManager。TaskManager 之間以流的形式進行數(shù)據(jù)的傳輸。上述三者均為獨立的 JVM 進程。
- Client 為提交 Job 的客戶端,可以是運行在任何機器上(與 JobManager 環(huán)境連通即可)。提交 Job 后,Client 可以結(jié)束進程(Streaming的任務),也可以不結(jié)束并等待結(jié)果返回。
- JobManager 主要負責調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后,會生成優(yōu)化后的執(zhí)行計劃,并以 Task 的單元調(diào)度到各個 TaskManager 去執(zhí)行。
- TaskManager 在啟動的時候就設置好了槽位數(shù)(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動后,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理。
可以看到 Flink 的任務調(diào)度是多線程模型,并且不同Job/Task混合在一個 TaskManager 進程中。雖然這種方式可以有效提高 CPU 利用率,但是個人不太喜歡這種設計,因為不僅缺乏資源隔離機制,同時也不方便調(diào)試。類似 Storm 的進程模型,一個JVM 中只跑該 Job 的 Tasks 實際應用中更為合理。
Job 例子
本文所示例子為 flink-1.0.x 版本
我們使用 Flink 自帶的 examples 包中的 SocketTextStreamWordCount ,這是一個從 socket 流中統(tǒng)計單詞出現(xiàn)次數(shù)的例子。
-
首先,使用 netcat 啟動本地服務器:
$ nc -l 9000
-
然后提交 Flink 程序
$ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \ --hostname 10.218.130.9 \ --port 9000
在netcat端輸入單詞并監(jiān)控 taskmanager 的輸出可以看到單詞統(tǒng)計的結(jié)果。
SocketTextStreamWordCount 的具體代碼如下:
public static void main(String[] args) throws Exception{ // 檢查輸入 final ParameterTool params = ParameterTool.fromArgs(args); ... // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data DataStream<String> text = env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0); DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .keyBy(0) .sum(1); counts.print(); // execute program env.execute("WordCount from SocketTextStream Example"); }
我們將***一行代碼 env.execute 替換成 System.out.println(env.getExecutionPlan()); 并在本地運行該代碼(并發(fā)度設為2),可以得到該拓撲的邏輯執(zhí)行計劃圖的 JSON 串,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 中,能可視化該執(zhí)行圖。
但這并不是最終在 Flink 中運行的執(zhí)行圖,只是一個表示拓撲節(jié)點關(guān)系的計劃圖,在 Flink 中對應了 SteramGraph。另外,提交拓撲后(并發(fā)度設為2)還能在 UI 中看到另一張執(zhí)行計劃圖,如下所示,該圖對應了 Flink 中的 JobGraph。
Graph
看起來有點亂,怎么有這么多不一樣的圖。實際上,還有更多的圖。Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖。
- StreamGraph: 是根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結(jié)構(gòu)。
- JobGraph: StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點 chain 在一起作為一個節(jié)點,這樣可以減少數(shù)據(jù)在節(jié)點之間流動所需要的序列化/反序列化/傳輸消耗。
- ExecutionGraph: JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
- 物理執(zhí)行圖: JobManager 根據(jù) ExecutionGraph 對 Job 進行調(diào)度后,在各個TaskManager 上部署 Task 后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)。
例如上文中的2個并發(fā)度(Source為1個并發(fā)度)的 SocketTextStreamWordCount 四層執(zhí)行圖的演變過程如下圖所示(點擊查看大圖):
這里對一些名詞進行簡單的解釋。
-
StreamGraph:根據(jù)用戶通過 Stream API 編寫的代碼生成的最初的圖。
- StreamNode:用來代表 operator 的類,并具有所有相關(guān)的屬性,如并發(fā)度、入邊和出邊等。
- StreamEdge:表示連接兩個StreamNode的邊。
-
JobGraph:StreamGraph經(jīng)過優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。
- JobVertex:經(jīng)過優(yōu)化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。
- IntermediateDataSet:表示JobVertex的輸出,即經(jīng)過operator處理產(chǎn)生的數(shù)據(jù)集。producer是JobVertex,consumer是JobEdge。
- JobEdge:代表了job graph中的一條數(shù)據(jù)傳輸通道。source 是 IntermediateDataSet,target 是 JobVertex。即數(shù)據(jù)通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。
-
ExecutionGraph:JobManager 根據(jù) JobGraph 生成的分布式執(zhí)行圖,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
- ExecutionJobVertex:和JobGraph中的JobVertex一一對應。每一個ExecutionJobVertex都有和并發(fā)度一樣多的 ExecutionVertex。
- ExecutionVertex:表示ExecutionJobVertex的其中一個并發(fā)子任務,輸入是ExecutionEdge,輸出是IntermediateResultPartition。
- IntermediateResult:和JobGraph中的IntermediateDataSet一一對應。每一個IntermediateResult的IntermediateResultPartition個數(shù)等于該operator的并發(fā)度。
- IntermediateResultPartition:表示ExecutionVertex的一個輸出分區(qū),producer是ExecutionVertex,consumer是若干個ExecutionEdge。
- ExecutionEdge:表示ExecutionVertex的輸入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一個。
- Execution:是執(zhí)行一個 ExecutionVertex 的一次嘗試。當發(fā)生故障或者數(shù)據(jù)需要重算的情況下 ExecutionVertex 可能會有多個 ExecutionAttemptID。一個 Execution 通過 ExecutionAttemptID 來唯一標識。JM和TM之間關(guān)于 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定消息接受者。
-
物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對 Job 進行調(diào)度后,在各個TaskManager 上部署 Task 后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)。
- Task:Execution被調(diào)度后在分配的 TaskManager 中啟動對應的 Task。Task 包裹了具有用戶執(zhí)行邏輯的 operator。
- ResultPartition:代表由一個Task的生成的數(shù)據(jù),和ExecutionGraph中的IntermediateResultPartition一一對應。
- ResultSubpartition:是ResultPartition的一個子分區(qū)。每個ResultPartition包含多個ResultSubpartition,其數(shù)目要由下游消費 Task 數(shù)和 DistributionPattern 來決定。
- InputGate:代表Task的輸入封裝,和JobGraph中JobEdge一一對應。每個InputGate消費了一個或多個的ResultPartition。
- InputChannel:每個InputGate會包含一個以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一對應,也和ResultSubpartition一對一地相連,即一個InputChannel接收一個ResultSubpartition的輸出。
那么 Flink 為什么要設計這4張圖呢,其目的是什么呢?Spark 中也有多張圖,數(shù)據(jù)依賴圖以及物理執(zhí)行的DAG。其目的都是一樣的,就是解耦,每張圖各司其職,每張圖對應了 Job 不同的階段,更方便做該階段的事情。我們給出更完整的 Flink Graph 的層次圖。
首先我們看到,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan。OptimizedPlan 是由 Batch API 轉(zhuǎn)換而來的。StreamGraph 是由 Stream API 轉(zhuǎn)換而來的。為什么 API 不直接轉(zhuǎn)換成 JobGraph?因為,Batch 和 Stream 的圖結(jié)構(gòu)和優(yōu)化方法有很大的區(qū)別,比如 Batch 有很多執(zhí)行前的預分析用來優(yōu)化圖的執(zhí)行,而這種優(yōu)化并不普適于 Stream,所以通過 OptimizedPlan 來做 Batch 的優(yōu)化會更方便和清晰,也不會影響 Stream。JobGraph 的責任就是統(tǒng)一 Batch 和 Stream 的圖,用來描述清楚一個拓撲圖的結(jié)構(gòu),并且做了 chaining 的優(yōu)化,chaining 是普適于 Batch 和 Stream 的,所以在這一層做掉。ExecutionGraph 的責任是方便調(diào)度和各個 tasks 狀態(tài)的監(jiān)控和跟蹤,所以 ExecutionGraph 是并行化的 JobGraph。而“物理執(zhí)行圖”就是最終分布式在各個機器上運行著的tasks了。所以可以看到,這種解耦方式極大地方便了我們在各個層所做的工作,各個層之間是相互隔離的。