成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

十分鐘入門Fink SQL

運(yùn)維 數(shù)據(jù)庫運(yùn)維
本篇文章主要講解了Flink SQL 入門操作,后面我會(huì)分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等

[[358221]]

前言

Flink 本身是批流統(tǒng)一的處理框架,所以 Table API 和 SQL,就是批流統(tǒng)一的上層處理 API。目前功能尚未完善,處于活躍的開發(fā)階段。 Table API 是一套內(nèi)嵌在 Java 和 Scala 語言中的查詢 API,它允許我們以非常直觀的方式,組合來自一些關(guān)系運(yùn)算符的查詢(比如 select、filter 和 join)。而對于 Flink SQL,就是直接可以在代碼中寫 SQL,來實(shí)現(xiàn)一些查詢(Query)操作。Flink 的 SQL 支持,基于實(shí)現(xiàn)了 SQL 標(biāo)準(zhǔn)的 Apache Calcite(Apache 開源 SQL 解析工具)。圖片

1、導(dǎo)入所需要的的依賴包

  1. <dependency> 
  2.           <groupId>org.apache.flink</groupId> 
  3.           <artifactId>flink-table-planner_2.12</artifactId> 
  4.           <version>1.10.1</version> 
  5.       </dependency> 
  6.       <dependency> 
  7.           <groupId>org.apache.flink</groupId> 
  8.           <artifactId>flink-table-api-scala-bridge_2.12</artifactId> 
  9.           <version>1.10.1</version> 
  10.       </dependency> 
  11.       <dependency> 
  12.           <groupId>org.apache.flink</groupId> 
  13.           <artifactId>flink-csv</artifactId> 
  14.           <version>1.10.1</version> 
  15.      </dependency> 

flink-table-planner:planner 計(jì)劃器,是 table API 最主要的部分,提供了運(yùn)行時(shí)環(huán)境和生成程序執(zhí)行計(jì)劃的 planner; flink-table-api-scala-bridge:bridge 橋接器,主要負(fù)責(zé) table API 和 DataStream/DataSet API的連接支持,按照語言分 java 和 scala。

這里的兩個(gè)依賴,是 IDE 環(huán)境下運(yùn)行需要添加的;如果是生產(chǎn)環(huán)境,lib 目錄下默認(rèn)已經(jīng)有了 planner,就只需要有 bridge 就可以了。

當(dāng)然,如果想使用用戶自定義函數(shù),或是跟 kafka 做連接,需要有一個(gè) SQL client,這個(gè)包含在 flink-table-common 里。

