成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flume-接入Hive數倉搭建流程

大數據
實時流接入數倉,基本在大公司都會有,在Flume1.8以后支持taildir source, 其有許多特點,而被廣泛使用。本文以開源Flume流為例,介紹流接入HDFS ,后面在其上面建立ods層外表。

實時流接入數倉,基本在大公司都會有,在Flume1.8以后支持taildir source, 其有以下幾個特點,而被廣泛使用:

  1. 使用正則表達式匹配目錄中的文件名
  2. 監控的文件中,一旦有數據寫入,Flume就會將信息寫入到指定的Sink
  3. 高可靠,不會丟失數據
  4. 不會對跟蹤文件有任何處理,不會重命名也不會刪除
  5. 不支持Windows,不能讀二進制文件。支持按行讀取文本文件

本文以開源Flume流為例,介紹流接入HDFS ,后面在其上面建立ods層外表。

1.1 taildir source配置

  1. a1.sources.r1.type = TAILDIR 
  2. a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json 
  3. a1.sources.r1.filegroups = f1 
  4. a1.sources.r1.filegroups.f1 =/opt/hoult/servers/logs/start/.*log 

1.2 hdfs sink 配置

  1. a1.sinks.k1.type = hdfs 
  2. a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/ 
  3. a1.sinks.k1.hdfs.filePrefix = startlog. 
  4. # 配置文件滾動方式(文件大小32M) 
  5. a1.sinks.k1.hdfs.rollSize = 33554432 
  6. a1.sinks.k1.hdfs.rollCount = 0 
  7. a1.sinks.k1.hdfs.rollInterval = 0 
  8. a1.sinks.k1.hdfs.idleTimeout = 0 
  9. a1.sinks.k1.hdfs.minBlockReplicas = 1 
  10. # 向hdfs上刷新的event的個數 
  11. a1.sinks.k1.hdfs.batchSize = 100 
  12. # 使用本地時間 
  13. a1.sinks.k1.hdfs.useLocalTimeStamp = true  

1.3 Agent的配置

 

  1. a1.sources = r1 
  2. a1.sinks = k1 
  3. a1.channels = c1 
  4. # taildir source 
  5. a1.sources.r1.type = TAILDIR 
  6. a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json 
  7. a1.sources.r1.filegroups = f1 
  8. a1.sources.r1.filegroups.f1 = /user/data/logs/start/.*log 
  9. # memorychannel 
  10. a1.channels.c1.type = memory 
  11. a1.channels.c1.capacity = 100000 
  12. a1.channels.c1.transactionCapacity = 2000 
  13. # hdfs sink 
  14. a1.sinks.k1.type = hdfs 
  15. a1.sinks.k1.hdfs.path = /opt/hoult/servers/logs/start/%Y-%m-%d/ 
  16. a1.sinks.k1.hdfs.filePrefix = startlog. 
  17. # 配置文件滾動方式(文件大小32M) 
  18. a1.sinks.k1.hdfs.rollSize = 33554432 
  19. a1.sinks.k1.hdfs.rollCount = 0 
  20. a1.sinks.k1.hdfs.rollInterval = 0 
  21. a1.sinks.k1.hdfs.idleTimeout = 0 
  22. a1.sinks.k1.hdfs.minBlockReplicas = 1 
  23. # 向hdfs上刷新的event的個數 
  24. a1.sinks.k1.hdfs.batchSize = 1000 
  25. # 使用本地時間 
  26. a1.sinks.k1.hdfs.useLocalTimeStamp = true 
  27. # Bind the source and sink to the channel 
  28. a1.sources.r1.channels = c1 
  29. a1.sinks.k1.channel = c1  

/opt/hoult/servers/conf/flume-log2hdfs.conf

1.4 啟動

 

  1. flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console 
  2.  
  3. export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote" 
  4. # 要想使配置文件生效,還要在命令行中指定配置文件目錄 
  5. flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console 

要$FLUME_HOME/conf/flume-env.sh加下面的參數,否則會報錯誤如下:

1.5 使用自定義攔截器解決Flume Agent替換本地時間為日志里面的時間戳

使用netcat source → logger sink來測試

 

  1. # a1是agent的名稱。source、channel、sink的名稱分別為:r1 c1 k1 
  2. a1.sources = r1 
  3. a1.channels = c1 
  4. a1.sinks = k1 
  5. # source 
  6. a1.sources.r1.type = netcat 
  7. a1.sources.r1.bind = linux121 
  8. a1.sources.r1.port = 9999 
  9. a1.sources.r1.interceptors = i1 
  10. a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder 
  11. # channel 
  12. a1.channels.c1.type = memory 
  13. a1.channels.c1.capacity = 10000 
  14. a1.channels.c1.transactionCapacity = 100 
  15. # sink 
  16. a1.sinks.k1.type = logger 
  17. # source、channel、sink之間的關系 
  18. a1.sources.r1.channels = c1 
  19. a1.sinks.k1.channel = c1  

