一篇文章看懂 Spark RDD
1 簡介
Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎。它產(chǎn)生于 UC Berkeley AMP Lab,繼承了 MapReduce 的優(yōu)點,但是不同于 MapReduce 的是,Spark 可以將結(jié)果保存在內(nèi)存中,一直迭代計算下去,除非遇到 shuffle 。因此 Spark 能更好的適用于數(shù)據(jù)挖掘與機器學(xué)習(xí)等要迭代的算法。值得注意的是,官網(wǎng)說的 Spark 是 MR 計算速度的 100 倍。僅僅適用于邏輯回歸等這樣的迭代計算。

2 Spark 的運行模式
- Local 模式:多用于本機編寫、測試代碼。
- Standalone 模式:這是 Spark 自帶的資源調(diào)度框架,它支持完全分布式。
- Yarn 模式:這是 hadoop 里面的一個資源調(diào)度框架,Spark 同樣也可以使用。
- Mesos 模式:為應(yīng)用程序(如Hadoop、Spark、Kafka、ElasticSearch)提供API的整個數(shù)據(jù)中心和云環(huán)境中的資源管理和調(diào)度。
下面分別介紹一下 Standalone 和 Yarn 模式下任務(wù)流程。
Standalone-client 提交方式
提交命令如下:以官方給的計算 PI 的代碼為例。
- ./spark-submit
- --master spark://node1:7077
- --class org.apache.spark.example.SaprkPi
- ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
- 1000
執(zhí)行流程圖以及原理:

Standalone-cluster 提交方式
提交命令如下:以官方給的計算 PI 的代碼為例。
- ./spark-submit
- --master spark://node1:7077
- --deploy-mode cluster
- --class org.apache.spark.example.SaprkPi
- ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
- 1000
執(zhí)行流程圖以及原理:

Yarn-cluster 提交方式
提交命令如下:以官方給的計算 PI 的代碼為例。
- ./spark-submit
- --master yarn
- --deploy-mode client
- --class org.apache.spark.example.SaprkPi
- ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
- 1000
執(zhí)行流程圖以及原理:

Yarn-cluster 提交方式
提交命令如下:以官方給的計算 PI 的代碼為例。
- ./spark-submit
- --master yarn
- --deploy-mode cluster
- --class org.apache.spark.example.SaprkPi
- ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
- 1000
執(zhí)行流程圖以及原理:

3 RDD
Spark core 最核心的就是 Resilient Distributed Dataset (RDD) 了,RDD 比較抽象了。源碼中 RDD.scala 中對 RDD 進行了一段描述。最主要的是下面的五個方面;
- /**
- * Internally, each RDD is characterized by five main properties:
- *
- * - A list of partitions
- * - A function for computing each split
- * - A list of dependencies on other RDDs
- * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
- * an HDFS file)
- *
- * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
- * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
- * reading data from a new storage system) by overriding these functions.
- */
RDD 的五大特性:
- RDD 是由一系列的 Partition 組成的。
- 函數(shù)作用在每一個 split 上。
- RDD 之間有一系列依賴關(guān)系。
- 分區(qū)器是作用在 K,V 格式的 RDD 上。
- RDD 提供一系列***的位置
先記住這五個特性,之后的學(xué)習(xí)會慢慢體會到這樣設(shè)計的好處。下面是理解 RDD 的邏輯圖;

