成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實時數倉 | 三分鐘搞定Flink Cdc

大數據
Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。

簡介

Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。因此,它可以充分利用Debezium的功能。

特點

  • 支持讀取數據庫快照,并且能夠持續讀取數據庫的變更日志,即使發生故障,也支持exactly-once 的處理語義
  • 對于DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個作業中使用多個數據庫和表上的變更數據。
  • 對于Table/SQL API 的CDC connector,用戶可以使用SQL DDL創建CDC數據源,來監視單個表上的數據變更。

使用場景

  • 數據庫之間的增量數據同步
  • 審計日志
  • 數據庫之上的實時物化視圖
  • 基于CDC的維表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具體如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用過程中的注意點

使用MySQL CDC的注意點

如果要使用MySQL CDC connector,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-connector-mysql-cdc</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

使用canal-json的注意點

如果要使用Kafka的canal-json,對于程序而言,需要添加如下依賴:

  1. <!-- universal --> 
  2. <dependency> 
  3.     <groupId>org.apache.flink</groupId> 
  4.     <artifactId>flink-connector-kafka_2.11</artifactId> 
  5.     <version>1.11.0</version> 
  6. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。由于Flink1.11的安裝包 的lib目錄下并沒有提供該jar包,所以必須要手動添加依賴包,否則會報如下錯誤:

  1. [ERROR] Could not execute SQL statement. Reason: 
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 
  3.  
  4. Available factory identifiers are: 
  5.  
  6. datagen 
  7. mysql-cdc 

使用changelog-json的注意點

如果要使用Kafka的changelog-json Format,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-format-changelog-json</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

mysql-cdc的操作實踐

創建MySQL數據源表

