為何Spark在編程界越來越吃香?Spark將成為數據科學家的統一平臺
前言
統計科學家使用交互式的統計工具(比如R)來回答數據中的問題,獲得全景的認識。與之相比,數據工程師則更像一名程序員,他們在服務器上編寫代碼,創建和應用機器學習模型,熟悉C++和Java等系統級語言,經常需要和企業級數據中心的某些組件打交道,比如Hadoop。
而有的數據科學家專注于更細的領域,就像精通R但從未聽說過Python或者scikit-learn(反之亦然),即便兩者都提供了豐富的統計庫。(文末附有驚喜!)
Spark相比其他工具
如果可以提供一種統一的工具,運行在統一的架構,用統一的語言編程,并可以同時滿足統計科學家和數據工程師的需求,那該多好啊。難道為了研究數據,我就必須去學一種像Python或R的語言?我一直使用傳統的數據分析工具,難道為了應對大規模計算,就必須去懂MapReduce?正是統計工具的不完美造就了這種局面:
- R提供了一個豐富的統計分析和機器學習的解釋器。但R難以在分布式條件下執行數據的分析和清洗,以便開展其所擅長的數據分析,也不以一種主流的開發語言為人所知。
- Python是-種通用的編程語言,也不乏出色的第三方數據分析庫(像Pandas和scikit-learn),但Python也有和R一樣的缺陷:只能局限在處理單機能負載的數據量。
- 在經典的MapReduce計算框架上開發分布式的機器學習算法是可行的(參考Mahout),但程序員需要從零開始,更別說移植復雜計算的難度。
- 為降低復雜計算移植到MapReduce的難度,Crunch 提供一個簡單的、傻瓜式的Java API,但MapReduce天生決定了它在迭代計算方面是低效的,盡管大多數機器學習算法都需要迭代計算。

Spark的優勢
Spark是一個超有潛力的通用數據計算平臺,無論是對統計科學家還是數據工程師。大部分人討論到Spark時,總是注意到將數據駐留內存以提高計算效率的方面(相對MapReduce),Spark 擁有許多的特征,使之真正成為一個融合統計科學和數據工程的交叉點:
- Spark附帶了一個機器學習庫MLib,雖然只是在初始階段。
- Spark是用Scala語言編寫的,運行在Java虛擬機上,同時也提供像R和Python的命令行解釋器。
- 對Java程序員,Scala 的學習曲線是比較陡峭的,但所幸Scala可以兼容一切的Java庫。
- Spark的RDD(彈性分布式數據集),是Crunch開發者熟知的--種數據結構。
- Spark模仿了Scala 的集合計算API,對Java和Scala開發者來說耳熟能詳,而Python開發者也不難上手,而Scala對統計計算的支持也不錯。
- Spark和其底層的Scala語言,并不只是為機器學習而誕生的,除此之外,像數據訪問、日志ETL和整合都可以通過API輕松搞定。就像Python,你可以把整個數據計算流程搬到Spark平臺.上來,而不僅僅是模型擬合和分析。
在命令行解釋器中執行的代碼,和編譯后運行的效果相同。而且,命令行的輸入可以得到實時反饋,你將看到數據透明地在集群間傳遞與計算。
Spark和MLib還有待完善整個項目有不少bug,效率也還有提升的空間,和YARN的整合也存在問題。Spark還沒辦法提供像R那樣豐富的數據分析函數。但Spark已然是世界上最好的數據平臺,足以讓來自任何背景的數據科學家側目。

