從HDFS和MapReduce兩方面了解Hadoop
簡介
Hadoop 是一個能夠對大量數據進行分布式處理的軟件框架,框架最核心的設計就是:HDFS 和 MapReduce。HDFS 為海量的數據提供了存儲,而 MapReduce 則為海量的數據提供了計算。這篇文章就主要從 HDFS 和 MapReuce 兩個大的方面展開對 Hadoop 講解,當然為了直觀的測試 HDFS 提供的豐富的 API 以及我們編寫的 MapReduce 程序,在閱讀下面的內容之前,你需要準備一臺安裝了 Hadoop 的機器(也可以是虛擬機),如果你還沒有安裝的話,可以參考《在 Ubuntu 上安裝 Hadoop》。
HDFS
HDFS 概念
在說 HDFS 之前我們先來解釋一下什么是 DFS,DFS 的全稱是 Distributed File System,翻譯過來就是分布式文件系統,而 HDFS 就是 Hadoop 自帶的分布式文件系統。
相關名詞
為了后面大家更容易理解文章,這里使用一定的篇幅來簡單的介紹一下與 HDFS 相關的一些組件或者名詞的概念。
- NameNode,管理節點,管理系統的命名空間,維護著整個文件系統的結構和目錄信息,通常情況下一個 Hadoop 集群只會有一個工作的 NameNode。
- DataNode,工作節點,文件系統的工作節點,主要是根據需要進行存儲或者檢索數據塊,并且定期向 NameNode 報告它們所存儲的數據塊列表。
- 數據塊,同我們常使用的磁盤上的文件系統一樣,HDFS 也有數據塊的概念,默認的大小為 128M。
- 塊緩存,一般情況下,我們通過 HDFS 從 DataNode 中檢索數據時,DataNode 都是從磁盤中讀取,但是對于訪問很頻繁的文件,它所對于的數據塊可能會被緩存到 DataNode 的內存中,以加快讀取速度,這就是所謂的塊緩存。
- 聯邦 HDFS,其實這個就是為了解決 Hadoop 不適合存儲數量龐大的文件的問題,同時由多個 NameNode 來維護整個文件系統的系統樹以及文件和目錄,每個 NameNode 負責管理文件系統命名空間中的一部分。
特性
下面我們就一起來看下 HDFS 有哪些特性:
- 存儲超大文件,由于 HDFS 是分布式的文件系統,所以不受單臺機器的存儲大小的限制,可以存儲超大文件,目前已經達到了 PB 級了。
- 流式訪問數據。
- HDFS 對硬件的要求并不是很高,可以運行在廉價的商用硬件上。
- 不適合低延遲的數據訪問,由于 Hadoop 的流式數據訪問,訪問數據會有寫延遲,所以不太適合低時間延遲的數據訪問,一般情況下這種需求我們會使用關系型數據庫來實現。
- 不適合大量的小文件存儲,原因是 NameNode 將文件系統的元數據存儲在內存中,每存儲一個文件都需要在 NameNode 中存儲該文件的目錄、存儲在哪個 DataNode 中等等的數據。所以如果文件的數量達到數十億的話 NameNode 的內存很可能不夠用了。
- 不支持多用戶寫入,以及任意的修改文件,只可以在文件末尾添加內容。
HDFS 的命令行操作
命令行接口是 HDFS 所有類型的接口中最簡單的,也是每個開發者都必須要掌握的。下面我們就列舉幾個簡單的命令行操作,但是在操作前你必須按照***章的內容安裝好了 Hadoop,并且啟動了 HDFS。
創建目錄。
- 清單 1. 創建目錄命令
- hadoop fs -mkdir /test
查看目錄。
- 清單 2. 創建目錄命令
- hadoop fs -ls /
上傳文件,緊跟-put 后面的 test.txt 是要推送到 HDFS 中的文件,/test 是指定要推送到 HDFS 上哪個目錄下面。
- 清單 3. 上傳文件
- hadoop fs -put test.txt /test
刪除文件。
- 清單 4. 上傳文件
- hadoop fs -rm /test/test.txt
其實通過上面例舉的幾個命令我們可以看出 HDFS 的文件操作命令幾乎和 Linux 上的命令一致,這樣我們使用起來會很容易上手。
HDFS 的 JavaAPI
在 Java 項目中使用 HDFS 提供的 API 我們需要依賴 hadoop-common 和 hadoop-hdfs 兩個包,為了方便測試,我們這里還引入了 junit,篇幅原因這里就不對項目本身做太多的講解,這里附上項目源碼地址供大家參考。
讀取 HDFS 中文件的內容。
- 清單 5. JavaApi 讀取文件內容
- @Test
- public void read() throws IOException {
- // 文件地址。
- URI uri = URI.create("/test/test.txt");
- // 用于接收讀取的數據流。
- FSDataInputStream in = null;
- try {
- in = fs.open(new Path(uri));
- // ***的一個 boolean 類型的參數是指是否在調用結束后關閉流,我們這里選擇在 finally 里面手動關閉。
- IOUtils.copyBytes(in, System.out, 4096, false);
- } finally {
- IOUtils.closeStream(in);
- }
- }
- }
不出意外的話,你可以在控制臺看到你指定文件的內容。在這一步我遇到一個問題,就是無法直接在 windows 下操作 HDFS,具體的解決方法可以參照這篇文章。FSDataInputStream.seek()方法還可以實現從文件輸入流的任意一個絕對位置讀取文件內容,比如我們可以在上面代碼中添加如下的內容來實現在控制臺重復打印文件內容。
- 清單 6. JavaApi 任意位置讀取文件內容
- in.seek(0);
- tils.copyBytes(in, System.out, 4096, false);
創建目錄。
- 清單 7. JavaApi 創建目錄
- @Test
- public void mkdir() throws IOException {
- fs.mkdirs(new Path("/test/api"));
- }
查詢文件目錄。
- 清單 8. JavaApi 查詢文件目錄
- @Test
- public void ls() throws IOException {
- FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
- if (null == fileStatuses || fileStatuses.length == 0) {
- return;
- }
- for (FileStatus fileStatus : fileStatuses) {
- System.out.println(fileStatus.getPath() + " " + fileStatus.getPermission());
- }
- }
這里引入一個類 FileStatus,這個類封裝了 HDFS 中文件和目錄的元數據,包括文件長度、塊大小、復本、修改時間、所有者以及權限信息。FileSystem 里面提供的 listStatus 方法可以獲取一個目錄下的所有目錄或者文件的 FileStatus,但是它不會遞歸獲取下級目錄的內容,這里可以開發你的想象自己實現一下(Tips:fileStatus.isDirectory()可以判斷這個 fileStatus 是否是一個文件夾)。
刪除文件或目錄。
- 清單 9. JavaApi 刪除文件或目錄
- @Test
- public void delete() throws IOException {
- fs.delete(new Path("/test/api"), false);
- }
- @Test
- public void deleteNonEmptyDir() throws IOException {
- fs.delete(new Path("/test"), true);
- }
我們可以看到 fs.delete()這個方法有兩個參數,***個參數很好理解,就是我們要刪除的目錄或者文件的地址。那么第二個 Boolean 類型的參數呢,如果刪除的是文件或者空目錄這個參數實際上是會被忽略的,如果刪除的是非空目錄,只有在這個參數值為 true 的時候才會成功刪除。
創建文件和文件寫入。
我們通過 FileSystem.create()方法來創建一個文件,這個方法會順帶著創建不存在的父級目錄,如果不需要這個的話,***是在創建之前調用 exists()方法來判斷一下,如果父級目錄不存在直接報錯即可。
- 清單 10. JavaApi 創建文件和文件寫入
- @Test
- public void create() throws IOException {
- FSDataOutputStream out = null;
- try {
- out = fs.create(new Path("/test/api/test.txt"));
- out.writeChars("hello hdfs.");
- } finally {
- IOUtils.closeStream(out);
- }
- }
文件創建好后,可以通過 append()方法在文件末尾添加內容。
- 清單 11. JavaApi 追加文件內容
- @Test
- public void append() throws IOException {
- FSDataOutputStream out = null;
- try {
- out = fs.append(new Path("/test/api/test.txt"));
- out.writeChars("hello hdfs.");
- } finally {
- out.close();
- }
- }
從本地上傳文件到 HDFS。
- 清單 12. JavaApi 上傳文件至 HDFS
- @Test
- public void copyFromLocal() throws IOException {
- fs.copyFromLocalFile(new Path("d:/local.txt"), new Path("/test/api"));
- }
從 HDFS 上下載文件。
- 清單 13. JavaApi 從 HDFS 下載文件
- @Test
- public void copyToLocal() throws IOException {
- fs.copyToLocalFile(new Path("/test/api/local.txt"), new Path("E:/"));
- }
MapReduce 實戰
什么是 MapReduce
MapReduce 是一種編程模型,"Map(映射)"和"Reduce(歸約)",是它們的主要思想,我們通過 Map 函數來分布式處理輸入數據,然后通過 Reduce 匯總結果并輸出。我們編寫一個 MapReduce 程序的一般步驟是:
- 編寫 map 程序。
- 編寫 reduce 程序。
- 編寫程序驅動。
本章節的目標
本節中我們將使用 MapReduce 框架來編寫一個簡單的例子,這個例子是用來統計 HDFS 指定目錄下的文件中每個字符出現的次數并將統計結果輸出到 HDFS 的指定目錄中。點擊此處獲取本章節源代碼。
Map 程序
我們繼承 Mapper 類并重寫了其 map 方法。Map 階段輸入的數據是從 hdfs 中拿到的原數據,輸入的 key 為某一行起始位置相對于文件起始位置的偏移量,value 為該行的文本。輸出的內容同樣也為鍵-值對,這個時候輸出數據的鍵值對的類型可以自己指定,在本例中 key 是 Text 類型的,value 是 LongWritable 類型的。輸出的結果將會被發送到 reduce 函數進一步處理。
- 清單 14. Map 程序
- public class CharCountMapper extends Mapper< LongWritable, Text, Text, LongWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- // 將這一行文本轉為字符數組
- char[] chars = value.toString().toCharArray();
- for (char c : chars) {
- // 某個字符出現一次,便輸出其出現 1 次。
- context.write(new Text(c + ""), new LongWritable(1));
- }
- }
- }
Reduce 程序
我們繼承 Reducer 類并重寫了其 reduce 方法。在本例中 Reduce 階段的輸入是 Map 階段的輸出,輸出的結果可以作為最終的輸出結果。相信你也注意到了,reduce 方法的第二個參數是一個 Iterable,MapReduce 會將 map 階段中相同字符的輸出匯總到一起作為 reduce 的輸入。
- 清單 15. Reduce 程序
- public class CharCountReducer extends Reducer< Text, LongWritable, Text, LongWritable> {
- @Override
- protected void reduce(Text key, Iterable< LongWritable> values, Context context)
- throws IOException, InterruptedException {
- long count = 0;
- for (LongWritable value : values) {
- count += value.get();
- }
- context.write(key, new LongWritable(count));
- }
- }
驅動程序
到目前為止,我們已經有了 map 程序和 reduce 程序,我們還需要一個驅動程序來運行整個作業。可以看到我們在這里初始化了一個 Job 對象。Job 對象指定整個 MapReduce 作業的執行規范。我們用它來控制整個作業的運作,在這里我們指定了 jar 包位置還有我們的 Map 程序、Reduce 程序、Map 程序的輸出類型、整個作業的輸出類型還有輸入輸出文件的地址。
- 清單 16. 驅動程序
- public class CharCountDriver {
- public static void main(String[] args) throws Exception {
- Configuration configuration = new Configuration();
- Job job = Job.getInstance(configuration);
- // Hadoop 會自動根據驅動程序的類路徑來掃描該作業的 Jar 包。
- job.setJarByClass(cn.itweknow.mr.CharCountDriver.class);
- // 指定 mapper
- job.setMapperClass(CharCountMapper.class);
- // 指定 reducer
- job.setReducerClass(CharCountReducer.class);
- // map 程序的輸出鍵-值對類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- // 輸出鍵-值對類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 輸入文件的路徑
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- // 輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- boolean res = job.waitForCompletion(true);
- System.exit(res?0:1);
- }
- }
執行 MapReduce 作業
打包作業,我們需要將我們的 MapReduce 程序打成 jar 包。
- 清單 17. 打包作業
- mvn package -Dmaven.test.skip=true
將 jar 包復制到 hadoop 機器上。
在 HDFS 上準備好要統計的文件,我準備的文件在 HDFS 上的/mr/input/目錄下,內容為"hello hadoop hdfs.I am coming."。
執行 jar。
- 清單 18. 執行作業
- hadoop jar mr-test-1.0-SNAPSHOT.jar cn.itweknow.mr.CharCountDriver /mr/input/ /mr/output/out.txt
查看結果。
我們先看看輸出目錄,結果如下,最終輸出的結果就存放在/mr/output/part-r-00000 文件中。
圖 1. MapReduce 作業輸出目錄
然后我們再看看輸出文件中的具體內容,如下所示:
圖 2. MapReduce 作業輸出結果
MapReduce 運行原理
我們可以將一個 MapReduce 作業的運行過程簡單的拆分成 6 個過程,分別是作業的提交、作業初始化、任務分配、任務執行、進度和狀態的更新、作業完成。下面我就一起來具體了解下這么幾個步驟。
作業的提交
當我們調用 job.submit()或者 job.waitForCompletion()方法(其內部也會調用 submit()方法)的時候,會創建一個 JobSubmitter 對象,在 JobSubmitter 內部所實現的作業提交過程如下:
- 向資源管理器請求新的應用 ID 作為 MapReduce 作業的作業 ID。
- 檢查作業的輸出目錄,如果沒有指定輸出目錄或者輸出目錄已經存在就會拋出錯誤,這也就是為啥我們在執行 MapReduce 作業時為啥需要保證指定的輸出目錄不存在。
- 將作業運行所需要的資源文件(作業 JAR 包,配置文件,輸入分片)復制到一起(一個以作業 ID 命名的目錄下)。
- 調用 submitApplication()方法提交作業。
作業的初始化
- 首先資源管理器會將作業請求傳遞給 YARN 調度器。
- 調度器會為作業分配一個容器。
- 資源管理器在節點管理器的管理下在容器中啟動 application master。
- application master 的主類 MRAppMaster 會創建多個簿記對象來跟蹤作業的進度。
- 接收輸入分片。
- application master 為每個分片創建 map 任務以及確定 reduce 任務,并且分配任務 ID。
任務的分配
application master 會為創建的任務向資源管理器請求容器,先是為 map 任務請求資源,后為 reduce 任務請求資源。為 map 任務分配資源的時候需要考慮到數據本地化的局限,會盡量保證運行的 map 任務所需要的數據塊存儲在當前機器或者當前機架中,這樣會極大的節省帶寬資源。而 reduce 任務則不存在這個限制。
任務的執行
- 資源管理器為任務分配好容器后,application master 就通過與節點管理器通信啟動容器。
- 在運行任務之前,會將任務所需要的資源本地化。
- 運行任務。
進度和狀態的更新
任務在運行的過程中,會對其精度保持追蹤,對與 map 任務,其任務進度就是已經處理的輸入所占總輸入的比例。對與 reduce 任務來講就比較復雜了,因為這個部分包含資源復制階段、排序階段和 reduce 階段三個階段,每個階段都占整個完成比例的 1/3,也就是說當我們完成 reduce 的一半的時候進度應該為 5/6。對與狀態的更新,客戶端會每秒輪詢一次 application master 以接收***的任務狀態。
作業的完成
當 application master 收到作業***一個任務已經完成的通知后,便把作業的狀態設置為"成功"。
為了方便大家理解,我這里將整個過程總結為一張圖,貼在這里僅供大家參考。
圖 3. MapReduce 程序運行圖解
Shuffle
簡介,什么是 Shuffle
MapReduce 程序會確保每個 reduce 函數的輸入都是按鍵排序的。系統執行排序以及將 map 函數的輸出傳給 reduce 函數的過程稱之為 shuffle。整個 Shuffle 分為 Map 端和 Reduce 端,下圖是 MapReduce 的 Shuffle 的一個整體概覽圖,大家先看一下整個圖,我們后面再做進一步的解釋說明。
圖 4. Shuffle 概覽圖
Map 端 Shuffle
其實 Map 函數產生的輸出會寫到磁盤上而不是 HDFS。但是它也不是簡簡單單的直接寫到磁盤,這中間有一個復雜的過程,下面我們就來拆解一下。
從上面的圖可以看到每個 Map 任務都會有一個緩沖區,這個緩沖區會臨時存儲 map 函數輸出的內容,緩沖區的個大小默認是 100M,我們可以通過 mapreduce.task.io.sort.mb 這個配置項配置,當緩沖區中的內容達到其設定的閾值(閾值的設置值是占整個緩沖區的大小,默認為 0.8,我們可以通過 mapreduce.map.sort.spill.percent 來配置)時就會產生溢出,這個時候會有一個后臺線程將緩沖區中的內容分區(根據最終要傳給的 Reduce 任務分成不同的區,分區的目的是將輸出劃分到不同的 Reducer 上去,后面的 Reducer 就會根據分區來讀取自己對應的數據)
然后區內按照 key 排序,如果我們設置了 Combiner(Combiner 的本質也是一個 Reducer,其目的是對將要寫入到磁盤上的文件先進行一次處理,這樣,寫入到磁盤的數據量就會減少。)
的話,這個時候會運行 Combiner 函數,***再寫入磁盤。而在這個過程中 Map 任務還會繼續往緩沖區中輸出內容,如果出現緩沖區空間被占滿的情況,Map 任務就會阻塞直到緩沖區中的內容被全部寫到磁盤中為止。
每次緩沖區溢出時都會新建一個新的溢出文件,這樣***其實是會出現多個溢出文件的,在 Map 任務結束前這些溢出文件會被合并到一個整的輸出文件。
Reduce 端 Shuffle
Reduce 端的 Shuffle 分為三個階段,復制階段、合并階段和 Reduce。
首先是復制階段,Reduce 任務需要集群上若干個 map 輸出作為其輸入內容,在每個 Map 任務完成的時候 Reduce 任務就開復制其輸出,上面也提到過 Map 任務在寫入磁盤前會將輸出進行根據 Reduce 任務進行分區,所以這里 Reduce 任務在復制的時候只會復制自己的那個分區里的內容。如果 Map 的輸出非常小,那么 Reduce 會直接將其復制到內存中,否則會被復制到磁盤。
合并階段,因為有很多的 Map 任務,所以 Reduce 復制過來的 map 輸出會有很多個,在這個階段主要就是將這些 Map 輸出合并成為一個文件。
Reduce 階段,這個階段主要就是執行我們的 Reduce 函數的代碼了,并產生最終的結果,然后寫入到 HDFS 中。
在文章的***,提供我在撰寫本文的過程中所編寫的一些源代碼,供大家參考。也希望大家能夠從本文中收獲一些幫助。
結束語
本文主要從 HDFS 和 MapReduce 兩個大的方面講解了 Hadoop 的相關知識,并且編寫了一個簡單的 MapReduce 程序,***還深入了解了一下 MapReduce 程序的運行原理以及 Shuffle 相關的內容。