成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark入門:實現WordCount的3種方式

大數據 Spark
當我們學習一門新的語言,HelloWorld通常是我們寫的第一個程序。而WordCount基本上是我們學習MapReduce思想與編程的第一個程序,無論是Hadoop的MR或者是Spark的RDD操作學習。

[[170182]]

WordCount作為Spark的入門任務,可以很簡單,也可以做到比較復雜。 本文從實現功能的角度提出了3種實現方式,至于性能影響,會在后文繼續討論。

注意: 本文使用的Spark版本還是1.6.1.如果讀者您已經切換到2.0+版本,請參考GitHub spark的官方例子進行學習。 因為2.0版本的API與1.X 并不能完全兼容,特別是2.0開始使用了SparkSession的概念,而不是SparkContext!

***種方式:mapToPair + reduceByKey

這是官方提供的實現方式,應該也是網上能找到的最多的例子。

官網地址: http://spark.apache.org/examples.html

核心代碼:

  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 
  10.  
  11. public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } 
  12.  
  13. }); 
  14.  
  15. JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<IntegerIntegerInteger>() { 
  16.  
  17. public Integer call(Integer a, Integer b) { return a + b; } 
  18.  
  19. }); 
  20.  
  21. counts.saveAsTextFile("hdfs://..."); 

總結上面的步驟:

  1. flatmap : 將一整段文字映射成一個字符串數組
  2. mapToPair: 將word 映射成 (word, 1)
  3. reduceByKey: 按照key進行group and plus的操作, 得到最終結果
  4. collect: 這是Action,上面3個都是Transformation

第二種方式:使用countByValue代替mapToPair + reduceByKey

核心代碼:

  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. Map<String, Long> counts = words.countByValue(); 

讀文件、flatmap這兩步都是完全一樣的,但是后面直接一個countByValue就搞定了,并且還直接collect到本地了,是不是感覺這一種實現方式更簡潔了呢?

至于性能,一般來說這種方式還不錯,但是這種方式有一些缺點,參考StackOverFlow的描述:

網址: http://stackoverflow.com/questions/25318153/spark-rdd-aggregate-vs-rdd-reducebykey

countByValue would be the fastest way to do this, however its implementation uses hash maps and merges them so if you have a large amount of data this approach may not scale well (especially when you consider how many issues spark already has with memory). You may want to use the standard way of counting in map reduce which would be to map the line and 1 as pairs then reduceBykey like this:

簡單的說,這種方式是使用hash的方式進行merge。 如果處理的數據量比較大的時候,效果可能不怎么好。

注意: 這種方式的性能筆者確實還沒有親自實踐過!

第三種方式:AggregateByKey

AggregateByKey 這個方法,可以看做是reduceByKey的增強版,因為reduceByKey的輸出類型與輸入類型要求是完全一致的。比如wordcount 之中的輸入是Tuple2<String, Integer> 輸出也同樣要求是Tuple2<String,Integer>. 但是AggregateByKey的輸出類型可以是不一樣的數據類型。 參考下面的代碼:

  1. val keysWithValuesList = Array("foo=A""foo=A""foo=A""foo=A""foo=B""bar=C""bar=D""bar=D"
  2.  
  3. val data = sc.parallelize(keysWithValuesList) 
  4.  
  5. //Create key value pairs 
  6.  
  7. val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache() 
  8.  
  9. val initialCount = 0; 
  10.  
  11. val addToCounts = (n: Int, v: String) => n + 1 
  12.  
  13. val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2 
  14.  
  15. val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts) 

輸出:

  1. Aggregate By Key sum Results 
  2.  
  3. bar -> 3 
  4.  
  5. foo -> 5 

可以看到,輸入是<String, String> 而輸出變成了<String, Integer>

注意: 這種方法,并不是處理WordCount的***的選擇,只是說明我們可以使用AggregateByKey這種方式來實現相同的功能

其實還有另外一種實現方式: 使用DataFrame。 但是這種方式需要前期的準備比較多,即如何將數據處理并喂給DataFrame。

一般來說,DataFrame的效率相比其他的RDD的實現方式要高不少,如果在前期準備工作上面難度不是太大的話,非常推薦使用DataFrame的方式。

責任編輯:武曉燕 來源: FlyML
相關推薦

2009-07-02 14:42:55

ExtJS Grid

2020-02-18 20:00:31

PostgreSQL數據庫

2022-08-05 08:27:05

分布式系統線程并發

2020-02-10 15:50:18

Spring循環依賴Java

2017-09-05 10:20:15

2015-05-04 10:20:25

2019-01-31 08:15:38

物聯網農業IoT

2010-03-12 17:52:35

Python輸入方式

2021-11-05 21:33:28

Redis數據高并發

2010-08-13 13:25:53

Flex頁面跳轉

2014-12-31 17:42:47

LBSAndroid地圖

2021-06-24 08:52:19

單點登錄代碼前端

2021-04-01 06:01:10

嵌入式開發應用程序開發技術

2021-07-19 05:48:30

springboot 攔截器項目

2015-04-02 16:54:52

災難恢復VDI災難恢復

2015-04-13 11:39:26

VDI災難恢復

2010-07-14 10:30:26

Perl多線程

2023-09-07 19:14:05

2018-04-02 14:29:18

Java多線程方式

2025-03-26 00:35:25

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品乱码久久久久久按摩 | 国产精品久久久久久亚洲调教 | 中文字幕av一区二区三区 | 成人在线中文字幕 | 欧美成人精品一区 | 欧美一级黑人aaaaaaa做受 | 欧美中文字幕一区二区三区亚洲 | 欧美日高清视频 | 一区二区日韩 | 成人av一区二区三区 | 91av视频在线 | 久久综合伊人一区二区三 | 亚洲久久一区 | 久久成人免费视频 | 97久久久久久| 激情一区二区三区 | 国产三区av | 成人国产在线视频 | 九九av| 欧美在线网站 | 久久久久久国产 | 亚洲精品久久区二区三区蜜桃臀 | 亚洲精品一区在线 | 国产成人精品一区二 | 日韩欧美一区二区三区在线播放 | 777777777亚洲妇女| 91精品国产91久久久久久不卞 | 天天干干 | 国产福利视频网站 | 免费一看一级毛片 | 91久久久久久久久 | 精品欧美在线观看 | 99精品在线| 97精品国产97久久久久久免费 | xxxxx黄色片| 精品一区二区三区在线观看 | 91精品国产一区二区三区动漫 | 黄色一级电影在线观看 | 亚洲成人精品 | 巨大荫蒂视频欧美另类大 | 99久久久久久 |