Stack Overflow問題的自動標注
Stack Overflow是一個著名的軟件技術問答平臺,在上面提的每個問題有可能被打上若干個短文本的標簽,比如java或者sql,我們的目標在于建立一.套系統,使用ALS推薦算法,為新問題的標簽提供預測和建議。從推薦系統的角度,你可以把問題想象成user,把標簽想象成item。
首先,從Stack Overflow下載官方提供的截至20140120的問答數據
- sta ckoverflow. com-Posts. 7z
這是一個能夠直接用于分布式計算的bzip格式文件,但在我們的場景下,必須先解壓并拷
貝到HDFS
- bzcat stackover flow. com-Posts.7z| hdfs dfs -put一/user /srowen/ Posts. xml
解壓后的文件大約是24.4GB,包含210萬個問題,1800 萬個回答,總共標注了930萬個標簽,這些標簽排重之后大概是34000個。
確認機器安裝了Spark 之后,輸入spark-shell即可打開Scala的REPL環境。首先,我們讀取一個存儲在HDFS的Posts. xm文件:
- val postsXML = sC. textFile("hdfs:/ //user /srowen/Posts. xml")
這時命令行工具會返回:
- postsXML: org. apache. spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12
顯示文本文件已轉化為一個String型的RDD,你可以通過調用RDD的函數,實現任意的查詢運算。比如統計文件的行數:
- postsXML. count
這條指令生成大量的輸出,顯示Spark正在利用分布式的環境計數,最終打印出18066983。
下一步,將XML文件的每- -行都存入形如(questionID, tag)的元組。得益于Scala的函數式編程的風格,RDD和Scala集合-樣可以使用map等方法:
- val postIDTags = postsXML. flatMap { line =>
- // Matches Id=".. ."
- Tags="..." in line
- val idTagRegex = "Id=\"(\\d+)\". +Tags=\"([^\"]+)\"".r
- // // Finds tags like <TAG> value from above
- val tagRegex = "<([^&]+)>".r
- // Yields 0 or 1 matches:
- idTagRegex. findFirstMatchIn(line) match {
- // No match -- not a line
- case None => None
- // Match, and can extract ID and tags from m
- case Some(m) => {
- val postID = m. group(1) . toInt
- val tagsString = m. group(2)
- // Pick out just TAG matching group
- val tags = tagRegex. findAllMatchIn(tagsString)。map(_ . group
- (1)) . toList
- // Keep only question with at least 4 tags, and map to (pos
- t,tag) tuples
- if (tags.size >= 4) tags . map( (postID,_)) else None
- }
- }
- // Because of flatMap,individual lists will concatenate
- // into one collection of tuples
- }
你會發現這條指令的執行是立即返回的,而不像count一樣需要等待,因為到目前為止,Spark并未啟動任何主機間的數據變換。
ALS的MLib實現必須使用數值ID而非字符串作為惟一標識,而問題的標簽數據是字符串格式的,所以需要把字符串哈希成一個非負整數,同時保留非負整數到字符串的映射。這里我們先定義一個哈希函數以便復用。
- def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
- var tagHashes = postIDTags .map(_._2) .distinct. map(tag => (nnHash
- (tag) , tag))
現在把元組轉換為ALS計算所需的輸入:
- import org. apache. spark. mllib. recommendation._
- // Convert to Rating(Int ,Int,Double) objects
- val alsInput = postIDTags.map(t => Rating(t. _1, nnHash(t._2), 1.
- 0) )
- // Train model with 40 features, 10 iterations of ALS
- val model = ALS. trainImplicit(alsInput, 40,10)
這一步生成特征矩陣,可以被用來預測問題與標簽之間的關聯。由于目前MLib還處于不完善的狀態,沒有提供一個recommend的接口來獲取建議的標簽,我們可以簡單定義一個:
- def
- recommend (questionID: Int, howMany: Int = 5): Array[(String ,
- Double)] = {
- // Build list of one question and all items and predict value f
- or all of them
- val predictions = model. predict(tagHashes.map(t => (questionI
- D,t._1)))
- // Get top howMany recommendations ordered by prediction value
- val topN = predictions. top ( howMany) (Ordering . by [Rating,Doub1
- e](_.rating))
- // Translate back to tags from IDs
- topN . map(r => (tagHashes. lookup(r . product)(0),r .rating))
通過上述函數,我們可以獲得任意一個問題比如ID為7122697的How to make substring-matching query work fast on a large table?的至少4個標簽:
- recommend ( 7122697)。foreach(println)
推薦結果如下所示:
- (sqL,0.17745152481166354)
- (database,0.13526622226672633)
- (oracle , 0.1079428707621154)
- (ruby-on-rails, 0.06067207312463499)
- (postgresql,0.050933613169706474)
注意:
- -每次運行得到的結果不盡相同,是因為ALS是從隨機解開始迭代的
- -如果你希望獲得實時性更高的結果,可以在recommend前輸入tagHashes=tagHas hes. cache
真實的問題標簽是postgresql、query-optimi zation、substring 和text-search。不過,預測結果也有一定的合理性(postgresq 經常和ruby-on-rails一起出現)
當然,以上的示例還不夠優雅和高效,但是,我希望所有來自R的分析師、鼓搗Python 的黑客和熟悉Hadoop的開發者,都能從中找到你們熟悉的部分,從而找到一條適合你們的路徑去探索Spark,并從中獲益。