Flink分布式程序的異常處理
本文轉載自微信公眾號「逸言」,作者逸言。轉載本文請聯系逸言公眾號。
在我們的數據平臺產品中,為了簡化開發,對Flink做了一層封裝,定義了Job和Flow的抽象。一個Job其實就是Flink的一個作業,每個Job可以定義多個Flow,一個Flow可以理解為是Flink的一個DataStream,利用Job傳遞的StreamExecutionEnvironment可以在Flow中添加包括Source與Sink的多個算子。
Job與Flow之間的關系可以利用自定義的@JobFlow注解進行配置,如此就可以在執行抽象的AbstractJob的run()方法時,利用反射獲得該Job下的所有Flow,遍歷執行每個Flow的run()方法。在Flow的run()方法中,才會真正根據StreamExecutionEnvironment執行多個算子。
Flink為了保證計算的穩定性,提供了不同的重啟策略。例如,當我們將重啟策略設置為失敗率(failure-rate)時,如果執行的任務出錯次數達到了失敗率配置的要求,Flink的Worker節點的TaskManager就會重啟。如果超過重啟次數,Task Manager就會停止運行。
失敗的原因可能有很多,例如資源不足、網絡通信出現故障等Flink集群環境導致的故障,但是也可能是我們編寫的作業在處理流式數據時,因為處理數據不當拋出了業務異常,使得Flink將其視為一次失敗。
為了減少因為業務原因拋出異常導致Task Manager的不必要重啟,需要規定我們編寫的Flink程序的異常處理機制。由于封裝了Flink的Job,從一開始,我就考慮一勞永逸地解決業務異常的問題,即在AbstractJob的run()方法中,捕獲我們自定義的業務異常,在日志記錄了錯誤信息后,把該異常“吃”掉,避免異常的拋出導致執行失敗,造成TaskManager的重啟,如:
- public abstract class AbstractFlow implements Flow {
- public void run() {
- try {
- runBare();
- } catch (DomainException ex) {
- //...
- }
- }
- protected abstract void runBare();
- }
哪知道這一處理機制壓根兒就無法捕獲業務異常!為什么呢?這就要從Flink的分布式機制說起了。
在Flink集群上執行任務,需要Client將作業提交給Flink集群的Master節點。Master的Dispatcher接收到Job并啟動JobManager,通過解析Job的邏輯視圖,了解Job對資源的要求,然后向ResourceManager(Standalone模式,如果是YARN,則由YARN管理和調度資源)申請本次Job需要的資源。JobManager將Job的邏輯視圖轉換為物理視圖,并將計算任務分發部署到Flink集群的TaskManager上。整個執行過程如下圖所示:
我們封裝的一個Flow,在物理視圖中,其實就是一個作業,即前面所說的計算任務。一個作業可以包含多個算子。如果相鄰算子之間不存在數據Shuffle、并行度相同,則會合并為算子鏈(Operator Chain)。每個算子或算子鏈組成一個JobVertex,在執行時作為一個任務(Task)。根據并行度的設置,每個任務包含并行度數目的子任務(SubTask),這些子任務就是作業調度的最小邏輯單元,對應于進程資源中的一個線程,在Flink中,就是一個Slot(如果不考慮Slot共享的話)。
假定Flink環境的并行度設置為1,作業的前面兩個算子滿足合并算子鏈的要求,且并行度設置為2;之后,通過keyBy()之類的算子完成了數據的Shuffle,然后再合并到同一個Sink中。那么它們的關系如下圖所示:
顯然,Flink集群在執行作業時,會對作業進行劃分,并將劃分后的各個子任務分發到TaskManager中的每個Slot。一個TaskManager就是一個JVM,Slot則是進程中的一個線程。
答案不言而喻。AbstractFlow之所以無法捕獲到各個算子執行任務時拋出的業務異常,是因為它們根本就沒有執行在一個JVM上,也沒有運行在同一個線程中。這正是分布式開發與本地開發的本質區別。如果不了解Flink的執行原理,可能就會困惑Java的異常處理機制為何不生效。在進行分布式開發時,如果還是照搬本地開發的經驗,可能真的會撞得頭碰血流才會看清真相。因此,正確的做法是在每個算子的實現中捕獲各自的異常,也就是要保證每個算子自身都是健壯的,如此才能保證作業盡可能健壯。
當然,分布式開發與本地開發的本質區別不只限于此,例如分布式開發跨進程調用對序列化的要求,對數據一致性的不同要求,對異步通信機制以及阻塞調用的認識,都可能給程序員帶來不同的體驗。歸根結底,了解分布式開發或分布式系統的底層原理,可以讓我們盡早看到真相,避免調到坑里而不自知。