Hadoop大數據通用處理平臺
Hadoop是一款開源的大數據通用處理平臺,其提供了分布式存儲和分布式離線計算,適合大規模數據、流式數據(寫一次,讀多次),不適合低延時的訪問、大量的小文件以及頻繁修改的文件。
*Hadoop由HDFS、YARN、MapReduce組成。
如果想學習Java工程化、高性能及分布式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。
Hadoop的特點:
- 高擴展(動態擴容):能夠存儲和處理千兆字節數據(PB),能夠動態的增加和卸載節點,提升存儲能力(能夠達到上千個節點)
- 低成本:只需要普通的PC機就能實現,不依賴高端存儲設備和服務器。
- 高效率:通過在Hadoop集群中分化數據并行處理,使得處理速度非???。
- 可靠性:數據有多份副本,并且在任務失敗后能自動重新部署。
Hadoop的使用場景:
- 日志分析,將數據分片并行計算處理。
- 基于海量數據的在線應用。
- 推薦系統,精準營銷。
- 搜索引擎。
Hadoop生態圈:

- Hive:利用Hive可以不需要編寫復雜的Hadoop程序,只需要寫一個SQL語句,Hive就會把SQL語句轉換成Hadoop的任務去執行,降低使用Hadoop離線計算的門檻。
- HBase:海量數據存儲的非關系型數據庫,單個表中的數據能夠容納百億行x百萬列。
- ZooKeeper:監控Hadoop集群中每個節點的狀態,管理整個集群的配置,維護節點間數據的一致性。
- Flume:海量日志采集系統。
2.內部結構
2.1 HDFS

HDFS是分布式文件系統,存儲海量的文件,其中HDFS中包含NameNode、DataNode、SecondaryNameNode組件等。
Block數據塊
- HDFS中基本的存儲單元,1.X版本中每個Block默認是64M,2.X版本中每個Block默認是128M。
- 一個大文件會被拆分成多個Block進行存儲,如果一個文件少于Block的大小,那么其實際占用的空間為文件自身大小。
- 每個Block都會在不同的DataNode節點中存在備份(默認備份數是3)
DataNode
- 保存具體的Blocks數據。
- 負責數據的讀寫操作和復制操作。
- DataNode啟動時會向NameNode匯報當前存儲的數據塊信息。
NameNode
- 存儲文件的元信息和文件與Block、DataNode的關系,NameNode運行時所有數據都保存在內存中,因此整個HDFS可存儲的文件數受限于NameNode的內存大小。
- 每個Block在NameNode中都對應一條記錄,如果是大量的小文件將會消耗大量內存,因此HDFS適合存儲大文件。
- NameNode中的數據會定時保存到本地磁盤中(只有元數據),但不保存文件與Block、DataNode的位置信息,這部分數據由DataNode啟動時上報和運行時維護。
*NameNode不允許DataNode具有同一個Block的多個副本,所以創建的副本數量是當時DataNode的總數。
*DataNode會定期向NameNode發送心跳信息,一旦在一定時間內NameNode沒有接收到DataNode發送的心跳則認為其已經宕機,因此不會再給它任何IO請求。
*如果DataNode失效造成副本數量下降并且低于預先設置的閾值或者動態增加副本數量,則NameNode會在合適的時機重新調度DataNode進行復制。
SecondaryNameNode
- 定時與NameNode進行同步,合并HDFS中系統鏡像,定時替換NameNode中的鏡像。
HDFS寫入文件的流程

- HDFS Client向NameNode申請寫入文件。
- NameNode根據文件大小,返回文件要寫入的DataNode列表以及Block id (此時NameNode已存儲文件的元信息、文件與DataNode、Block之間的關系)
- HDFS Client收到響應后,將文件寫入第一個DataNode中,第一個DataNode接收到數據后將其寫入本地磁盤,同時把數據傳遞給第二個DataNode,直到寫入備份數個DataNode。
- 每個DataNode接收完數據后都會向前一個DataNode返回寫入成功的響應,最終第一個DataNode將返回HDFS Client客戶端寫入成功的響應。
- 當HDFS Client接收到整個DataNodes的確認請求后會向NameNode發送最終確認請求,此時NameNode才會提交文件。
*當寫入某個DataNode失敗時,數據會繼續寫入其他的DataNode,NameNode會重新尋找DataNode繼續復制,以保證數據的可靠性。
*每個Block都會有一個校驗碼并存放在獨立的文件中,以便讀的時候來驗證數據的完整性。
*文件寫入完畢后,向NameNode發送確認請求,此時文件才可見,如果發送確認請求之前NameNode宕機,那么文件將會丟失,HDFS客戶端無法進行讀取。
HDFS讀取文件的流程