在創建MySQL CDC表之前,需要先創建MySQL的數據表,如下:

  1. -- MySQL 
  2. /*Table structure for table `order_info` */ 
  3. DROP TABLE IF EXISTS `order_info`; 
  4. CREATE TABLE `order_info` ( 
  5.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  6.   `consignee` varchar(100) DEFAULT NULL COMMENT '收貨人'
  7.   `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話'
  8.   `total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額'
  9.   `order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀態,1表示下單,2表示支付'
  10.   `user_id` bigint(20) DEFAULT NULL COMMENT '用戶id'
  11.   `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式'
  12.   `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨地址'
  13.   `order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注'
  14.   `out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)'
  15.   `trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)'
  16.   `create_time` datetime DEFAULT NULL COMMENT '創建時間'
  17.   `operate_time` datetime DEFAULT NULL COMMENT '操作時間'
  18.   `expire_time` datetime DEFAULT NULL COMMENT '失效時間'
  19.   `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編號'
  20.   `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編號'
  21.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片路徑'
  22.   `province_id` int(20) DEFAULT NULL COMMENT '地區'
  23.   PRIMARY KEY (`id`) 
  24. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表'
  25. -- ---------------------------- 
  26. -- Records of order_info 
  27. -- ---------------------------- 
  28. INSERT INTO `order_info`  
  29. VALUES (476, 'lAXjcL''13408115089', 433.00, '2', 10, '2''OYyAdSdLxedceqovndCD''ihjAYsSjrgJMQVdFQnSy''8728720206''''2020-06-18 02:21:38'NULLNULLNULLNULLNULL, 9); 
  30. INSERT INTO `order_info` 
  31. VALUES (477, 'QLiFDb''13415139984', 772.00, '1', 90, '2''OizYrQbKuWvrvdfpkeSZ''wiBhhqhMndCCgXwmWVQq''1679381473''''2020-06-18 09:12:25'NULLNULLNULLNULLNULL, 3); 
  32. INSERT INTO `order_info` 
  33. VALUES (478, 'iwKjQD''13320383859', 88.00, '1', 107, '1''cbXLKtNHWOcWzJVBWdAs''njjsnknHxsxhuCCeNDDi''0937074290''''2020-06-18 15:56:34'NULLNULLNULLNULLNULL, 7); 
  34.  
  35. /*Table structure for table `order_detail` */ 
  36. CREATE TABLE `order_detail` ( 
  37.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  38.   `order_id` bigint(20) DEFAULT NULL COMMENT '訂單編號'
  39.   `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id'
  40.   `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(冗余)'
  41.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片名稱(冗余)'
  42.   `order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)'
  43.   `sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數'
  44.   `create_time` datetime DEFAULT NULL COMMENT '創建時間'
  45.   PRIMARY KEY (`id`) 
  46. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細表'
  47.  
  48. -- ---------------------------- 
  49. -- Records of order_detail 
  50. -- ---------------------------- 
  51. INSERT INTO `order_detail`  
  52. VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3''2020-06-18 02:21:38'); 
  53. INSERT INTO `order_detail`  
  54. VALUES (1330, 477, 9, '榮耀10 GT游戲加速 AIS手持夜景 6GB+64GB 幻影藍全網通 移動聯通電信''http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4''2020-06-18 09:12:25'); 
  55. INSERT INTO `order_detail` 
  56. VALUES (1331, 478, 4, '小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍 全網通4G 雙卡雙待 小水滴全面屏拍照游戲智能手機''http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1''2020-06-18 15:56:34'); 
  57. INSERT INTO `order_detail`  
  58. VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3''2020-06-18 15:56:34'); 
  59. INSERT INTO `order_detail`  
  60. VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1''2020-06-18 15:56:34'); 

Flink SQL Cli創建CDC數據源

啟動 Flink 集群,再啟動 SQL CLI,執行下面命令:

  1. -- 創建訂單信息表 
  2. CREATE TABLE order_info( 
  3.     id BIGINT
  4.     user_id BIGINT
  5.     create_time TIMESTAMP(0), 
  6.     operate_time TIMESTAMP(0), 
  7.     province_id INT
  8.     order_status STRING, 
  9.     total_amount DECIMAL(10, 5) 
  10.   ) WITH ( 
  11.     'connector' = 'mysql-cdc'
  12.     'hostname' = 'kms-1'
  13.     'port' = '3306'
  14.     'username' = 'root'
  15.     'password' = '123qwe'
  16.     'database-name' = 'mydw'
  17.     'table-name' = 'order_info' 
  18. ); 

在Flink SQL Cli中查詢該表的數據:result-mode: tableau,+表示數據的insert。

在SQL CLI中創建訂單詳情表:

  1. CREATE TABLE order_detail( 
  2.     id BIGINT
  3.     order_id BIGINT
  4.     sku_id BIGINT
  5.     sku_name STRING, 
  6.     sku_num BIGINT
  7.     order_price DECIMAL(10, 5), 
  8.  create_time TIMESTAMP(0) 
  9.  ) WITH ( 
  10.     'connector' = 'mysql-cdc'
  11.     'hostname' = 'kms-1'
  12.     'port' = '3306'
  13.     'username' = 'root'
  14.     'password' = '123qwe'
  15.     'database-name' = 'mydw'
  16.     'table-name' = 'order_detail' 
  17. ); 

查詢結果如下:

執行JOIN操作:

  1. SELECT 
  2.     od.id, 
  3.     oi.id order_id, 
  4.     oi.user_id, 
  5.     oi.province_id, 
  6.     od.sku_id, 
  7.     od.sku_name, 
  8.     od.sku_num, 
  9.     od.order_price, 
  10.     oi.create_time, 
  11.     oi.operate_time 
  12. FROM 
  13.    ( 
  14.     SELECT *  
  15.     FROM order_info 
  16.     WHERE  
  17.       order_status = '2'-- 已支付 
  18.    ) oi 
  19.    JOIN 
  20.   ( 
  21.     SELECT * 
  22.     FROM order_detail 
  23.   ) od  
  24.   ON oi.id = od.order_id; 

canal-json的操作實踐

關于cannal的使用方式,可以參考我的另一篇文章:基于Canal與Flink實現數據實時增量同步(一)。我已經將下面的表通過canal同步到了kafka,具體格式為:

  1.     "data":[ 
  2.         { 
  3.             "id":"1"
  4.             "region_name":"華北" 
  5.         }, 
  6.         { 
  7.             "id":"2"
  8.             "region_name":"華東" 
  9.         }, 
  10.         { 
  11.             "id":"3"
  12.             "region_name":"東北" 
  13.         }, 
  14.         { 
  15.             "id":"4"
  16.             "region_name":"華中" 
  17.         }, 
  18.         { 
  19.             "id":"5"
  20.             "region_name":"華南" 
  21.         }, 
  22.         { 
  23.             "id":"6"
  24.             "region_name":"西南" 
  25.         }, 
  26.         { 
  27.             "id":"7"
  28.             "region_name":"西北" 
  29.         } 
  30.     ], 
  31.     "database":"mydw"
  32.     "es":1597128441000, 
  33.     "id":102, 
  34.     "isDdl":false
  35.     "mysqlType":{ 
  36.         "id":"varchar(20)"
  37.         "region_name":"varchar(20)" 
  38.     }, 
  39.     "old":null
  40.     "pkNames":null
  41.     "sql":""
  42.     "sqlType":{ 
  43.         "id":12, 
  44.         "region_name":12 
  45.     }, 
  46.     "table":"base_region"
  47.     "ts":1597128441424, 
  48.     "type":"INSERT" 

在SQL CLI中創建該canal-json格式的表:

  1. CREATE TABLE region ( 
  2.   id BIGINT
  3.   region_name STRING 
  4. WITH ( 
  5.  'connector' = 'kafka'
  6.  'topic' = 'mydw.base_region'
  7.  'properties.bootstrap.servers' = 'kms-3:9092'
  8.  'properties.group.id' = 'testGroup'
  9.  'format' = 'canal-json' , 
  10.  'scan.startup.mode' = 'earliest-offset'  
  11. ); 

查詢結果如下:

changelog-json的操作實踐

創建MySQL數據源

參見上面的order_info

Flink SQL Cli創建changelog-json表

  1. CREATE TABLE order_gmv2kafka ( 
  2.   day_str STRING, 
  3.   gmv DECIMAL(10, 5) 
  4. WITH ( 
  5.     'connector' = 'kafka'
  6.     'topic' = 'order_gmv_kafka'
  7.     'scan.startup.mode' = 'earliest-offset'
  8.     'properties.bootstrap.servers' = 'kms-3:9092'
  9.     'format' = 'changelog-json' 
  10. ); 
  11.  
  12. INSERT INTO order_gmv2kafka 
  13. SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd'as day_str, SUM(total_amount) as gmv 
  14. FROM order_info 
  15. WHERE order_status = '2' -- 訂單已支付 
  16. GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');  

查詢表看一下結果:

再查一下kafka的數據:

  1. {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"

當將另外兩個訂單的狀態order_status更新為2時,總金額=443+772+88=1293再觀察數據:

再看kafka中的數據:

 

責任編輯:武曉燕 來源: 大數據技術與數倉
相關推薦

2009-11-05 16:04:19

Oracle用戶表

2020-11-20 08:36:59

Jpa數據代碼

2024-05-16 11:13:16

Helm工具release

2022-02-16 19:42:25

Spring配置開發

2009-11-09 12:55:43

WCF事務

2024-12-18 10:24:59

代理技術JDK動態代理

2009-11-12 09:16:15

ADO.NET數據庫連

2024-08-30 08:50:00

2022-02-17 09:24:11

TypeScript編程語言javaScrip

2023-12-27 08:15:47

Java虛擬線程

2024-01-16 07:46:14

FutureTask接口用法

2021-04-20 13:59:37

云計算

2013-06-28 14:30:26

棱鏡計劃棱鏡棱鏡監控項目

2020-06-30 10:45:28

Web開發工具

2021-12-17 07:47:37

IT風險框架

2025-02-24 10:40:55

2023-12-04 18:13:03

GPU編程

2024-10-15 09:18:30

2024-01-12 07:38:38

AQS原理JUC

2021-02-03 14:31:53

人工智能人臉識別
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产大毛片 | 国产精品久久久久婷婷二区次 | 99福利视频| 国产高清精品在线 | 欧美日本免费 | 亚洲成人免费观看 | 久久久久久久久久久久久91 | 国产一区二区精品 | 精品欧美一区免费观看α√ | 日韩一区二区免费视频 | 国产伦精品一区二区 | 久久男人| 视频在线观看一区二区 | 免费看一区二区三区 | 精品一区二区三区在线观看国产 | 91精品国产日韩91久久久久久 | 亚洲精品免费在线观看 | 中文字幕乱码视频32 | 国产一区二区在线免费观看 | 亚洲天堂一区 | 99re视频精品 | 欧美成人一区二区 | 欧美三级视频 | 久久九九网站 | 久久一二 | 日韩国产专区 | 国产高清在线 | 日韩欧美国产精品 | www国产精 | 精品成人av| 草久久久 | 国产精品资源在线观看 | 91国在线视频 | 一区精品在线观看 | 一级黄色av电影 | 热re99久久精品国99热观看 | 一区二区不卡高清 | 日本在线免费观看 | 欧美色综合网 | 欧美日韩精品久久久免费观看 | 97精品超碰一区二区三区 |