成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark踩坑記:共享變量

大數(shù)據(jù) Spark
使用spark過(guò)程當(dāng)中踩過(guò)的一些坑和經(jīng)驗(yàn)。我們知道Spark是多機(jī)器集群部署的,分為Driver/Master/Worker,Master負(fù)責(zé)資源調(diào)度,Worker是不同的運(yùn)算節(jié)點(diǎn),由Master統(tǒng)一調(diào)度。

前言

前面總結(jié)的幾篇spark踩坑博文中,我總結(jié)了自己在使用spark過(guò)程當(dāng)中踩過(guò)的一些坑和經(jīng)驗(yàn)。我們知道Spark是多機(jī)器集群部署的,分為Driver/Master/Worker,Master負(fù)責(zé)資源調(diào)度,Worker是不同的運(yùn)算節(jié)點(diǎn),由Master統(tǒng)一調(diào)度。

而Driver是我們提交Spark程序的節(jié)點(diǎn),并且所有的reduce類型的操作都會(huì)匯總到Driver節(jié)點(diǎn)進(jìn)行整合。節(jié)點(diǎn)之間會(huì)將map/reduce等操作函數(shù)傳遞一個(gè)獨(dú)立副本到每一個(gè)節(jié)點(diǎn),這些變量也會(huì)復(fù)制到每臺(tái)機(jī)器上,而節(jié)點(diǎn)之間的運(yùn)算是相互獨(dú)立的,變量的更新并不會(huì)傳遞回Driver程序。

那么有個(gè)問題,如果我們想在節(jié)點(diǎn)之間共享一份變量,比如一份公共的配置項(xiàng),該怎么辦呢?Spark為我們提供了兩種特定的共享變量,來(lái)完成節(jié)點(diǎn)間變量的共享。 本文首先簡(jiǎn)單的介紹spark以及spark streaming中累加器和廣播變量的使用方式,然后重點(diǎn)介紹一下如何更新廣播變量。

累加器

顧名思義,累加器是一種只能通過(guò)關(guān)聯(lián)操作進(jìn)行“加”操作的變量,因此它能夠高效的應(yīng)用于并行操作中。它們能夠用來(lái)實(shí)現(xiàn)counters和sums。Spark原生支持?jǐn)?shù)值類型的累加器,開發(fā)者可以自己添加支持的類型,在2.0.0之前的版本中,通過(guò)繼承AccumulatorParam來(lái)實(shí)現(xiàn),而2.0.0之后的版本需要繼承AccumulatorV2來(lái)實(shí)現(xiàn)自定義類型的累加器。

如果創(chuàng)建了一個(gè)具名的累加器,它可以在spark的UI中顯示。這對(duì)于理解運(yùn)行階段(running stages)的過(guò)程有很重要的作用。如下圖:

在2.0.0之前版本中,累加器的聲明使用方式如下:

  1. scala> val accum = sc.accumulator(0, "My Accumulator"
  2. accum: spark.Accumulator[Int] = 0 
  3.  
  4. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 
  5. ... 
  6. 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
  7.  
  8. scala> accum.value 
  9. res2: Int = 10 

累加器的聲明在2.0.0發(fā)生了變化,到2.1.0也有所變化,具體可以參考官方文檔,我們這里以2.1.0為例將代碼貼一下:

  1. scala> val accum = sc.longAccumulator("My Accumulator"
  2. accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, nameSome(My Accumulator), value: 0) 
  3.  
  4. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 
  5.  
  6. 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
  7.  
  8. scala> accum.value 
  9. res2: Long = 10 

廣播變量

累加器比較簡(jiǎn)單直觀,如果我們需要在spark中進(jìn)行一些全局統(tǒng)計(jì)就可以使用它。但是有時(shí)候僅僅一個(gè)累加器并不能滿足我們的需求,比如數(shù)據(jù)庫(kù)中一份公共配置表格,需要同步給各個(gè)節(jié)點(diǎn)進(jìn)行查詢。OK先來(lái)簡(jiǎn)單介紹下spark中的廣播變量:

廣播變量允許程序員緩存一個(gè)只讀的變量在每臺(tái)機(jī)器上面,而不是每個(gè)任務(wù)保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個(gè)大數(shù)據(jù)量輸入集合的副本分配給每個(gè)節(jié)點(diǎn)。Spark也嘗試著利用有效的廣播算法去分配廣播變量,以減少通信的成本。

一個(gè)廣播變量可以通過(guò)調(diào)用SparkContext.broadcast(v)方法從一個(gè)初始變量v中創(chuàng)建。廣播變量是v的一個(gè)包裝變量,它的值可以通過(guò)value方法訪問,下面的代碼說(shuō)明了這個(gè)過(guò)程:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) 
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) 
  3.  
  4. scala> broadcastVar.value 
  5. res0: Array[Int] = Array(1, 2, 3) 

從上文我們可以看出廣播變量的聲明很簡(jiǎn)單,調(diào)用broadcast就能搞定,并且scala中一切可序列化的對(duì)象都是可以進(jìn)行廣播的,這就給了我們很大的想象空間,可以利用廣播變量將一些經(jīng)常訪問的大變量進(jìn)行廣播,而不是每個(gè)任務(wù)保存一份,這樣可以減少資源上的浪費(fèi)。

更新廣播變量(rebroadcast)

廣播變量可以用來(lái)更新一些大的配置變量,比如數(shù)據(jù)庫(kù)中的一張表格,那么有這樣一個(gè)問題,如果數(shù)據(jù)庫(kù)當(dāng)中的配置表格進(jìn)行了更新,我們需要重新廣播變量該怎么做呢。上文對(duì)廣播變量的說(shuō)明中,我們知道廣播變量是只讀的,也就是說(shuō)廣播出去的變量沒法再修改,那么我們應(yīng)該怎么解決這個(gè)問題呢?

