成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

數倉 | 幾種常見的數據同步方式

大數據
數據倉庫的特性之一是集成,即首先把未經過加工處理的、不同來源的、不同形式的數據同步到ODS層,一般情況下,這些ODS層數據包括日志數據和業務DB數據。

[[428501]]

本文轉載自微信公眾號「大數據技術與數倉 」,作者西貝。轉載本文請聯系大數據技術與數倉公眾號。

寫在前面

數據倉庫的特性之一是集成,即首先把未經過加工處理的、不同來源的、不同形式的數據同步到ODS層,一般情況下,這些ODS層數據包括日志數據和業務DB數據。對于業務DB數據而言(比如存儲在MySQL中),將數據采集并導入到數倉中(通常是Hive或者MaxCompute)是非常重要的一個環節。

那么,該如何將業務DB數據高效準確地同步到數倉中呢?一般企業會使用兩種方案:直連同步與實時增量同步(數據庫日志解析)。其中直連同步的基本思路是直連數據庫進行SELECT,然后將查詢的數據存儲到本地文件作為中間存儲,最后把文件Load到數倉中。這種方式非常的簡單方便,但是隨著業務的發展,會遇到一些瓶頸,具體見下文分析。

為了解決這些問題,一般會使用實時增量的方式進行數據同步,其基本原理是CDC (Change Data Capture) + Merge,即實時Binlog采集 + 離線處理Binlog還原業務數據這樣一套解決方案。

本文主要包括以下內容,希望對你有所幫助

  • 常見數據同步方式
  • 流式數據集成

數據同步的方式

直連同步

直連同步是指通過定義好的規范接口API和基于動態鏈接庫的方式直接連接業務庫,比如ODBC/JDBC等規定了統一的標準接口,不同的數據庫基于這套標準提供規范的驅動,從而支持完全相同的函數調用和SQL實現。比如經常使用的Sqoop就是采取這種方式進行批量數據同步的。

直連同步的方式配置十分簡單,很容易上手操作,比較適合操作型業務系統的數據同步,但是會存在以下問題:

  • 數據同步時間:隨著業務規模的增長,數據同步花費的時間會越來越長,無法滿足下游數倉生產的時間要求。
  • 性能瓶頸:直連數據庫查詢數據,對數據庫影響非常大,容易造成慢查詢,如果業務庫沒有采取主備策略,則會影響業務線上的正常服務,如果采取了主備策略,雖然可以避免對業務系統的性能影響,但當數據量較大時,性能依然會很差。

日志解析

所謂日志解析,即解析數據庫的變更日志,比如MySQL的Binlog日志,Oracle的歸檔日志文件。通過讀取這些日志信息,收集變化的數據并將其解析到目標存儲中即可完成數據的實時同步。這種讀操作是在操作系統層面完成的,不需要通過數據庫,因此不會給源數據庫帶來性能上的瓶頸。

數據庫日志解析的同步方式可以實現實時與準實時的同步,延遲可以控制在毫秒級別的,其最大的優勢就是性能好、效率高,不會對源數據庫造成影響,目前,從業務系統到數據倉庫中的實時增量同步,廣泛采取這種方式。當然,這種方式也會存在一些問題,比如批量補數時造成大量數據更新,日志解析會處理較慢,造成數據延遲。除此之外,這種方式比較復雜,投入也較大,因為需要一個實時的抽取系統去抽取并解析日志,下文會對此進行詳細解釋。

如上圖所示架構,在直連同步基礎之上增加了流式同步的鏈路,經過流式計算引擎把相應的 Binlog 采集到 Kafka,同時會經過一個 Kafka 2Hive 的程序把它導入到原始數據,再經過一層 Merge,產出下游需要的 ODS 數據。

上述的數據集成方式優勢是非常明顯的,把數據傳輸的時間放到了 T+0 這一天去做,在第二天的時候只需要去做一次 merge 就可以了。非常節省時間和計算資源。

流式數據集成實現

實現思路

首先,采用Flink負責把Kafka上的Binlog數據拉取到HDFS上,生成增量表。

然后,對每張ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量數據讀取到Hive上,這一過程底層采用直連MySQL去Select數據的方式,可以使用Sqoop進行一次性全量導入,生成一張全量表。

最后,對每張ODS表,每天基于存量數據和當天增量產生的Binlog做Merge,從而還原出業務數據。

Binlog是流式產生的,通過對Binlog的實時采集,把部分數據處理需求由每天一次的批處理分攤到實時流上。無論從性能上還是對MySQL的訪問壓力上,都會有明顯地改善。Binlog本身記錄了數據變更的類型(Insert/Update/Delete),通過一些語義方面的處理,完全能夠做到精準的數據還原。

