成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Apache Spark源碼走讀之3:Task運(yùn)行期之函數(shù)調(diào)用

數(shù)據(jù)庫 Spark
本篇主要闡述在TaskRunner中執(zhí)行的task其業(yè)務(wù)邏輯是如何被調(diào)用到的,另外試圖講清楚運(yùn)行著的task其輸入的數(shù)據(jù)從哪獲取,處理的結(jié)果返回到哪里,如何返回。

準(zhǔn)備

  1. spark已經(jīng)安裝完畢

  2. spark運(yùn)行在local mode或local-cluster mode

local-cluster mode

local-cluster模式也稱為偽分布式,可以使用如下指令運(yùn)行

  1. MASTER=local[1,2,1024] bin/spark-shell 

[1,2,1024] 分別表示,executor number, core number和內(nèi)存大小,其中內(nèi)存大小不應(yīng)小于默認(rèn)的512M

Driver Programme的初始化過程分析

初始化過程的涉及的主要源文件

  1. SparkContext.scala       整個初始化過程的入口

  2. SparkEnv.scala          創(chuàng)建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager

  3. DAGScheduler.scala       任務(wù)提交的入口,即將Job劃分成各個stage的關(guān)鍵

  4. TaskSchedulerImpl.scala 決定每個stage可以運(yùn)行幾個task,每個task分別在哪個executor上運(yùn)行

  5. SchedulerBackend

    1. 最簡單的單機(jī)運(yùn)行模式的話,看LocalBackend.scala

    2. 如果是集群模式,看源文件SparkDeploySchedulerBackend

初始化過程步驟詳解

步驟1: 根據(jù)初始化入?yún)⑸蒘parkConf,再根據(jù)SparkConf來創(chuàng)建SparkEnv, SparkEnv中主要包含以下關(guān)鍵性組件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

  1. private[spark] val env = SparkEnv.create( 
  2.     conf, 
  3.     "", conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt, isDriver = true
  4.     isLocalisLocal = isLocal) 
  5.   SparkEnv.set(env) 

步驟2:創(chuàng)建TaskScheduler,根據(jù)Spark的運(yùn)行模式來選擇相應(yīng)的SchedulerBackend,同時啟動taskscheduler,這一步至為關(guān)鍵

  1. private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName) 
  2.  taskScheduler.start() 

TaskScheduler.start目的是啟動相應(yīng)的SchedulerBackend,并啟動定時器進(jìn)行檢測

  1. override def start() { 
  2.     backend.start() 
  3.  
  4.     if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher 
  5.       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, 
  6.             SPECULATION_INTERVAL milliseconds) { 
  7.         checkSpeculatableTasks() 
  8.       } 
  9.     } 
  10.   } 

步驟3:以上一步中創(chuàng)建的TaskScheduler實例為入?yún)?chuàng)建DAGScheduler并啟動運(yùn)行

  1. @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler) 
  2.   dagScheduler.start() 

步驟4:啟動WEB UI

  1. ui.start() 

RDD的轉(zhuǎn)換過程

還是以最簡單的wordcount為例說明rdd的轉(zhuǎn)換過程

  1. sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) 

上述一行簡短的代碼其實發(fā)生了很復(fù)雜的RDD轉(zhuǎn)換,下面仔細(xì)解釋每一步的轉(zhuǎn)換過程和轉(zhuǎn)換結(jié)果

步驟1:val rawFile = sc.textFile("README.md")

textFile先是生成hadoopRDD,然后再通過map操作生成MappedRDD,如果在spark-shell中執(zhí)行上述語句,得到的結(jié)果可以證明所做的分析

  1. scala> sc.textFile("README.md") 14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; 
  2. assuming yes 14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0maxMem=311387750 14/04/23 13:11:48 INFO MemoryStore: 
  3. Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB) 14/04/23 13:11:48 DEBUG BlockManager: 
  4. Put block broadcast_0 locally took 277 ms 14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms res0:
  5.  org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13 

步驟2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap將原來的MappedRDD轉(zhuǎn)換成為FlatMappedRDD

  1. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =   
  2. new FlatMappedRDD(this, sc.clean(f)) 

步驟3:val wordCount = splittedText.map(word => (word, 1))

利用word生成相應(yīng)的鍵值對,上一步的FlatMappedRDD被轉(zhuǎn)換成為MappedRDD

步驟4:val reduceJob = wordCount.reduceByKey(_ + _),這一步最復(fù)雜

步驟2,3中使用到的operation全部定義在RDD.scala中,而這里使用到的reduceByKey卻在RDD.scala中見不到蹤跡。reduceByKey的定義出現(xiàn)在源文件PairRDDFunctions.scala

細(xì)心的你一定會問reduceByKey不是MappedRDD的屬性和方法啊,怎么能被MappedRDD調(diào)用呢?其實這背后發(fā)生了一個隱式的轉(zhuǎn)換,該轉(zhuǎn)換將MappedRDD轉(zhuǎn)換成為PairRDDFunctions

  1. implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = 
  2.     new PairRDDFunctions(rdd) 

