成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

FlinkSQL 中 Catalog 的使用場景及案例詳解

大數據
Catalog 在 Flink SQL 中是一個元數據管理組件,用于存儲和管理數據庫、表、視 圖、函數等元數據對象的抽象接口。

Catalog 在 Flink SQL 中是一個元數據管理組件,用于存儲和管理數據庫、表、視 圖、函數等元數據對象的抽象接口。它類似于傳統數據庫系統中的元數據倉庫,為 Flink SQL 提供了元數據管理能力。

Catalog 使 Flink 能夠:

  • 以統一的方式訪問不同的外部系統
  • 減少代碼中的硬編 碼配置
  • 實現表元數據的持久化
  • 支持跨會話的元數據共享。

1. Catalog的作用

(1) 管理元數據對象

Catalog 可以管理以下元數據對象: - 數據庫(Database) - 表(Table) - 視圖(View) - 函數(Function) - 分區(Partition)等

(2) 支持多樣化的元數據存儲

Flink 支持多種Catalog 實現,可以連接各種外部元數據系統: - 內存Catalog(默認) - Hive Catalog - JDBC Catalog - 自定義 Catalog

(3) 提供統一的數據訪問接口

無論底層元數據存儲在哪里,都可以通過統一的接口訪問和操作

(4) 簡化元數據管理

開發者可以通過Catalog 注冊永久表,而不是在代碼中重復定義表結構

2. Flink內置的catalog類型

(1) GenericInMemoryCatalog

默認的內存 Catalog,元數據只在單個 Flink 會話中有效,會話結束數據就會丟失。

// 創建內存Catalog 
Catalog inmemory = new GenericInMemoryCatalog("in_memory_catalog"); 
tableEnv.registerCatalog("in_memory_catalog", inmemory); -- SQL 中創建和使用內存Catalog 
CREATE CATALOG my_memory_catalog WITH ( 
'type' = 'generic_in_memory' 
); 
USE CATALOG my_memory_catalog;

(2) HiveCatalog

使用Hive Metastore 存儲元數據,支持持久化,適合生產環境。

// 創建Hive Catalog 
String name = "my_hive_catalog"; 
String defaultDatabase = "default"; 
String hiveConfDir = "/path/to/hive/conf"; 
String version = "2.3.6"; 
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version); 
tableEnv.registerCatalog("my_hive_catalog", hive); -- SQL 中創建和使用Hive Catalog 
CREATE CATALOG my_hive_catalog WITH ( 
'type' = 'hive', 
'default-database' = 'default', 
'hive-conf-dir' = '/path/to/hive/conf', 
'hive-version' = '2.3.6' 
); 
USE CATALOG my_hive_catalog; 
3. JdbcCatalog 
使用關系型數據庫存儲元數據。 
// 創建JDBC Catalog 
String name = "my_jdbc_catalog"; 
String defaultDatabase = "default"; 
String username = "username"; 
String password = "password"; 


String baseUrl = "jdbc:mysql://localhost:3306"; 


JdbcCatalog jdbc = new JdbcCatalog(name, defaultDatabase, username, pas
 sword, baseUrl); 
tableEnv.registerCatalog("my_jdbc_catalog", jdbc); -- SQL中創建和使用JDBC Catalog(Flink 1.15+) 
CREATE CATALOG my_jdbc_catalog WITH ( 
    'type' = 'jdbc', 
    'default-database' = 'default', 
    'username' = 'username', 
    'password' = 'password', 
    'base-url' = 'jdbc:mysql://localhost:3306' 
); 


USE CATALOG my_jdbc_catalog;

(3) 使用Catalog的SQL操作

1. 創建和切換Catalog -- 創建Catalog 
CREATE CATALOG my_catalog WITH ( 
    'type' = 'hive', 
    'hive-conf-dir' = '/path/to/hive/conf' 
); 
 -- 查看所有Catalog 
SHOW CATALOGS; 
 -- 切換當前Catalog 
USE CATALOG my_catalog; 
2. 創建和使用數據庫 -- 創建數據庫 
CREATE DATABASE my_database; 
 -- 查看所有數據庫 
SHOW DATABASES; 
 -- 切換當前數據庫 
USE my_database; 
3. 管理表和視圖 -- 創建表 
CREATE TABLE my_table ( 
    id INT, 
    name STRING, 
    create_time TIMESTAMP(3) 


) WITH ( 
'connector' = 'kafka', 
'topic' = 'my_topic', 
'properties.bootstrap.servers' = 'localhost:9092', 
'format' = 'json' 
); -- 查看所有表 
SHOW TABLES; -- 查看表結構 
DESCRIBE my_table; 
4. 管理函數 -- 創建自定義函數 
CREATE FUNCTION my_function AS 'com.example.MyFunction'; -- 查看所有函數 
SHOW FUNCTIONS;