關于Binlog解析部分,可以使用canal工具,采集到Kafka之后,可以使用Flink解析kafka數據并寫入到HDFS上,解析kafka的數據可以使用Flink的DataStreamAPI,也可以使用FlinkSQL的canal-json數據源格式進行解析,使用FlinkSQL相對來說是比較簡單的。下面是canal-json格式的kafka數據源。

  1. CREATE TABLE region ( 
  2.   id BIGINT
  3.   region_name STRING 
  4. WITH ( 
  5.  'connector' = 'kafka'
  6.  'topic' = 'mydw.base_region'
  7.  'properties.bootstrap.servers' = 'kms-3:9092'
  8.  'properties.group.id' = 'testGroup'
  9.  'format' = 'canal-json' , 
  10.  'scan.startup.mode' = 'earliest-offset'  
  11. ); 

數據解析完成之后,下面的就是合并還原完整數據的過程,關于合并還原數據,一種比較常見的方式就是全外連接(FULL OUTER JOIN)。具體如下:

生成增量表與全量表的Merge任務,當天的增量數據與昨天的全量數據進行全外連接,該Merge任務的基本邏輯是:

  1. INSERT OVERWRITE TABLE user_order PARTITION(ds='20211012'
  2. SELECT  CASE    WHEN n.id IS NULL THEN o.id  
  3.                 ELSE n.id  
  4.         END 
  5.         ,CASE    WHEN n.id IS NULL THEN o.create_time  
  6.                  ELSE n.create_time  
  7.          END 
  8.         ,CASE    WHEN n.id IS NULL THEN o.modified_time 
  9.                  ELSE n.modified_time  
  10.          END 
  11.         ,CASE    WHEN n.id IS NULL THEN o.user_id  
  12.                  ELSE n.user_id  
  13.          END 
  14.          
  15.         ,CASE    WHEN n.id IS NULL THEN o.sku_code  
  16.                  ELSE n.sku_code  
  17.          END 
  18.         ,CASE    WHEN n.id IS NULL THEN o.pay_fee 
  19.                  ELSE n.pay_fee  
  20.          END 
  21. FROM    ( 
  22.             SELECT  * 
  23.             FROM    user_order_delta 
  24.             WHERE   ds = '20211012' 
  25.             AND     id IS NOT NULL 
  26.             AND     user_id IS NOT NULL 
  27.         ) n 
  28. FULL OUTER JOIN (-- 全外連接進行數據merge 
  29.                     SELECT  * 
  30.                     FROM    user_order 
  31.                     WHERE   ds = '20211011' 
  32.                     AND     id IS NOT NULL 
  33.                     AND     user_id IS NOT NULL 
  34.                  
  35.                 ) o 
  36. ON      o.id = n.id 
  37. AND     o.user_id = n.user_id 

經過上述步驟,即可將數據還原完整。

總結

本文首先介紹了數據倉庫構建ODS層常見的數據同步方式,并對每種方式進行了解釋,給出了相對應的示意圖。接著給出了CDC+Merge的數據同步方案。值得注意的是,Flink1.11引入了CDC的connector,比如MySQL CDC和Postgres CDC,同時對Kafka的Connector支持canal-json和debezium-json以及changelog-json的format,通過這種方式可以很方便地捕獲變化的數據,大大簡化了數據處理的流程和數據同步的復雜度。

 

責任編輯:武曉燕 來源: 大數據技術與數倉
相關推薦

2010-03-31 16:28:11

Oracle數據庫

2022-07-26 15:38:58

數據倉數據治理數據團隊

2019-04-09 21:10:23

iOS加密框架

2019-02-26 14:39:20

Windows后門漏洞

2021-07-27 15:40:39

Python數據清洗函數

2018-11-07 09:01:13

Tomcat部署方式

2018-10-10 10:23:53

數據庫RedisNoSQL

2023-11-23 16:53:56

數據倉庫大數據

2022-02-18 09:02:04

數據倉庫治理

2010-07-30 09:16:24

Flex數據綁定

2023-10-30 11:53:37

繼承JS父類

2022-08-22 17:46:56

虛擬數倉Impala

2020-04-03 10:30:50

MySQL數據庫技術

2021-12-02 08:41:30

數倉建模設計

2021-01-31 23:54:23

數倉模型

2023-03-08 07:50:57

企業數據治理

2021-05-07 16:19:36

異步編程Java線程

2021-06-11 07:26:16

數據倉庫機器學習

2010-09-25 14:48:55

SQL連接

2021-01-19 11:56:19

Python開發語言
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 范冰冰一级做a爰片久久毛片 | 国产小视频在线观看 | 国产成人午夜高潮毛片 | 成人av一区二区三区 | 69性欧美高清影院 | av日韩在线播放 | 精品国产一区二区三区久久狼黑人 | 久久人体视频 | 亚洲精品久久久久久久久久久久久 | 国产精品美女 | 97av视频在线 | 成人av网站在线观看 | 欧美极品在线 | 亚洲国产精品久久久 | 久久黄视频 | 人人干在线| www.成人久久| 在线色网址 | 国产一区二区三区高清 | 中文成人在线 | 国产剧情久久 | 毛片在线免费 | 91影院 | 91精品国产高清一区二区三区 | 男女av| 97伦理最新伦理 | 成人视屏在线观看 | 国产精品久久久久久久免费大片 | 欧美精品中文 | 久久在线精品 | 免费在线一区二区三区 | 日本免费一区二区三区视频 | 成人av电影免费在线观看 | 欧美日本久久 | 91亚洲一区 | 国产一级片一区二区 | 91最新在线视频 | 欧美成人精品一区二区男人看 | 亚洲欧美在线视频 | 91在线看片 | 国产视频一区二区在线观看 |