這種隱式的轉(zhuǎn)換是scala的一個語法特征,如果想知道的更多,請用關(guān)鍵字"scala implicit method"進(jìn)行查詢,會有不少的文章對此進(jìn)行詳盡的介紹。

接下來再看一看reduceByKey的定義

  1. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { 
  2.   reduceByKey(defaultPartitioner(self), func) 
  3.  
  4. def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { 
  5.   combineByKey[V]((v: V) => v, func, func, partitioner) 
  6.  
  7. def combineByKey[C](createCombiner: V => C, 
  8.     mergeValue: (C, V) => C, 
  9.     mergeCombiners: (C, C) => C, 
  10.     partitioner: Partitioner, 
  11.     mapSideCombine: Boolean = true
  12.     serializerClass: String = null): RDD[(K, C)] = { 
  13.   if (getKeyClass().isArray) { 
  14.     if (mapSideCombine) { 
  15.       throw new SparkException("Cannot use map-side combining with array keys.") } if (partitioner.isInstanceOf[HashPartitioner])
  16.  { throw new SparkException("Default partitioner cannot partition array keys.") } } val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  17.  if (self.partitioner == Some(partitioner)) { self.mapPartitionsWithContext((context, iter) => 
  18. { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } 
  19. else if (mapSideCombine) 
  20. { val combined = self.mapPartitionsWithContext((context, iter) => 
  21. { aggregator.combineValuesByKey(iter, context) }, preservesPartitioning = true) val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner) .setSerializer(serializerClass) partitioned.mapPartitionsWithContext((context, iter) =>
  22.  { new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context)) }, preservesPartitioning = true) }
  23.  else { // Don't apply map-side combiner. val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass) values.mapPartitionsWithContext((context, iter) =>
  24.  { new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true
  25.   } 

reduceByKey最終會調(diào)用combineByKey, 在這個函數(shù)中PairedRDDFunctions會被轉(zhuǎn)換成為ShuffleRDD,當(dāng)調(diào)用mapPartitionsWithContext之后,shuffleRDD被轉(zhuǎn)換成為MapPartitionsRDD

Log輸出能證明我們的分析

  1. res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13 

RDD轉(zhuǎn)換小結(jié)

小結(jié)一下整個RDD轉(zhuǎn)換過程

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

整個轉(zhuǎn)換過程好長啊,這一切的轉(zhuǎn)換都發(fā)生在任務(wù)提交之前。

運(yùn)行過程分析

數(shù)據(jù)集操作分類

在對任務(wù)運(yùn)行過程中的函數(shù)調(diào)用關(guān)系進(jìn)行分析之前,我們也來探討一個偏理論的東西,作用于RDD之上的Transformantion為什么會是這個樣子?

對這個問題的解答和數(shù)學(xué)搭上關(guān)系了,從理論抽象的角度來說,任務(wù)處理都可歸結(jié)為“input->processing->output"。input和output對應(yīng)于數(shù)據(jù)集dataset.

在此基礎(chǔ)上作一下簡單的分類

  1. one-one 一個dataset在轉(zhuǎn)換之后還是一個dataset,而且dataset的size不變,如map

  2. one-one 一個dataset在轉(zhuǎn)換之后還是一個dataset,但size發(fā)生更改,這種更改有兩種可能:擴(kuò)大或縮小,如flatMap是size增大的操作,而subtract是size變小的操作

  3. many-one 多個dataset合并為一個dataset,如combine, join

  4. one-many 一個dataset分裂為多個dataset, 如groupBy

Task運(yùn)行期的函數(shù)調(diào)用

task的提交過程參考本系列中的第二篇文章。本節(jié)主要講解當(dāng)task在運(yùn)行期間是如何一步步調(diào)用到作用于RDD上的各個operation

TaskRunner.run

Task.run

Task.runTask (Task是一個基類,有兩個子類,分別為ShuffleMapTask和ResultTask)

RDD.iterator

RDD.computeOrReadCheckpoint

RDD.compute 

或許當(dāng)看到RDD.compute函數(shù)定義時,還是覺著f沒有被調(diào)用,以MappedRDD的compute定義為例

  1. override def compute(split: Partition, context: TaskContext) =                                                                                                       
  2.   firstParent[T].iterator(split, context).map(f)   

注意,這里最容易產(chǎn)生錯覺的地方就是map函數(shù),這里的map不是RDD中的map,而是scala中定義的iterator的成員函數(shù)map, 請自行參考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

堆棧輸出

  1. 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) 
  2. 81         at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 
  3. 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 
  4. 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 
  5. 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  6. 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  7. 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
  8. 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  9. 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  10. 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 
  11. 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  12. 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  13. 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 
  14. 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  15. 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  16. 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 
  17. 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 
  18. 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 
  19. 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 
  20. 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102) 
  21. 100         at org.apache.spark.scheduler.Task.run(Task.scala:53) 
  22. 101         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) 