答案是利用spark中的unpersist函數(shù)

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

上文是從spark官方文檔摘抄出來(lái)的,我們可以看出,正常來(lái)說(shuō)每個(gè)節(jié)點(diǎn)的數(shù)據(jù)是不需要我們操心的,spark會(huì)自動(dòng)按照LRU規(guī)則將老數(shù)據(jù)刪除,如果需要手動(dòng)刪除可以調(diào)用unpersist函數(shù)。

那么更新廣播變量的基本思路:將老的廣播變量刪除(unpersist),然后重新廣播一遍新的廣播變量,為此簡(jiǎn)單包裝了一個(gè)用于廣播和更新廣播變量的wraper類,如下:

  1. import java.io.{ ObjectInputStream, ObjectOutputStream } 
  2. import org.apache.spark.broadcast.Broadcast 
  3. import org.apache.spark.streaming.StreamingContext 
  4. import scala.reflect.ClassTag 
  5.  
  6. // This wrapper lets us update brodcast variables within DStreams' foreachRDD 
  7. // without running into serialization issues 
  8. case class BroadcastWrapper[T: ClassTag]( 
  9.     @transient private val ssc: StreamingContext, 
  10.     @transient private val _v: T) { 
  11.  
  12.   @transient private var v = ssc.sparkContext.broadcast(_v) 
  13.  
  14.   def update(newValue: T, blocking: Boolean = false): Unit = { 
  15.     // 刪除RDD是否需要鎖定 
  16.     v.unpersist(blocking) 
  17.     v = ssc.sparkContext.broadcast(newValue) 
  18.   } 
  19.  
  20.   def value: T = v.value 
  21.  
  22.   private def writeObject(out: ObjectOutputStream): Unit = { 
  23.     out.writeObject(v) 
  24.   } 
  25.  
  26.   private def readObject(in: ObjectInputStream): Unit = { 
  27.     v = in.readObject().asInstanceOf[Broadcast[T]] 
  28.   } 

利用該wrapper更新廣播變量,大致的處理邏輯如下:

  1. // 定義 
  2. val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue) 
  3.  
  4. yourStream.transform(rdd => { 
  5.   //定期更新廣播變量 
  6.   if (System.currentTimeMillis - someTime > Conf.updateFreq) { 
  7.     yourBroadcast.update(newValue, true
  8.   } 
  9.   // do something else 
  10. }) 

總結(jié)

spark中的共享變量是我們能夠在全局做出一些操作,比如record總數(shù)的統(tǒng)計(jì)更新,一些大變量配置項(xiàng)的廣播等等。而對(duì)于廣播變量,我們也可以監(jiān)控?cái)?shù)據(jù)庫(kù)中的變化,做到定時(shí)的重新廣播新的數(shù)據(jù)表配置情況,另外我使用上述方式,在每天***的數(shù)據(jù)實(shí)時(shí)流統(tǒng)計(jì)中表現(xiàn)穩(wěn)定,所以有相似問題的同學(xué)也可以進(jìn)行嘗試,有任何問題,歡迎隨時(shí)騷擾溝通。

責(zé)任編輯:武曉燕 來(lái)源: 36大數(shù)據(jù)
相關(guān)推薦

2020-09-15 08:46:26

Kubernetes探針服務(wù)端

2021-10-28 19:10:02

Go語(yǔ)言編碼

2021-09-03 11:15:18

場(chǎng)景sql配置

2022-01-07 11:48:59

RabbitMQGolang 項(xiàng)目

2015-09-07 10:15:53

移動(dòng)端開發(fā)

2021-06-09 08:21:14

Webpack環(huán)境變量前端

2017-10-24 13:02:29

2023-01-18 23:20:25

編程開發(fā)

2023-02-20 08:11:04

2024-04-10 08:39:56

BigDecimal浮點(diǎn)數(shù)二進(jìn)制

2024-04-01 08:05:27

Go開發(fā)Java

2023-09-22 11:29:11

JavasubList

2021-05-27 22:46:00

Nacos Clien版本Nacos

2017-07-17 15:46:20

Oracle并行機(jī)制

2021-10-15 06:49:37

MySQL

2024-10-09 08:09:11

2022-11-18 07:34:12

Docker項(xiàng)目目錄

2025-05-27 01:55:00

MySQL數(shù)據(jù)庫(kù)工具鏈

2023-06-30 08:10:14

JavaBigDecimal

2025-04-29 10:17:42

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 一级在线免费观看 | 亚洲有码转帖 | 天天精品在线 | 欧美成人第一页 | 亚洲精品视频免费 | 久久香蕉网 | 亚洲一区在线日韩在线深爱 | 婷婷91| 久久精品国产亚洲一区二区三区 | 精品一区二区三区电影 | 久久久免费电影 | 亚洲精品电影网在线观看 | 亚洲精选久久 | 国产精品成人69xxx免费视频 | 久久大陆| 成人久久网 | 日韩综合在线 | 亚洲啊v在线 | 欧美国产日韩在线观看 | 欧美在线观看一区 | 国产福利在线 | 久久综合一区 | www.欧美 | 人人种亚洲 | 黑人成人网 | 99re国产精品 | 精精国产xxxx视频在线野外 | 亚洲一区二区三区四区五区中文 | 国产九一精品 | 国产乱码精品一区二区三区忘忧草 | 国产精品波多野结衣 | 国产激情一区二区三区 | 在线观看亚洲专区 | 91免费在线播放 | 综合九九| 欧美a√ | 国产1区| 黄视频网站免费观看 | 亚洲国产一区二区三区在线观看 | av在线播放网站 | 玖玖在线精品 |