看這個圖再回頭理解一下上面的五個 RDD 的特性。
RDD 的彈性表現(xiàn)在 Partition 的數(shù)量上,并且大小沒有限制。RDD 的依賴關(guān)系,可以基于上一個 RDD 計算出下一個 RDD。RDD 的每個 partition 是分布在不同數(shù)據(jù)節(jié)點上的,所有 RDD 的分布式的。RDD 提供了一些列的***的計算位置,體現(xiàn)了數(shù)據(jù)的本地化,我之前的這篇文章寫過:一文搞懂?dāng)?shù)據(jù)本地化級別
RDD 還有一個 Lineage 的東西,叫做血統(tǒng)。
Lineage 簡介:利用內(nèi)存加快數(shù)據(jù)加載,在其它的In-Memory類數(shù)據(jù)庫或Cache類系統(tǒng)中也有實現(xiàn)。Spark的主要區(qū)別在于它采用血統(tǒng)來實現(xiàn)分布式運算環(huán)境下的數(shù)據(jù)容錯性(節(jié)點失效、數(shù)據(jù)丟失)問題。
RDD Lineage被稱為RDD運算圖或RDD依賴關(guān)系圖,是RDD所有父RDD的圖。它是在RDD上執(zhí)行transformations函數(shù)并創(chuàng)建邏輯執(zhí)行計劃(logical execution plan)的結(jié)果,是RDD的邏輯執(zhí)行計劃。
相比其它系統(tǒng)的細(xì)顆粒度的內(nèi)存數(shù)據(jù)更新級別的備份或者LOG機制,RDD 的 Lineage 記錄的是粗顆粒度的特定數(shù)據(jù)轉(zhuǎn)換(Transformation)操作(filter, map, join etc.)行為。當(dāng)這個 RDD 的部分分區(qū)數(shù)據(jù)丟失時,它可以通過Lineage找到丟失的父RDD的分區(qū)進行局部計算來恢復(fù)丟失的數(shù)據(jù),這樣可以節(jié)省資源提高運行效率。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運用場合,但同時相比細(xì)顆粒度的數(shù)據(jù)模型,也帶來了性能的提升。
4 控制算子
控制算子有三種:cache, persist, checkpoint, 以上算子都可以將 RDD 持久化、持久化的單位是 Partition。
cache 和 persist 都是懶執(zhí)行的,必須有一個 action 算子來觸發(fā)他們執(zhí)行。checkpoint 不僅可以將 RDD 持久化到磁盤,還能切斷 RDD 之間的依賴關(guān)系。
說幾點區(qū)別:
- cache 的持久化級別是 Memory_Only,就這一個。
- persist 的持久化級別:常用的有Memory_Only 和Memory_and_Disk_2, 數(shù)字 2 表示副本數(shù)。
- checkpoint 主要是用來做容錯的。
checkpoint 的執(zhí)行原理是:當(dāng) RDD 的 job 執(zhí)行完畢之后,會從 finalRDD 進行回溯。當(dāng)回溯到某一個 RDD 調(diào)用了 checkpoint 方法,會對當(dāng)前的 RDD 做一個標(biāo)記。Spark 框架會自動啟動一個新的 Job ,重新計算這個 RDD 的數(shù)據(jù),將數(shù)據(jù)持久化到 HDFS 上。根據(jù)這個原理,我們可以進行優(yōu)化,對 RDD 進行 checkpoint 之前,***先對這個 RDD 進行 cache, 這樣啟動新的 job 只需要將內(nèi)存中的數(shù)據(jù)拷貝到 HDFS 上就可以了,節(jié)省了重新計算這一步。
5 RDD 的依賴關(guān)系
窄依賴:指父RDD的每一個分區(qū)最多被一個子RDD的分區(qū)所用,表現(xiàn)為一個父RDD的分區(qū)對應(yīng)于一個子RDD的分區(qū),和兩個父RDD的分區(qū)對應(yīng)于一個子RDD 的分區(qū)。圖中,map/filter/union屬于***類,對輸入進行協(xié)同劃分(co-partitioned)的join屬于第二類。窄依賴不會產(chǎn)生 shuffle。
寬依賴:指子RDD的分區(qū)依賴于父RDD的所有分區(qū),這是因為 shuffle 類操作,如圖中的 groupByKey 和未經(jīng)協(xié)同劃分的 join。 遇到寬依賴會產(chǎn)生 shuffle 。
上面我們說到了 RDD 之間的依賴關(guān)系,這些依賴關(guān)系形成了一個人 DAG 有向無環(huán)圖。DAG 創(chuàng)建完成之后,會被提交給 DAGScheduler, 它負(fù)責(zé)把 DAG 劃分相互依賴的多個 stage ,劃分依據(jù)就是 RDD 之間的窄寬依賴。換句話說就是,遇到一個寬依賴就劃分一個 stage,每一個 stage 包含一個或多個 stask 任務(wù)。然后將這些 task 以 taskset 的方式提交給 TaskScheduler 運行。也可以說 stage 是由一組并行的 task 組成。下圖很清楚的描述了 stage 的劃分。

6 Stage劃分思路
接上圖,Spark 劃分 stage 的整體思路是:從后往前推,遇到寬依賴就斷開,劃分為一個stage;遇到窄依賴就將這個 RDD 加入該 stage 中。
因此在圖中 RDD C, RDD D, RDD E, RDD F 被構(gòu)建在一個 stage 中, RDD A被構(gòu)建在一個單獨的Stage中,而 RDD B 和 RDD G 又被構(gòu)建在同一個 stage中。
另一個角度
一個 Job 會被拆分為多組 Task,每組任務(wù)被稱為一個Stage就像 Map Stage,Reduce Stage。
Stage 的劃分簡單的說是以 shuffle 和 result 這兩種類型來劃分。在 Spark中有兩類 task,一類是 shuffleMapTask,一類是 resultTask,***類 task的輸出是 shuffle 所需數(shù)據(jù),第二類 task 的輸出是 result,stage的劃分也以此為依據(jù),shuffle 之前的所有變換是一個 stage,shuffle之后的操作是另一個stage。
如果 job 中有多次 shuffle,那么每個 shuffle 之前都是一個 stage. 會根據(jù) RDD 之間的依賴關(guān)系將 DAG圖劃分為不同的階段,對于窄依賴,由于 partition 依賴關(guān)系的確定性,partition 的轉(zhuǎn)換處理就可以在同一個線程里完成,窄依賴就被 spark 劃分到同一個 stage 中,而對于寬依賴,只能等父 RDD shuffle 處理完成后,下一個 stage 才能開始接下來的計算。之所以稱之為 ShuffleMapTask 是因為它需要將自己的計算結(jié)果通過 shuffle 到下一個 stage 中。