ResultTask

compute的計算過程對于ShuffleMapTask比較復(fù)雜,繞的圈圈比較多,對于ResultTask就直接許多。

  1. override def runTask(context: TaskContext): U = { 
  2.     metrics = Some(context.taskMetrics) 
  3.     try { 
  4.       func(context, rdd.iterator(split, context)) 
  5.     } finally { 
  6.       context.executeOnCompleteCallbacks() 
  7.     } 
  8.   }  

 計算結(jié)果的傳遞

上面的分析知道,wordcount這個job在最終提交之后,被DAGScheduler分為兩個stage,***個Stage是shuffleMapTask,第二個Stage是ResultTask.

那么ShuffleMapTask的計算結(jié)果是如何被ResultTask取得的呢?這個過程簡述如下

  1. ShffuleMapTask將計算的狀態(tài)(注意不是具體的數(shù)據(jù))包裝為MapStatus返回給DAGScheduler

  2. DAGScheduler將MapStatus保存到MapOutputTrackerMaster中

  3. ResultTask在執(zhí)行到ShuffleRDD時會調(diào)用BlockStoreShuffleFetcher的fetch方法去獲取數(shù)據(jù)

    1. ***件事就是咨詢MapOutputTrackerMaster所要取的數(shù)據(jù)的location

    2. 根據(jù)返回的結(jié)果調(diào)用BlockManager.getMultiple獲取真正的數(shù)據(jù)

BlockStoreShuffleFetcher的fetch函數(shù)偽碼

  1. val blockManager = SparkEnv.get.blockManager 
  2.  
  3. val startTime = System.currentTimeMillis 
  4. val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) 
  5. logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) 
  6. val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) val itr = blockFetcherItr.flatMap(unpackBlock)  

注意上述代碼中的getServerStatusesgetMultiple,一個是詢問數(shù)據(jù)的位置,一個是去獲取真正的數(shù)據(jù)。

有關(guān)Shuffle的詳細(xì)解釋,請參考”詳細(xì)探究Spark的shuffle實現(xiàn)一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/

原文鏈接:http://www.cnblogs.com/hseagle/p/3673132.html

 

責(zé)任編輯:彭凡 來源: 博客園
相關(guān)推薦

2014-07-04 10:58:47

Apache Spar

2014-07-03 15:40:09

Apache Spar

2014-07-15 10:59:58

Spark代碼跟讀

2022-08-27 08:02:09

SQL函數(shù)語法

2021-08-09 09:00:00

Kubernetes云計算架構(gòu)

2021-08-30 18:09:57

鴻蒙HarmonyOS應(yīng)用

2011-03-21 10:49:33

LAMPApache

2012-02-22 22:56:19

開源Apache

2014-02-14 15:43:16

ApacheSpark

2015-03-31 18:26:43

陌陌社交

2010-06-04 09:11:10

.NET并行編程

2010-09-16 09:35:17

SQL函數(shù)

2011-03-21 11:33:09

LAMPApache

2021-04-15 08:45:25

Zabbix 5.2Apache監(jiān)控

2022-05-19 15:08:43

技術(shù)函數(shù)調(diào)用棧Linux

2011-06-23 14:27:48

QT QLibrary 動態(tài)庫

2010-06-08 08:41:08

.NET 4并行編程

2010-06-07 08:43:46

.NET 4并行編程

2024-03-14 08:17:33

JVMJava對象

2021-07-10 08:04:07

Reactor模式Netty
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产精品高清在线 | 国产免费一级一级 | 亚洲色片网站 | 久久精品二区 | 精品久久久久一区 | 亚洲精品一区二区三区中文字幕 | 在线午夜 | 超碰地址| 亚洲精品中文字幕在线 | 久久伊人久久 | 中文字字幕一区二区三区四区五区 | 中文字幕一区二区三区四区五区 | 韩国毛片一区二区三区 | 粉嫩高清一区二区三区 | 亚洲精品国产偷自在线观看 | 成人精品区 | 看片国产 | 成人免费日韩 | 精品欧美一区二区三区久久久小说 | 先锋资源亚洲 | 亚洲精品在线播放 | 欧美一级二级三级 | 成人在线免费电影 | 凹凸日日摸日日碰夜夜 | 婷婷一级片 | 激情网站在线观看 | 国产一二区视频 | 在线观看视频91 | 国产毛片久久久久久久久春天 | 国产免费又色又爽又黄在线观看 | 成人日韩 | 精品福利视频一区二区三区 | 一区二区三区在线 | 欧 | 日韩高清一区 | 午夜视频免费在线观看 | 男人的天堂在线视频 | 日韩一区二区在线视频 | 日韩爱爱网站 | 自拍 亚洲 欧美 老师 丝袜 | 九九热精品在线 | 日韩成人影院 |