攔截器主要代碼如下:

 

  1. public class CustomerInterceptor implements Interceptor { 
  2.     private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); 
  3.  
  4.     @Override 
  5.     public void initialize() { 
  6.  
  7.     } 
  8.  
  9.     @Override 
  10.     public Event intercept(Event event) { 
  11.         // 獲得body的內容 
  12.         String eventBody = new String(event.getBody(), Charsets.UTF_8); 
  13.         // 獲取header的內容 
  14.         Map<String, String> headerMap = event.getHeaders(); 
  15.         final String[] bodyArr = eventBody.split("\\s+"); 
  16.         try { 
  17.             String jsonStr = bodyArr[6]; 
  18.             if (Strings.isNullOrEmpty(jsonStr)) { 
  19.                 return null
  20.             } 
  21.             // 將 string 轉成 json 對象 
  22.             JSONObject jsonObject = JSON.parseObject(jsonStr); 
  23.             String timestampStr = jsonObject.getString("time"); 
  24.             //將timestamp 轉為時間日期類型(格式 :yyyyMMdd) 
  25.             long timeStamp = Long.valueOf(timestampStr); 
  26.             String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault())); 
  27.             headerMap.put("logtime"date); 
  28.             event.setHeaders(headerMap); 
  29.         } catch (Exception e) { 
  30.             headerMap.put("logtime""unknown"); 
  31.             event.setHeaders(headerMap); 
  32.         } 
  33.         return event; 
  34.  
  35.     } 
  36.  
  37.     @Override 
  38.     public List<Event> intercept(List<Event> events) { 
  39.         List<Event> out = new ArrayList<>(); 
  40.         for (Event event : events) { 
  41.             Event outEvent = intercept(event); 
  42.             if (outEvent != null) { 
  43.                 out.add(outEvent); 
  44.             } 
  45.         } 
  46.         return out
  47.     } 
  48.  
  49.     @Override 
  50.     public void close() { 
  51.  
  52.     } 
  53.  
  54.     public static class Builder implements Interceptor.Builder { 
  55.         @Override 
  56.         public Interceptor build() { 
  57.             return new CustomerInterceptor(); 
  58.         } 
  59.  
  60.         @Override 
  61.         public void configure(Context context) { 
  62.         } 
  63.     } 

啟動

 

  1. flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.roog.logger=INFO,console 
  2. ## 測試 
  3. telnet linux121 9999  

吳邪,小三爺,混跡于后臺,大數據,人工智能領域的小菜鳥。

責任編輯:未麗燕 來源: segmentfault.com
相關推薦

2025-06-11 02:45:00

2023-08-07 01:25:39

2022-01-02 23:02:16

數據中臺選型

2022-08-22 17:46:56

虛擬數倉Impala

2021-01-31 23:54:23

數倉模型

2023-01-03 17:43:39

網易郵箱數倉

2021-01-04 05:42:48

數倉模型設計

2022-07-26 15:38:58

數據倉數據治理數據團隊

2022-11-25 10:07:12

數倉數據流開發

2021-08-02 17:24:37

數字化

2023-11-23 16:53:56

數據倉庫大數據

2022-01-13 10:45:48

數倉對象主題域

2022-02-18 09:02:04

數據倉庫治理

2022-03-01 17:16:16

數倉建模ID Mapping

2021-12-02 08:41:30

數倉建模設計

2025-05-27 00:15:07

2022-12-06 17:52:57

離線數倉治理

2023-02-20 07:33:47

Teradata數據倉庫

2021-10-13 07:23:03

數據同步倉庫

2021-08-11 07:53:22

數倉維度建模
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美日韩精品中文字幕 | 国产一级视频在线观看 | 久久男人 | 九九热在线免费视频 | 一区日韩 | 草草影院ccyy| 艹逼网 | 色综合一区二区 | 久久国产精品久久国产精品 | 可以免费观看的av片 | 日韩欧美福利视频 | 韩日av片 | 男女网站在线观看 | 国产精品成人一区二区 | 国产精品久久久久久久久久久免费看 | av片在线观看网站 | 久久精品视频网站 | 在线免费观看成人 | 国产精品久久久久aaaa | 国产精品美女久久久免费 | 亚洲综合网站 | 成人深夜福利 | 精品一区二区av | 国产高清免费视频 | 日韩福利在线 | 成人国产精品免费观看 | 免费观看一区二区三区毛片 | 噜久寡妇噜噜久久寡妇 | 国产精品69毛片高清亚洲 | 久久久久久久久久一区 | 国产视频福利 | 夜夜爽99久久国产综合精品女不卡 | 亚洲a一区 | 九九热热九九 | 久久99深爱久久99精品 | 亚洲国产一 | 国产精品视频观看 | 亚洲精品9999 | 欧美中文字幕一区二区三区亚洲 | 波多野结衣av中文字幕 | 成人国产精品久久 |