如果想學習Java工程化、高性能及分布式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。
- HDFS Client向NameNode申請讀取指定文件。
- NameNode返回文件所有的Block以及這些Block所在的DataNodes中(包括復制節點)
- HDFS Client根據NameNode的返回,優先從與HDFS Client同節點的DataNode中直接讀取(若HDFS Client不在集群范圍內則隨機選擇),如果從DataNode中讀取失敗則通過網絡從復制節點中進行讀取。
機架感知

分布式集群中通常包含非常多的機器,由于受到機架槽位和交換機網口的限制,通常大型的分布式集群都會跨好幾個機架,由多個機架上的機器共同組成一個分布式集群。
機架內的機器之間的網絡速度通常都會高于跨機架機器之間的網絡速度,并且機架之間機器的網絡通信通常受到上層交換機間網絡帶寬的限制。
Hadoop默認沒有開啟機架感知功能,默認情況下每個Block都是隨機分配DataNode,需要進行相關的配置,那么在NameNode啟動時,會將機器與機架的對應信息保存在內存中,用于在HDFS Client申請寫文件時,能夠根據預先定義的機架關系合理的分配DataNode。
Hadoop機架感知默認對3個副本的存放策略為:
- 第1個Block副本存放在和HDFS Client所在的節點中(若HDFS Client不在集群范圍內則隨機選取)
- 第2個Block副本存放在與第一個節點不同機架下的節點中(隨機選擇)
- 第3個Block副本存放在與第2個副本所在節點的機架下的另一個節點中,如果還有更多的副本則隨機存放在集群的節點中。
*使用此策略可以保證對文件的訪問能夠優先在本機架下找到,并且如果整個機架上發生了異常也可以在另外的機架上找到該Block的副本。
2.2 YARN
YARN是分布式資源調度框架(任務計算框架的資源調度框架),主要負責集群中的資源管理以及任務調度并且監控各個節點。
ResourceManager
- 是整個集群的資源管理者,管理并監控各個NodeManager。
- 處理客戶端的任務請求。
- 啟動和監控ApplicationMaster。
- 負責資源的分配以及調度。
NodeManager
- 是每個節點的管理者,負責任務的執行。
- 處理來自ResourceManager的命令。
- 處理來自ApplicationMaster的命令。
ApplicationMaster
- 數據切分,用于并行計算處理。
- 計算任務所需要的資源。
- 負責任務的監控與容錯。
任務運行在YARN的流程

客戶端提交任務請求到ResourceManager。
- ResourceManager生成一個ApplicationManager進程,用于任務的管理。
- ApplicationManager創建一個Container容器用于存放任務所需要的資源。
- ApplicationManager尋找其中一個NodeManager,在此NodeManager中啟動一個ApplicationMaster,用于任務的管理以及監控。
- ApplicationMaster向ResourceManager進行注冊,并計算任務所需的資源匯報給ResourceManager(CPU與內存)
- ResourceManager為此任務分配資源,資源封裝在Container容器中。
- ApplicationMaster通知集群中相關的NodeManager進行任務的執行。
- 各個NodeManager從Container容器中獲取資源并執行Map、Reduce任務。
2.3 MapReduce
MapReduce是分布式離線并行計算框架,高吞吐量,高延時,原理是將分析的數據拆分成多份,通過多臺節點并行處理,相對于Storm、Spark任務計算框架而言,MapReduce是最早出現的計算框架。
MapReduce、Storm、Spark任務計算框架對比:

MapReduce執行流程
MapReduce將程序劃分為Map任務以及Reduce任務兩部分。
Map任務處理流程
- 讀取文件中的內容,解析成Key-Value的形式 (Key為偏移量,Value為每行的數據)
- 重寫map方法,編寫業務邏輯,生成新的Key和Value。
- 對輸出的Key、Value進行分區(Partitioner類)
- 對數據按照Key進行排序、分組,相同key的value放到一個集合中(數據匯總)
*處理的文件必須要在HDFS中。
Reduce任務處理流程
- 對多個Map任務的輸出,按照不同的分區,通過網絡復制到不同的reduce節點。
- 對多個Map任務的輸出進行合并、排序。
- 將reduce的輸出保存到文件,存放在HDFS中。
3.Hadoop的使用
3.1 安裝
由于Hadoop使用Java語言進行編寫,因此需要安裝JDK。