(4) Catalog 的實際應用示

// 跨會話持久化元數據 
// 會話1:注冊Hive Catalog和表 
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(env1); 
tEnv1.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
tEnv1.executeSql("USE CATALOG hive_catalog"); 
tEnv1.executeSql("CREATE DATABASE IF NOT EXISTS db1"); 
tEnv1.executeSql("USE db1"); 
tEnv1.executeSql( 
"CREATE TABLE IF NOT EXISTS orders (" + 
"  order_id STRING, " + 
"  price DECIMAL(10, 2)" + 
") WITH (" + 
"  'connector' = 'kafka', " + 
"  'topic' = 'orders'" + 
")"); 
// 會話2:直接使用之前注冊的表 
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecuti
 onEnvironment(); 
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env2); 
tEnv2.executeSql("USE CATALOG hive_catalog"); 
tEnv2.executeSql("USE db1"); 

// 不需要重新定義表結構,可以直接查詢 
tEnv2.executeSql("SELECT * FROM orders"); 
使用不同類型的Catalog實現多源數據集成 
// 注冊多個Catalog訪問不同系統 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutio
 nEnvironment(); 
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); 
// 注冊Hive Catalog 
tEnv.executeSql("CREATE CATALOG hive_catalog WITH ('type' = 'hive')"); 
// 注冊JDBC Catalog 
tEnv.executeSql( 
"CREATE CATALOG jdbc_catalog WITH (" + 
"  'type' = 'jdbc', " + 
"  'default-database' = 'default', " + 
"  'username' = 'user', " + 
"  'password' = 'password', " + 
"  'base-url' = 'jdbc:mysql://localhost:3306'" + 
")"); 
// 從不同Catalog中的表關聯查詢 
tEnv.executeSql( 
"SELECT o.order_id, o.price, c.name " + 
"FROM hive_catalog.db1.orders o " + 
"JOIN jdbc_catalog.default.customers c " + 
"ON o.customer_id = c.id" 
); 
責任編輯:趙寧寧 來源: 大數據技能圈
相關推薦

2013-12-25 16:03:39

GitGit 命令

2022-07-29 07:48:15

HTTP常用狀態碼

2025-02-07 14:33:04

2020-02-14 13:50:32

JavaScript前端技術

2023-05-15 08:50:58

ContextGolang

2024-10-10 08:46:28

2024-10-06 12:35:50

2020-09-04 13:30:43

Java自定義代碼

2023-05-16 07:47:18

RabbitMQ消息隊列系統

2013-07-10 15:52:17

fragmentAndroid

2024-04-16 12:13:07

usingC#開發

2024-01-30 09:43:43

Java緩存技術

2022-12-08 11:54:55

元宇宙

2024-11-12 06:27:16

Python列表元組

2018-08-15 09:48:27

數據庫Redis應用場景

2009-08-03 11:38:57

linux at命令詳linux at命令

2011-05-16 15:49:58

JAVA

2009-05-18 13:07:44

類隱藏Java關鍵字

2024-09-06 11:52:47

2025-02-11 09:49:12

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产一区 | 香蕉二区 | 在线免费看黄 | 亚洲视频在线看 | 一区二区三区在线 | 亚洲日日操 | 亚洲第一成人av | 狠狠操狠狠色 | 香蕉久久av | 国产精品高潮呻吟久久 | 欧美日韩国产一区二区三区不卡 | 国产电影一区 | 成人av电影在线 | 久久精品国产一区二区三区不卡 | 色婷婷av久久久久久久 | 国产一区二区三区视频 | 在线精品一区 | 国产视频一区在线 | 99热热热热| 久久视频精品 | 欧美日韩国产高清 | 亚洲一区视频 | 天天爱爱网 | 久久久蜜臀国产一区二区 | 一区二区三区高清在线观看 | 久久噜噜噜精品国产亚洲综合 | 欧美日韩手机在线观看 | 欧美成人一区二免费视频软件 | 国产精品爱久久久久久久 | 国产精品99久久久久久人 | 国产成人久久精品一区二区三区 | 在线免费看91 | 亚洲综合在线一区 | 欧美在线a| 欧美精品1区2区3区 免费黄篇 | av男人的天堂在线 | 久久久91精品国产一区二区三区 | 一区二区三区视频在线观看 | 中文字幕日韩av | 国产91综合一区在线观看 | 美女国产精品 |