成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark Streaming場景應用- Spark Streaming計算模型及監控

大數據 Spark
本篇文章主要介紹了Spark Streaming在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;并且重點關注了下Spark Streaming在監控方面所作的努力。

摘要

Spark Streaming是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用Spark Streaming方面的技術架構,并著重講解Spark Streaming兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了Spark Streaming在監控方面所做的一些事情,***總結了Spark Streaming的優缺點。

[[193096]]

一、概述

數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分布式計算框架,諸如Hadoop、Spark等。離線分布式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如push反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有Apache Storm、Apache Flink以及Spark Streaming等。其中Spark Streaming由于其本身的擴展性、高吞吐量以及容錯能力等特性,并且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。

根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入后可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx ***融合。其架構見下圖:

Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming的技術架構、兩種狀態模型以及Spark Streaming監控等。

二、應用場景

在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據采集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據匯總到一定的量后再一并操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。

2.1 框架

目前我們Spark Streaming的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們采用Kafka作為實時輸入源,Spark Streaming作為計算引擎處理完數據之后,再持久化到存儲中,包括MySQL、HDFS、ElasticSearch以及MongoDB等;同時Spark Streaming 數據清洗后也會寫入Kafka,然后經由Flume持久化到HDFS;接著基于持久化的內容做一些UI的展現。架構見下圖:

2.2 無狀態模型

無狀態模型只關注當前新生成的DStream數據,所以的計算邏輯均基于該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定batch時間段的用戶訪問以及點擊情況等。該模型由于沒有狀態,并不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般采用Direct方式讀取Kafka數據,并采用監聽器方式持久化Offsets即可。具體流程如下:

其上模型框架包含以下幾個處理步驟:

  • 讀取Kafka實時數據;
  • Spark Streaming Transformations以及actions操作;
  • 將數據結果持久化到存儲中,跳轉到步驟一。

受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從Kafka largest處消費還是從之前的Kafka offsets處消費,這個取決具體的業務場景。

2.3 狀態模型

有狀態模型是指DStreams在指定的時間范圍內有依賴關系,具體的時間范圍由業務場景來指定,可以是2個及以上的多個batch time RDD組成。Spark Streaming提供了updateStateByKey方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming中通過checkpoint來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近N batch time 的網站訪問情況以及app新增累計統計等等。具體流程如下:

上述流程中,每batch time計算時,需要依賴最近2個batch time內的數據,經過轉換及相關統計,最終持久化到MySQL中去。不過為了確保每個計算僅計算2個batch time內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下updateStateByKey的實現,其代碼如下:

  • 暴露了全局狀態數據中的key類型的方法。
  1. def updateStateByKey[S: ClassTag]( 
  2.       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], 
  3.       partitioner: Partitioner, 
  4.       rememberPartitioner: Boolean 
  5.     ): DStream[(K, S)] = ssc.withScope { 
  6.      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) 

隱藏了全局狀態數據中的key類型,僅對Value提供自定義的方法。

  1. def updateStateByKey[S: ClassTag]( 
  2.       updateFunc: (Seq[V], Option[S]) => Option[S], 
  3.       partitioner: Partitioner, 
  4.       initialRDD: RDD[(K, S)] 
  5.     ): DStream[(K, S)] = ssc.withScope { 
  6.     val cleanedUpdateF = sparkContext.clean(updateFunc) 
  7.     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { 
  8.       iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s))) 
  9.     } 
  10.     updateStateByKey(newUpdateFunc, partitioner, true, initialRDD) 

以上兩種方法分別給我們提供清理過期數據的思路:

  • 泛型K進行過濾。K表示全局狀態數據中對應的key,如若K不滿足指定條件則反回false;
  • 返回值過濾。第二個方法中自定義函數指定了Option[S]返回值,若過期數據返回None,那么該數據將從全局狀態中清除。

三、Spark Streaming監控

同Spark一樣,Spark Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的監控,其中Streaming監控頁的內容如下圖:

上圖是Spark UI中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI還提供了Active Batches以及Completed Batches相關信息。Active Batches包含當前正在處理的batch信息以及堆積的batch相關信息,而Completed Batches剛提供每個batch處理的明細數據,具體包括batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:

Spark Streaming能夠提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若Spark UI無法滿足你所需的監控需要,用戶可以定制個性化監控信息。Spark Streaming提供了StreamingListener特質,通過繼承此方法,就可以定制所需的監控,其代碼如下:

  1. @DeveloperApi 
  2.     trait StreamingListener { 
  3.  
  4.       /** Called when a receiver has been started */ 
  5.       def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } 
  6.  
  7.       /** Called when a receiver has reported an error */ 
  8.       def onReceiverError(receiverError: StreamingListenerReceiverError) { } 
  9.  
  10.       /** Called when a receiver has been stopped */ 
  11.       def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { } 
  12.  
  13.       /** Called when a batch of jobs has been submitted for processing. */ 
  14.       def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } 
  15.  
  16.       /** Called when processing of a batch of jobs has started.  */ 
  17.       def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } 
  18.  
  19.       /** Called when processing of a batch of jobs has completed. */ 
  20.       def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } 
  21.  
  22.       /** Called when processing of a job of a batch has started. */ 
  23.       def onOutputOperationStarted( 
  24.           outputOperationStarted: StreamingListenerOutputOperationStarted) { } 
  25.  
  26.       /** Called when processing of a job of a batch has completed. */ 
  27.       def onOutputOperationCompleted( 
  28.           outputOperationCompleted: StreamingListenerOutputOperationCompleted) { } 
  29.     } 