從CDH中下載Hadoop 2.X并進行解壓,CDH是Cloudrea公司對各種開源框架的整合與優化(較穩定)

- etc目錄:Hadoop配置文件存放目錄。
- logs目錄:Hadoop日志存放目錄。
- bin目錄、sbin目錄:Hadoop可執行命令存放目錄。
etc目錄

bin目錄

sbin目錄

3.2 Hadoop配置
1.配置環境
編輯etc/hadoop/hadoop-env.sh的文件,修改JAVA_HOME配置項為本地JAVA的HOME目錄,此文件是Hadoop啟動時加載的環境變量。

編輯/etc/hosts文件,添加主機名與IP的映射關系。

2.配置Hadoop公共屬性(core-site.xml)
- <configuration>
- <!-- Hadoop工作目錄,用于存放Hadoop運行時產生的臨時數據 -->
- <property>
- <name>hadoop.tmp.dir</name>
- <value>/usr/hadoop/hadoop-2.9.0/data</value>
- </property>
- <!-- NameNode的通信地址,1.x默認9000,2.x可以使用8020 -->
- <property>
- <name>fs.default.name</name>
- <value>hdfs://192.168.1.80:8020</value>
- </property>
- </configuration>
3.配置HDFS(hdfs-site.xml)
- <configuration>
- <!--指定block的副本數量(將block復制到集群中備份數-1個節點的DataNode中)-->
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
- <!-- 關閉HDFS的訪問權限 -->
- <property>
- <name>dfs.permissions.enabled</name>
- <value>false</value>
- </property>
- </configuration>
4.配置YARN(yarn-site.xml)
- <configuration>
- <!-- 配置Reduce取數據的方式是shuffle(隨機) -->
- <property>
- <name>yarn.nodemanager.aux-services</name>
- <value>mapreduce_shuffle</value>
- </property>
- </configuration>
5.配置MapReduce(mapred-site.xml)
- <configuration>
- <!-- 讓MapReduce任務使用YARN進行調度 -->
- <property>
- <name>mapreduce.framework.name</name>
- <value>yarn</value>
- </property>
- </configuration>
6.配置SSH
由于在啟動hdfs、yarn時都需要對用戶的身份進行驗證,因此可以配置SSH設置免密碼登錄。
- //生成秘鑰
- ssh-keygen -t rsa
- //復制秘鑰到本機
- ssh-copy-id 192.168.1.80
3.3 啟動HDFS
1.格式化NameNode

2.啟動HDFS,將會啟動NameNode、DataNode、SecondaryNameNode三個進程,可以通過jps命令進行查看。

*若啟動時出現錯誤,則可以進入logs目錄查看相應的日志文件。
當HDFS啟動完畢后,可以訪問http://localhost:50070進入HDFS的可視化管理界面,可以在此頁面中監控整個HDFS集群的狀況并且進行文件的上傳以及下載。

*進入HDFS監控頁面下載文件時,會將請求重定向,重定向后的地址的主機名為NameNode的主機名,因此客戶端本地的host文件中需要配置NameNode主機名與IP的映射關系。
3.4 啟動YARN

啟動YARN后,將會啟動ResourceManager以及NodeManager進程,可以通過jps命令進行查看。

當YARN啟動完畢后,可以訪問http://localhost:8088進入YARN的可視化管理界面,可以在此頁面中查看任務的執行情況以及資源的分配。

