成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

快速上手Flink SQL——Table與DataStream之間的互轉(zhuǎn)

運(yùn)維 數(shù)據(jù)庫(kù)運(yùn)維
本篇文章主要會(huì)跟大家分享如何連接kafka,MySQL,作為輸入流和數(shù)出的操作,以及Table與DataStream進(jìn)行互轉(zhuǎn)。

[[358498]]

本篇文章主要會(huì)跟大家分享如何連接kafka,MySQL,作為輸入流和數(shù)出的操作,以及Table與DataStream進(jìn)行互轉(zhuǎn)。

一、將kafka作為輸入流

kafka 的連接器 flink-kafka-connector 中,1.10 版本的已經(jīng)提供了 Table API 的支持。我們可以在 connect方法中直接傳入一個(gè)叫做 Kafka 的類,這就是 kafka 連接器的描述器ConnectorDescriptor。

準(zhǔn)備數(shù)據(jù):

  1. 1,語數(shù) 
  2. 2,英物 
  3. 3,化生 
  4. 4,文學(xué) 
  5. 5,語理 
  6. 6,學(xué)物 

創(chuàng)建kafka主題

  1. ./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic FlinkSqlTest 

通過命令行的方式啟動(dòng)一個(gè)生產(chǎn)者

  1. [root@node01 bin]# ./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic FlinkSqlTest 
  2. >1,語數(shù) 
  3. >2,英物  
  4. >3,化生 
  5. >4,文學(xué) 
  6. >5,語理\ 
  7. >6,學(xué)物 

