成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然:在 Flink 中還能使用 Hive Udf?(附源碼)

數據庫 其他數據庫
本文主要介紹 flink sql 流任務中,不能使用 create temporary function 去引入一個用戶自定義的 hive udf。因此博主只能通過 flink sql 提供的 module 插件能力,自定義了 module,來支持引入用戶自定義的 hive udf。

[[440014]]

 1.序篇

廢話不多說,咱們先直接上本文的目錄和結論,小伙伴可以先看結論快速了解博主期望本文能給小伙伴們帶來什么幫助:

  • 背景及應用場景介紹:博主期望你能了解到,其實很多場景下實時數倉的建設都是隨著離線數倉而建設的(相同的邏輯在實時數倉中重新實現一遍),因此能夠在 flink sql 中復用 hive udf 是能夠大大提高人效的。
  • flink 擴展支持 hive 內置 udf:flink sql 提供了擴展 udf 的能力,即 module,并且 flink sql 也內置了 HiveModule(需要你主動加載進環境),來支持一些 hive 內置的 udf (比如 get_json_object)給小伙伴們使用。
  • flink 擴展支持用戶自定義的 hive udf:主要介紹 flink sql 流任務中,不能使用 create temporary function 去引入一個用戶自定義的 hive udf。因此博主只能通過 flink sql 提供的 module 插件能力,自定義了 module,來支持引入用戶自定義的 hive udf。

2.背景及應用場景介紹

其實大多數公司都是從離線數倉開始建設的。相信大家必然在自己的生產環境中開發了非常多的 hive udf。隨著需求對于時效性要求的增高,越來越多的公司也開始建設起實時數倉。很多場景下實時數倉的建設都是隨著離線數倉而建設的。實時數據使用 flink 產出,離線數據使用 hive\spark 產出。

那么回到我們文章標題的問題:為什么需要 flink 支持 hive udf 呢?

博主分析了下,結論如下:

站在數據需求的角度來說,一般會有以下兩種情況:

  • 以前已經有了離線數據鏈路,需求方也想要實時數據。如果直接能用已經開發好的 hive udf,則不用將相同的邏輯遷移到 flink udf 中,并且后續無需費時費力維護兩個 udf 的邏輯一致性。
  • 實時和離線的需求都是新的,需要新開發。如果只開發一套 udf,則事半功倍。

因此在 flink 中支持 hive udf 這件事對開發人員提效來說是非常有好處的。

3.在擴展前,你需要知道一些基本概念

  • flink 支持 hive udf 這件事分為兩個部分。
  • flink 擴展支持 hive 內置 udf

flink 擴展支持用戶自定義 hive udf

第一部分:flink 擴展支持 hive 內置 udf,比如 get_json_object,rlike 等等。

有同學問了,這么基本的 udf,flink 都沒有嗎?

確實沒有。關于 flink sql 內置的 udf 見如下鏈接,大家可以看看 flink 支持了哪些 udf:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/

那么如果我如果強行使用 get_json_object 這個 udf,會發生啥呢?結果如下圖。

直接報錯找不到 udf。

第二部分:flink 擴展支持用戶自定義 hive udf。

內置函數解決不了用戶的復雜需求,用戶就需要自己寫 hive udf,并且這部分自定義 udf 也想在 flink sql 中使用。

下面看看怎么在 flink sql 中進行這兩種擴展。

4.hive udf 擴展支持

4.1.flink sql module

涉及到擴展 udf 就不得不提到 flink 提供的 module。見官網下圖。

從第一句話就可以看到,module 的作用就是讓用戶去擴展 udf 的。

flink 本身已經內置了一個 module,名字叫 CoreModule,其中已經包含了一些 udf。

那我們要怎么使用 module 這玩意去擴展我們的 hive udf 呢?

4.2.flink 擴展支持 hive 內置 udf

步驟如下:

引入 hive 的 connector。其中包含了 flink 官方提供的一個 HiveModule。在 HiveModule 中包含了 hive 內置的 udf。

  1. <dependency> 
  2.     <groupId>org.apache.flink</groupId> 
  3.     <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> 
  4.     <version>${flink.version}</version> 
  5. </dependency> 

在 StreamTableEnvironment 中加載 HiveModule。

  1. String name = "default"
  2. String version = "3.1.2"
  3. tEnv.loadModule(name, new HiveModule(version)); 

然后在控制臺打印一下目前有的 module。

  1. String[] modules = tEnv.listModules(); 
  2. Arrays.stream(modules).forEach(System.out::println); 

然后可以看到除了 core module,還有我們剛剛加載進去的 default module。

  1. default 
  2. core 

查看所有 module 的所有 udf。在控制臺打印一下。

  1. String[] functions = tEnv.listFunctions(); 
  2. Arrays.stream(functions).forEach(System.out::println); 

就會將 default 和 core module 中的所有包含的 udf 給列舉出來,當然也就包含了 hive module 中的 get_json_object。

然后我們再去在 flink sql 中使用 get_json_object 這個 udf,就沒有報錯,能正常輸出結果了。