目前,我們保存Offsets時,采用繼承StreamingListener方式,此是一種應用場景。當然也可以監控實時計算程序的堆積情況,并在達到一閾值后發送報警郵件。具體監聽器的定制還得依據應用場景而定。

四、Spark Streaming優缺點

Spark Streaming并非是Storm那樣,其并非是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,能夠較好地集成Spark 其他計算模塊,包括MLlib(機器學習)、Graphx以及Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲作為流式的實時性等性能。

4.1 優點

  • Spark Streaming基于Spark Core API,因此其能夠與Spark中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
  • Spark Streaming 是粗粒度的準實時處理框架,一次讀取完或異步讀完之后處理數據,且其計算可基于大內存進行,因而具有較高的吞吐量;
  • Spark Streaming采用統一的DAG調度以及RDD,因此能夠利用其lineage機制,對實時計算有很好的容錯支持;
  • Spark Streaming的DStream是基于RDD的在流式數據處理方面的抽象,其transformations 以及actions有較大的相似性,這在一定程度上降低了用戶的使用門檻,在熟悉Spark之后,能夠快速上手Spark Streaming。

4.2 缺點

  • Spark Streaming是準實時的數據處理框架,采用粗粒度的處理方式,當batch time到時才會觸發計算,這并非像Storm那樣是純流式的數據處理方式。此種方式不可避免會出現相應的計算延遲 。
  • 目前來看,Spark Streaming穩定性方面還是會存在一些問題。有時會因一些莫名的異常導致退出,這種情況下得需要自己來保證數據一致性以及失敗重啟功能等。

四、總結

本篇文章主要介紹了Spark Streaming在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;并且重點關注了下Spark Streaming在監控方面所作的努力。首先本文介紹了Spark Streaming應用場景以及在我們的實際應用中所采取的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接著通過監聽器模式介紹Spark UI相關監控信息等;***對Spark Streaming的優缺點進行概括。希望本篇文章能夠給各位帶來幫助,后續我們會介紹Spark Streaming在場景應用中我們所做的優化方面的努力,敬請期待!

關于作者

徐勝國,大連理工大學碩士畢業,360大數據中心數據研發工程師,主要負責基于Spark Streaming的項目架構及研發工作。郵箱 : xshguo_better@yeah.net。如有問題,可郵件聯系,歡迎交流。

責任編輯:武曉燕 來源: oschina博客
相關推薦

2018-04-09 12:25:11

2017-08-14 10:30:13

SparkSpark Strea擴容

2016-12-19 14:35:32

Spark Strea原理剖析數據

2017-10-13 10:36:33

SparkSpark-Strea關系

2016-01-28 10:11:30

Spark StreaSpark大數據平臺

2016-05-11 10:29:54

Spark Strea數據清理Spark

2017-09-26 09:35:22

2019-10-17 09:25:56

Spark StreaPVUV

2019-12-13 08:25:26

FlinkSpark Strea流數據

2021-08-20 16:37:42

SparkSpark Strea

2023-10-24 20:32:40

大數據

2017-06-27 15:08:05

大數據Apache SparKafka Strea

2017-10-11 11:10:02

Spark Strea大數據流式處理

2021-07-09 10:27:12

SparkStreaming系統

2018-04-18 08:54:28

RDD內存Spark

2016-03-03 15:11:42

Spark Strea工作流調度器

2011-08-24 14:07:13

PostgreSQLStreaming R

2018-10-24 09:00:26

KafkaSpark數據

2022-06-24 08:00:00

編程工具數據結構開發

2010-02-23 10:57:34

WCF Streami
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久草在线| 欧美精品在线观看 | 成人在线免费 | 久久人人爽人人爽 | 黄色成人在线 | 亚洲视频免费一区 | 欧美日韩精品一区二区三区蜜桃 | 国产成人精品一区二区三区在线 | 久久综合成人精品亚洲另类欧美 | 日本免费一区二区三区四区 | 亚洲成人精品在线 | 国产精品美女久久久久aⅴ国产馆 | 久久国内 | 免费三级网 | 九九福利| 久久精品一级 | 毛片一区二区三区 | 情侣酒店偷拍一区二区在线播放 | 一级一级一级毛片 | 91精品国产色综合久久 | 国产黄色大片在线观看 | 综合国产 | 欧美性生活网 | 性色av香蕉一区二区 | 亚洲综合中文字幕在线观看 | 久久小视频 | 一区二区三区久久久 | 男人天堂网址 | 色天堂影院 | 久久亚洲欧美日韩精品专区 | caoporn国产精品免费公开 | 韩国欧洲一级毛片 | 在线免费观看成年人视频 | 久久久久久美女 | 欧美精品v国产精品v日韩精品 | 亚洲精品久久久久久国产精华液 | 在线欧美a| 欧美一区二区小视频 | 国产精品久久久久久婷婷天堂 | 在线国产视频 | 97国产成人|