編寫Flink代碼連接到kafka

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.DataTypes 
  3. import org.apache.flink.table.api.scala._ 
  4. import org.apache.flink.table.descriptors.{Csv, Kafka, Schema
  5.  
  6. /** 
  7.  * @Package 
  8.  * @author 大數(shù)據(jù)老哥 
  9.  * @date 2020/12/17 0:35 
  10.  * @version V1.0 
  11.  */ 
  12.  
  13. object FlinkSQLSourceKafka { 
  14.   def main(args: Array[String]): Unit = { 
  15.     // 獲取流處理的運(yùn)行環(huán)境 
  16.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  17.     // 獲取table的運(yùn)行環(huán)境 
  18.     val tableEnv = StreamTableEnvironment.create(env) 
  19.     tableEnv.connect
  20.       new Kafka() 
  21.         .version("0.11")  // 設(shè)置kafka的版本 
  22.           .topic("FlinkSqlTest") // 設(shè)置要連接的主題 
  23.         .property("zookeeper.connect","node01:2181,node02:2181,node03:2181")  //設(shè)置zookeeper的連接地址跟端口號(hào) 
  24.         .property("bootstrap.servers","node01:9092,node02:9092,node03:9092") //設(shè)置kafka的連接地址跟端口號(hào) 
  25.     ).withFormat(new Csv()) // 設(shè)置格式 
  26.       .withSchema(new Schema()  // 設(shè)置元數(shù)據(jù)信息 
  27.         .field("id",DataTypes.STRING()) 
  28.         .field("name",DataTypes.STRING()) 
  29.       ).createTemporaryTable("kafkaInputTable") // 創(chuàng)建臨時(shí)表 
  30.      //定義要查詢的sql語句 
  31.     val result = tableEnv.sqlQuery("select * from  kafkaInputTable "
  32.     //打印數(shù)據(jù) 
  33.     result.toAppendStream[(String,String)].print() 
  34.     // 開啟執(zhí)行 
  35.     env.execute("source kafkaInputTable"
  36.   } 

運(yùn)行結(jié)果圖

當(dāng)然也可以連接到 ElasticSearch、MySql、HBase、Hive 等外部系統(tǒng),實(shí)現(xiàn)方式基本上是類似的。

二、表的查詢

利用外部系統(tǒng)的連接器 connector,我們可以讀寫數(shù)據(jù),并在環(huán)境的 Catalog 中注冊(cè)表。接下來就可以對(duì)表做查詢轉(zhuǎn)換了。Flink 給我們提供了兩種查詢方式:Table API 和 SQL。

三、Table API 的調(diào)用

Table API 是集成在 Scala 和 Java 語言內(nèi)的查詢 API。與 SQL 不同,Table API 的查詢不會(huì)用字符串表示,而是在宿主語言中一步一步調(diào)用完成的。 Table API 基于代表一張表的 Table 類,并提供一整套操作處理的方法 API。這些方法會(huì)返回一個(gè)新的 Table 對(duì)象,這個(gè)對(duì)象就表示對(duì)輸入表應(yīng)用轉(zhuǎn)換操作的結(jié)果。有些關(guān)系型轉(zhuǎn)換操作,可以由多個(gè)方法調(diào)用組成,構(gòu)成鏈?zhǔn)秸{(diào)用結(jié)構(gòu)。例如 table.select(…).filter(…) ,其中 select(…) 表示選擇表中指定的字段,filter(…)表示篩選條件。代碼中的實(shí)現(xiàn)如下:

  1. val kafkaInputTable = tableEnv.from("kafkaInputTable"
  2.    kafkaInputTable.select("*"
  3.      .filter('id !=="1"

四、SQL查詢

Flink 的 SQL 集成,基于的是 ApacheCalcite,它實(shí)現(xiàn)了 SQL 標(biāo)準(zhǔn)。在 Flink 中,用常規(guī)字符串來定義 SQL 查詢語句。SQL 查詢的結(jié)果,是一個(gè)新的 Table。

代碼實(shí)現(xiàn)如下:

  1. val result = tableEnv.sqlQuery("select * from  kafkaInputTable "

當(dāng)然,也可以加上聚合操作,比如我們統(tǒng)計(jì)每個(gè)用戶的個(gè)數(shù)

調(diào)用 table API

  1. val result: Table = tableEnv.from("kafkaInputTable"
  2.        result.groupBy("user"
  3.        .select('name,'name.count as 'count

調(diào)用SQL

  1. val result = tableEnv.sqlQuery("select  name ,count(1) as count from kafkaInputTable group by name "

這里 Table API 里指定的字段,前面加了一個(gè)單引號(hào)’,這是 Table API 中定義的 Expression類型的寫法,可以很方便地表示一個(gè)表中的字段。 字段可以直接全部用雙引號(hào)引起來,也可以用半邊單引號(hào)+字段名的方式。以后的代碼中,一般都用后一種形式。

五、將DataStream 轉(zhuǎn)成Table

Flink 允許我們把 Table 和DataStream 做轉(zhuǎn)換:我們可以基于一個(gè) DataStream,先流式地讀取數(shù)據(jù)源,然后 map 成樣例類,再把它轉(zhuǎn)成 Table。Table 的列字段(column fields),就是樣例類里的字段,這樣就不用再麻煩地定義 schema 了。

5.1、代碼實(shí)現(xiàn)

代碼中實(shí)現(xiàn)非常簡(jiǎn)單,直接用 tableEnv.fromDataStream() 就可以了。默認(rèn)轉(zhuǎn)換后的 Table schema 和 DataStream 中的字段定義一一對(duì)應(yīng),也可以單獨(dú)指定出來。

這就允許我們更換字段的順序、重命名,或者只選取某些字段出來,相當(dāng)于做了一次 map 操作(或者 Table API 的 select 操作)。

代碼具體如下:

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.scala._ 
  3.  
  4. /** 
  5.  * @Package 
  6.  * @author 大數(shù)據(jù)老哥 
  7.  * @date 2020/12/17 21:21 
  8.  * @version V1.0 
  9.  */ 
  10. object FlinkSqlReadFileTable { 
  11.  
  12.   def main(args: Array[String]): Unit = { 
  13.     // 構(gòu)建流處理運(yùn)行環(huán)境 
  14.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  15.     // 構(gòu)建table運(yùn)行環(huán)境 
  16.     val tableEnv = StreamTableEnvironment.create(env) 
  17.     // 使用流處理來讀取數(shù)據(jù) 
  18.     val readData = env.readTextFile("./data/word.txt"
  19.     // 使用flatMap進(jìn)行切分 
  20.     val word: DataStream[String] = readData.flatMap(_.split(" ")) 
  21.     // 將word 轉(zhuǎn)為 table 
  22.     val table = tableEnv.fromDataStream(word) 
  23.     // 計(jì)算wordcount 
  24.     val wordCount = table.groupBy("f0").select('f0, 'f0.count as 'count
  25.     wordCount.printSchema() 
  26.     //轉(zhuǎn)換成流處理打印輸出 
  27.     tableEnv.toRetractStream[(String,Long)](wordCount).print() 
  28.     env.execute("FlinkSqlReadFileTable"
  29.   } 

5.2 數(shù)據(jù)類型與 Table schema 的對(duì)應(yīng)

DataStream 中的數(shù)據(jù)類型,與表的 Schema之間的對(duì)應(yīng)關(guān)系,是按照樣例類中的字段名來對(duì)應(yīng)的(name-based mapping),所以還可以用 as 做重命名。

另外一種對(duì)應(yīng)方式是,直接按照字段的位置來對(duì)應(yīng)(position-based mapping),對(duì)應(yīng)的過程中,就可以直接指定新的字段名了。

基于名稱的對(duì)應(yīng):

  1. val userTable = tableEnv.fromDataStream(dataStream,'username as 'name,'id as 'myid) 

基于位置的對(duì)應(yīng):

  1. val userTable = tableEnv.fromDataStream(dataStream, 'name, 'id) 

Flink 的 DataStream 和 DataSet API 支持多種類型。組合類型,比如元組(內(nèi)置 Scala 和 Java 元組)、POJO、Scala case 類和 Flink 的 Row 類型等,允許具有多個(gè)字段的嵌套數(shù)據(jù)結(jié)構(gòu),這些字段可以在 Table 的表達(dá)式中訪問。其他類型,則被視為原子類型。

元組類型和原子類型,一般用位置對(duì)應(yīng)會(huì)好一些;如果非要用名稱對(duì)應(yīng),也是可以的:元組類型,默認(rèn)的名稱是_1, _2;而原子類型,默認(rèn)名稱是 f0。

六、創(chuàng)建臨時(shí)視圖(Temporary View)

創(chuàng)建臨時(shí)視圖的第一種方式,就是直接從 DataStream 轉(zhuǎn)換而來。同樣,可以直接對(duì)應(yīng)字段轉(zhuǎn)換;也可以在轉(zhuǎn)換的時(shí)候,指定相應(yīng)的字段。代碼如下:

  1. tableEnv.createTemporaryView("sensorView", dataStream)  
  2. tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature,'timestamp as 'ts) 

另外,當(dāng)然還可以基于 Table 創(chuàng)建視圖:

  1. tableEnv.createTemporaryView("sensorView", sensorTable) 

View 和 Table 的 Schema 完全相同。事實(shí)上,在 Table API 中,可以認(rèn)為 View 和 Table是等價(jià)的。

總結(jié)

上述文章了主要講解了以kafka方式作為輸入流進(jìn)行流失處理,其實(shí)我也可以設(shè)置MySQL、ES、MySQL 等,都是類似的,以及table API 與sql之間的區(qū)別,還講解了DataStream轉(zhuǎn)換位Table 或者Table 轉(zhuǎn)換為DataStream這樣的或我們后面在做數(shù)據(jù)分析的時(shí)候就非常簡(jiǎn)單了。

 本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)老哥」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)老哥公眾號(hào)。

 

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)老哥
相關(guān)推薦

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-08-19 07:13:45

SQL方法編程

2021-02-19 08:11:39

Flink Function接口

2011-07-26 13:58:17

LINQ

2017-07-05 17:50:52

KotlinJava程序員

2019-01-15 08:50:12

Apache FlinKafka分布式

2022-06-04 07:26:47

Thanos集群Prometheus

2013-11-19 12:53:33

OA信息化

2021-12-10 08:13:02

MatplotlibpythonAPI

2022-08-21 07:17:16

LinkerdKubernetes服務(wù)網(wǎng)格

2010-09-16 16:17:03

TRUNCATE TA

2009-05-11 14:19:49

數(shù)據(jù)遷移OracleSQL Server

2020-10-26 08:31:41

Python爬蟲開發(fā)

2010-05-18 10:17:11

2011-07-25 15:42:58

XML

2023-06-13 08:00:57

ChatGPT語言模型

2024-01-29 00:36:50

Backstage設(shè)施工具

2009-09-16 09:56:42

LINQ to SQL

2011-05-04 13:24:39

Ubuntu 11.0

2018-04-24 10:05:13

Docker工具交付
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 一区二区三区四区电影视频在线观看 | 午夜精品一区二区三区在线视频 | 黄色片在线观看网址 | 亚洲高清视频在线 | 国产精品99久久久久久宅男 | 成人性生交大片免费看中文带字幕 | av一二三四 | 在线播放中文字幕 | 91精品国产日韩91久久久久久 | 九一在线 | 国产高清免费视频 | 中文字幕 亚洲一区 | 手机三级电影 | 蜜桃视频在线观看www社区 | 在线免费观看a级片 | 91国产在线视频在线 | 日韩在线| 久久99精品久久久97夜夜嗨 | 国内精品视频免费观看 | 午夜a区| 波多野结衣电影一区 | 干干天天 | 亚欧洲精品在线视频免费观看 | 高清久久久 | 91久久| 国产伦精品一区二区三毛 | 国产在线一区二区三区 | 一级国产精品一级国产精品片 | 黄色欧美在线 | h片在线看| 综合亚洲视频 | 国产精品久久久久久久久久免费看 | 国产精品久久久久久久久久不蜜臀 | 日本久久久久久 | 亚洲精品女优 | 国产精品不卡视频 | 成年女人免费v片 | 一区二区在线免费观看 | 91视视频在线观看入口直接观看 | 亚洲精品乱码久久久久久久久久 | 精品欧美一区二区三区精品久久 |