使用 flink hive connector 自帶的 HiveModule,已經能夠解決很大一部分常見 udf 使用的問題了。

4.2.flink 擴展支持用戶自定義 hive udf

原本博主是直接想要使用 flink sql 中的 create temporary function 去執行引入自定義 hive udf 的。

舉例如下:

  1. CREATE TEMPORARY FUNCTION test_hive_udf as 'flink.examples.sql._09.udf._02_stream_hive_udf.TestGenericUDF'

發現在執行這句 sql 時,是可以執行成功,將 udf 注冊進去的。

但是在后續 udf 初始化時就報錯了。具體錯誤如下圖。直接報錯 ClassCastException。

看了下源碼,flink 流環境下(未連接 hive catalog 時)在創建 udf 時會認為這個 udf 是 flink 生態體系中的 udf。

所以在初始化我們引入的 TestGenericUDF 時,默認會按照 flink 的 UserDefinedFunction 強轉,因此才會報強轉錯誤。

那么我們就不能使用 hive udf 了嗎?

錯誤,小伙伴萌豈敢有這種想法。博主都把這個標題列出來了(牛逼都吹出去了),還能給不出解決方案嘛。

思路見下一章節。

4.3.flink 擴展支持用戶自定義 hive udf 的增強 module

其實思路很簡單。

使用 flink sql 中的 create temporary function 雖然不能執行,但是 flink 提供了插件化的自定義 module。

我們可以擴展一個支持用戶自定義 hive udf 的 module,使用這個 module 來支持自定義的 hive udf。

實現的代碼也非常簡單。簡單的把 flink hive connector 提供的 HiveModule 做一個增強即可,即下圖中的 HiveModuleV2。

使用方式如下圖所示:

然后程序就正常跑起來了。

肥腸滴好用!

5.總結與展望

本文主要介紹了如果在 flink sql 使用 hive 內置 udf 及用戶自定義 hive udf,總結如下:

  • 背景及應用場景介紹:博主期望你能了解到,其實很多場景下實時數倉的建設都是隨著離線數倉而建設的(相同的邏輯在實時數倉中重新實現一遍),因此能夠在 flink sql 中復用 hive udf 是能夠大大提高人效的。
  • flink 擴展支持 hive 內置 udf:flink sql 提供了擴展 udf 的能力,即 module,并且 flink sql 也內置了 HiveModule(需要你主動加載進環境),來支持一些 hive 內置的 udf (比如 get_json_object)給小伙伴們使用。
  • flink 擴展支持用戶自定義的 hive udf:主要介紹 flink sql 流任務中,不能使用 create temporary function 去引入一個用戶自定義的 hive udf。因此博主只能通過 flink sql 提供的 module 插件能力,自定義了 module,來支持引入用戶自定義的 hive udf。

 

責任編輯:姜華 來源: 大數據羊說
相關推薦

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-05-18 09:02:28

Flink SQLSQL字符串

2021-12-09 06:59:24

FlinkSQL 開發

2022-05-15 09:57:59

Flink SQL時間語義

2022-05-27 09:02:58

SQLHive語義

2022-06-29 09:01:38

FlinkSQL時間屬性

2022-05-12 09:02:47

Flink SQL數據類型

2021-11-28 11:36:08

SQL Flink Join

2021-09-12 07:01:07

Flink SQL ETL datastream

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數倉

2021-12-05 08:28:39

Flink SQLbatch lookuSQL

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-06-18 09:26:00

Flink SQLJoin 操作

2021-11-30 23:30:45

sql 性能異步

2021-12-06 07:15:47

開發Flink SQL

2022-05-09 09:03:04

SQL數據流數據
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲视频中文字幕 | 婷婷二区 | 国产东北一级毛片 | 国产丝袜人妖cd露出 | 国产精品视频免费播放 | 亚洲444eee在线观看 | 久久久精| 精品国产乱码久久久久久果冻传媒 | 黄色片在线免费看 | 亚洲综合字幕 | 精品久久精品 | 亚洲欧美中文日韩在线v日本 | 在线播放一区二区三区 | 999久久久 | 久久99精品久久久久久国产越南 | 国产丝袜一区二区三区免费视频 | 亚洲一区在线播放 | 日本在线看片 | 99国产精品99久久久久久 | 久久精品一区二区三区四区 | 国产精品777一区二区 | 日韩在线精品视频 | 亚洲在线免费观看 | 久久免费视频1 | 99精品国产一区二区三区 | 日本二区| 欧美日韩视频一区二区 | 亚洲国产精品久久久 | 久久免费大片 | 网址黄| 人人九九精 | 欧美日韩视频网站 | 九九九视频精品 | 亚洲 中文 欧美 日韩 在线观看 | 精品久久久久久久久久久久久 | 亚洲小视频在线观看 | 91精品久久久久久久久中文字幕 | 日韩国产专区 | 中文天堂在线观看 | 久久久久久网站 | 欧美一区二区在线观看 |