3.5 使用Shell命令操作HDFS
HDFS中的文件系統與Linux類似,由/代表根目錄。
如果想學習Java工程化、高性能及分布式、深入淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給大家。
- hadoop fs -cat <src>:顯示文件中的內容。
- hadoop fs -copyFromLocal <localsrc> <dst>:將本地中的文件上傳到HDFS。
- hadoop fs -copyToLocal <src> <localdst>:將HDFS中的文件下載到本地。
- hadoop fs -count <path>:查詢指定路徑下文件的個數。
- hadoop fs -cp <src> <dst>:在HDFS內對文件進行復制。
- hadoop fs -get <src> <localdst>:將HDFS中的文件下載到本地。
- hadoop fs -ls <path>:顯示指定目錄下的內容。
- hadoop fs -mkdir <path>:創建目錄。
- hadoop fs -moveFromLocal <localsrc> <dst>:將本地中的文件剪切到HDFS中。
- hadoop fs -moveToLocal <src> <localdst> :將HDFS中的文件剪切到本地中。
- hadoop fs -mv <src> <dst> :在HDFS內對文件進行移動。
- hadoop fs -put <localsrc> <dst>:將本地中的文件上傳到HDFS。
- hadoop fs -rm <src>:刪除HDFS中的文件。
3.6 JAVA中操作HDFS
- /**
- * @Auther: ZHUANGHAOTANG
- * @Date: 2018/11/6 11:49
- * @Description:
- */
- public class HDFSUtils {
- private static Logger logger = LoggerFactory.getLogger(HDFSUtils.class);
- /**
- * NameNode URL
- */
- private static final String NAMENODE_URL = "192.168.1.80:8020";
- /**
- * HDFS文件系統連接對象
- */
- private static FileSystem fs = null;
- static {
- Configuration conf = new Configuration();
- try {
- fs = FileSystem.get(URI.create(NAMENODE_URL), conf);
- } catch (IOException e) {
- logger.info("初始化HDFS連接失?。簕}", e);
- }
- }
- /**
- * 創建目錄
- */
- public static void mkdir(String dir) throws Exception {
- dir = NAMENODE_URL + dir;
- if (!fs.exists(new Path(dir))) {
- fs.mkdirs(new Path(dir));
- }
- }
- /**
- * 刪除目錄或文件
- */
- public static void delete(String dir) throws Exception {
- dir = NAMENODE_URL + dir;
- fs.delete(new Path(dir), true);
- }
- /**
- * 遍歷指定路徑下的目錄和文件
- */
- public static List<String> listAll(String dir) throws Exception {
- List<String> names = new ArrayList<>();
- dir = NAMENODE_URL + dir;
- FileStatus[] files = fs.listStatus(new Path(dir));
- for (FileStatus file : files) {
- if (file.isFile()) { //文件
- names.add(file.getPath().toString());
- } else if (file.isDirectory()) { //目錄
- names.add(file.getPath().toString());
- } else if (file.isSymlink()) { //軟或硬鏈接
- names.add(file.getPath().toString());
- }
- }
- return names;
- }
- /**
- * 上傳當前服務器的文件到HDFS中
- */
- public static void uploadLocalFileToHDFS(String localFile, String hdfsFile) throws Exception {
- hdfsFile = NAMENODE_URL + hdfsFile;
- Path src = new Path(localFile);
- Path dst = new Path(hdfsFile);
- fs.copyFromLocalFile(src, dst);
- }
- /**
- * 通過流上傳文件
- */
- public static void uploadFile(String hdfsPath, InputStream inputStream) throws Exception {
- hdfsPath = NAMENODE_URL + hdfsPath;
- FSDataOutputStream os = fs.create(new Path(hdfsPath));
- BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
- byte[] data = new byte[1024];
- int len;
- while ((len = bufferedInputStream.read(data)) != -1) {
- if (len == data.length) {
- os.write(data);
- } else { //最后一次讀取
- byte[] lastData = new byte[len];
- System.arraycopy(data, 0, lastData, 0, len);
- os.write(lastData);
- }
- }
- inputStream.close();
- bufferedInputStream.close();
- os.close();
- }
- /**
- * 從HDFS中下載文件
- */
- public static byte[] readFile(String hdfsFile) throws Exception {
- hdfsFile = NAMENODE_URL + hdfsFile;
- Path path = new Path(hdfsFile);
- if (fs.exists(path)) {
- FSDataInputStream is = fs.open(path);
- FileStatus stat = fs.getFileStatus(path);
- byte[] data = new byte[(int) stat.getLen()];
- is.readFully(0, data);
- is.close();
- return data;
- } else {
- throw new Exception("File Not Found In HDFS");
- }
- }
- }
3.7 執行一個MapReduce任務
Hadoop中提供了hadoop-mapreduce-examples-2.9.0.jar,其封裝了一些任務計算方法,可以直接進行調用。

*使用hadoop jar命令執行JAR包。
1.創建一個文件,將此文件上傳到HDFS中。

2.使用Hadoop提供的hadoop-mapreduce-examples-2.9.0.jar執行wordcount詞頻統計功能,然后在YARN管理頁面中進行查看。

YARN管理頁面中可以查看任務的執行進度:

3.當任務執行完畢后,可以查看任務的執行結果。

*任務的執行結果將會放到HDFS的文件中。