2、兩種 planner(old& blink)的區(qū)別

  1. 批流統(tǒng)一:Blink 將批處理作業(yè),視為流式處理的特殊情況。所以,blink 不支持表和DataSet 之間的轉(zhuǎn)換,批處理作業(yè)將不轉(zhuǎn)換為 DataSet 應(yīng)用程序,而是跟流處理一樣,轉(zhuǎn)換為 DataStream 程序來處理。
  2. 因 為 批 流 統(tǒng) 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
  3. Blink planner 只支持全新的目錄,不支持已棄用的 ExternalCatalog。
  4. 舊 planner 和 Blink planner 的 FilterableTableSource 實(shí)現(xiàn)不兼容。舊的 planner 會(huì)把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 則會(huì)把 Expressions 下推。
  5. 基于字符串的鍵值配置選項(xiàng)僅適用于 Blink planner。
  6. PlannerConfig 在兩個(gè) planner 中的實(shí)現(xiàn)不同。
  7. Blink planner 會(huì)將多個(gè) sink 優(yōu)化在一個(gè) DAG 中(僅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而舊 planner 的優(yōu)化總是將每一個(gè) sink 放在一個(gè)新的 DAG 中,其中所有 DAG 彼此獨(dú)立。
  8. 舊的 planner 不支持目錄統(tǒng)計(jì),而 Blink planner 支持。

3、表(Table)的概念

TableEnvironment 可以注冊目錄 Catalog,并可以基于 Catalog 注冊表。它會(huì)維護(hù)一個(gè)Catalog-Table 表之間的 map。 表(Table)是由一個(gè)標(biāo)識符來指定的,由 3 部分組成:Catalog 名、數(shù)據(jù)庫(database)名和對象名(表名)。如果沒有指定目錄或數(shù)據(jù)庫,就使用當(dāng)前的默認(rèn)值。

4、連接到文件系統(tǒng)(Csv 格式)

連接外部系統(tǒng)在 Catalog 中注冊表,直接調(diào)用 tableEnv.connect()就可以,里面參數(shù)要傳入一個(gè) ConnectorDescriptor,也就是 connector 描述器。對于文件系統(tǒng)的 connector 而言,flink內(nèi)部已經(jīng)提供了,就叫做 FileSystem()。

5、測試案例 (新)

需求: 將一個(gè)txt文本文件作為輸入流讀取數(shù)據(jù)過濾id不等于sensor_1的數(shù)據(jù)實(shí)現(xiàn)思路: 首先我們先構(gòu)建一個(gè)table的env環(huán)境通過connect提供的方法來讀取數(shù)據(jù)然后設(shè)置表結(jié)構(gòu)將數(shù)據(jù)注冊為一張表就可進(jìn)行我們的數(shù)據(jù)過濾了(使用sql或者流處理方式進(jìn)行解析)

準(zhǔn)備數(shù)據(jù)

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代碼實(shí)現(xiàn)

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.{DataTypes} 
  3. import org.apache.flink.table.api.scala._ 
  4. import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema
  5.  
  6. /** 
  7.  * @Package 
  8.  * @author 大數(shù)據(jù)老哥 
  9.  * @date 2020/12/12 21:22 
  10.  * @version V1.0 
  11.  *          第一個(gè)Flinksql測試案例 
  12.  */ 
  13.  
  14. object FlinkSqlTable { 
  15.   def main(args: Array[String]): Unit = { 
  16.     // 構(gòu)建運(yùn)行流處理的運(yùn)行環(huán)境 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.     // 構(gòu)建table環(huán)境 
  19.     val tableEnv = StreamTableEnvironment.create(env) 
  20.      //通過 connect 讀取數(shù)據(jù) 
  21.     tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) 
  22.       .withFormat(new Csv()) //設(shè)置類型 
  23.       .withSchema(new Schema() // 給數(shù)據(jù)添加元數(shù)信息 
  24.         .field("id", DataTypes.STRING()) 
  25.         .field("time", DataTypes.BIGINT()) 
  26.         .field("temperature", DataTypes.DOUBLE()) 
  27.       ).createTemporaryTable("inputTable")  // 創(chuàng)建一個(gè)臨時(shí)表 
  28.      
  29.     val resTable = tableEnv.from("inputTable"
  30.       .select("*").filter('id === "sensor_1"
  31.     // 使用sql的方式查詢數(shù)據(jù) 
  32.     var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'"
  33.     // 將數(shù)據(jù)轉(zhuǎn)為流進(jìn)行輸出 
  34.     resTable.toAppendStream[(String, Long, Double)].print("resTable"
  35.     resSql.toAppendStream[(String, Long, Double)].print("resSql"
  36.  
  37.     env.execute("FlinkSqlWrodCount"
  38.   } 

6、TableEnvironment 的作用

  • 注冊 catalog
  • 在內(nèi)部 catalog 中注冊表
  • 執(zhí)行 SQL 查詢
  • 注冊用戶自定義函數(shù)
  • 注冊用戶自定義函數(shù)
  • 保存對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在創(chuàng)建 TableEnv 的時(shí)候,可以多傳入一個(gè) EnvironmentSettings 或者 TableConfig 參數(shù),可以用來配置 TableEnvironment 的一些特性。

7、 老版本創(chuàng)建流處理批處理

7.1老版本流處理

  1. val settings = EnvironmentSettings.newInstance() 
  2. .useOldPlanner() // 使用老版本 planner 
  3. .inStreamingMode() // 流處理模式 
  4. .build() 
  5. val tableEnv = StreamTableEnvironment.create(env, settings) 

7.2 老版本批處理

  1. val batchEnv = ExecutionEnvironment.getExecutionEnvironment  
  2. val batchTableEnv = BatchTableEnvironment.create(batchEnv) 

7.3 blink 版本的流處理環(huán)境

  1. val bsSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inStreamingMode().build() 
  4. val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) 

7.4 blink 版本的批處理環(huán)境

  1. val bbSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inBatchMode().build() 
  4. val bbTableEnv = TableEnvironment.create(bbSettings) 

總結(jié):

本篇文章主要講解了Flink SQL 入門操作,后面我會(huì)分享一些關(guān)于Flink SQL連接Kafka、輸出到kafka、MySQL等

本文轉(zhuǎn)載自微信公眾號「 大數(shù)據(jù)老哥」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系 大數(shù)據(jù)老哥公眾號。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)老哥
相關(guān)推薦

2022-06-16 07:31:41

Web組件封裝HTML 標(biāo)簽

2012-07-10 01:22:32

PythonPython教程

2024-05-13 09:28:43

Flink SQL大數(shù)據(jù)

2019-04-01 14:59:56

負(fù)載均衡服務(wù)器網(wǎng)絡(luò)

2023-06-07 08:27:10

Docker容器

2024-06-19 09:58:29

2021-09-07 09:40:20

Spark大數(shù)據(jù)引擎

2023-04-12 11:18:51

甘特圖前端

2023-11-30 10:21:48

虛擬列表虛擬列表工具庫

2015-09-06 09:22:24

框架搭建快速高效app

2023-10-07 00:06:09

SQL數(shù)據(jù)庫

2019-09-16 09:14:51

2009-10-09 14:45:29

VB程序

2022-08-26 09:01:07

CSSFlex 布局

2023-07-15 18:26:51

LinuxABI

2024-11-07 16:09:53

2023-11-09 14:44:27

Docker鏡像容器

2020-12-11 09:40:10

DevOpsCICD

2015-11-06 11:03:36

2022-04-13 22:01:44

錯(cuò)誤監(jiān)控系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 国产精品特级毛片一区二区三区 | 国产一级一级国产 | 成人福利在线观看 | av高清毛片| 综合成人在线 | 91欧美精品成人综合在线观看 | 久久国产婷婷国产香蕉 | 一区二区不卡 | 性色av一区| 在线观看精品视频网站 | 在线国产中文字幕 | 国产高清在线精品一区二区三区 | 逼逼网| 欧美在线a | 欧美一区二区免费电影 | 亚洲精品久久 | 色综合国产 | 国产色播av在线 | 婷婷午夜天 | 日日摸天天添天天添破 | 色资源在线 | 看片91| 91视频a | 欧美精品一区二区在线观看 | 黄网免费看 | 一区日韩 | 91人人视频在线观看 | 中文一区二区 | av中文字幕在线播放 | 精品国产乱码久久久久久丨区2区 | 亚洲精品一区二区 | 99热视 | 请别相信他免费喜剧电影在线观看 | 国产成人精品综合 | 韩日精品一区 | 91在线视频观看免费 | 久久久久国产精品www | 亚洲一区二区在线视频 | 亚洲国产精品99久久久久久久久 | 日本精品一区二区三区在线观看视频 | 国产高清视频一区 |