成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

實時監(jiān)視同步數(shù)據(jù)庫變更,這個框架真是神器

運維 數(shù)據(jù)庫運維
我們數(shù)據(jù)庫中的數(shù)據(jù)一直在變化,有時候我們希望能監(jiān)聽數(shù)據(jù)庫數(shù)據(jù)的變化并根據(jù)變化做出一些反應(yīng),比如更新對應(yīng)變化數(shù)據(jù)的緩存、增量同步到其它數(shù)據(jù)源、對數(shù)據(jù)進(jìn)行檢測和審計等等。而這種技術(shù)就叫變更數(shù)據(jù)捕獲(Change Data Capture)。

[[403537]]

我們數(shù)據(jù)庫中的數(shù)據(jù)一直在變化,有時候我們希望能監(jiān)聽數(shù)據(jù)庫數(shù)據(jù)的變化并根據(jù)變化做出一些反應(yīng),比如更新對應(yīng)變化數(shù)據(jù)的緩存、增量同步到其它數(shù)據(jù)源、對數(shù)據(jù)進(jìn)行檢測和審計等等。而這種技術(shù)就叫變更數(shù)據(jù)捕獲(Change Data Capture)。對于這種技術(shù)我們可能知道一個國內(nèi)比較知名的框架Canal,非常好用!但是Canal有一個局限性就是只能用于Mysql的變更數(shù)據(jù)捕獲。今天來介紹另一種更加強大的分布式CDC框架Debezium。

Debezium

提起Debezium這個框架,相信大多數(shù)普通開發(fā)者都比較陌生,但是提及它所屬的公司大家一定不會陌生。

紅帽公司

沒錯就是開源界最成功的紅帽公司。Debezium是為捕獲數(shù)據(jù)更改的流式處理框架,開源免費。Debezium近乎實時地監(jiān)控數(shù)據(jù)庫行級別(row-level)的數(shù)據(jù)變更,并針對變更可以做出反應(yīng)。而且只有已提交的變更才是可見的,所以不用擔(dān)心事務(wù)問題或者更改被回滾的問題。Debezium為所有的數(shù)據(jù)庫更改事件提供了一個統(tǒng)一的模型,所以不用擔(dān)心每種數(shù)據(jù)庫系統(tǒng)的復(fù)雜性。Debezium提供了對MongoDB、MySQL、PostgreSQL、SQL Server、Oracle、DB2等數(shù)據(jù)庫的支持。

另外借助于Kafka Connector可以開發(fā)出一個基于事件流的變更捕獲平臺,具有高容錯率和極強的擴展性。

Debezium Kafka 架構(gòu)

如圖所示,部署了用于 MySQL 和 PostgresSQL 的 Debezium Kafka連接器以捕獲對這兩種類型數(shù)據(jù)庫的更改事件,然后將這些更改通過下游的Kafka Connector將記錄傳輸?shù)狡渌到y(tǒng)或者數(shù)據(jù)庫(例如 Elasticsearch、數(shù)據(jù)倉庫、分析系統(tǒng))或緩存。

另一種玩法就是將Debezium內(nèi)置到應(yīng)用程序中,來做一個類似消息總線的設(shè)施,將數(shù)據(jù)變更事件傳遞給訂閱的下游系統(tǒng)中。

Debezium內(nèi)置服務(wù)器架構(gòu)

Debezium對數(shù)據(jù)的完整性和可用性也是做了不少的工作。Debezium用持久化的、有副本備份的日志來記錄數(shù)據(jù)庫數(shù)據(jù)變化的歷史,因此,你的應(yīng)用可以隨時停止再重啟,而不會錯過它停止運行時發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉。

稍后我會演示一個Spring Boot集成Debezium的數(shù)據(jù)捕獲系統(tǒng)。

Spring Boot集成Debezium

理論介紹并不能讓你直觀感受到Debezium的能力,所以接下來我將使用嵌入式Debezium引擎來演示一下。

流程圖

如上圖所示,當(dāng)我們變更MySQL數(shù)據(jù)庫中的某行數(shù)據(jù)時,通過Debezium實時監(jiān)聽到binlog日志的變化觸發(fā)捕獲變更事件,然后獲取到變更事件模型,并做出響應(yīng)(消費)。接下來我們來搭建環(huán)境。

MySQL開啟binlog日志

為了方便這里使用MySQL的Docker容器,對應(yīng)的腳本為:

  1. # 運行mysql容器  
  2. docker run --name mysql-service -v d:/mysql/data:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7 --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --default-time_zone="+8:00" 
  3. # 設(shè)置binlog位置 
  4. docker exec mysql-service bash -c "echo 'log-bin=/var/lib/mysql/mysql-bin' >> /etc/mysql/mysql.conf.d/mysqld.cnf" 
  5. # 配置 mysql的server-id 
  6. docker exec mysql-service bash -c "echo 'server-id=123454' >> /etc/mysql/mysql.conf.d/mysqld.cnf" 

上面的腳本運行了一個用戶名為root、密碼為123456并且將數(shù)據(jù)掛載到本地路徑d:/mysql/data的MySQL容器,同時開啟了binlog日志,并設(shè)置server-id為123454,這些信息后面配置會用。

請注意如果不使用root用戶的話,需要保證用戶具有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT四種權(quán)限。

Spring Boot集成嵌入式Debezium

Debezium依賴

Spring Boot的應(yīng)用中加入下列依賴:

  1. <dependency> 
  2.      <groupId>io.debezium</groupId> 
  3.      <artifactId>debezium-api</artifactId> 
  4.      <version>${debezium.version}</version> 
  5.  </dependency> 
  6.  <dependency> 
  7.      <groupId>io.debezium</groupId> 
  8.      <artifactId>debezium-embedded</artifactId> 
  9.      <version>${debezium.version}</version> 
  10.  </dependency> 
  11.  <dependency> 
  12.      <groupId>io.debezium</groupId> 
  13.      <artifactId>debezium-connector-mysql</artifactId> 
  14.      <version>${debezium.version}</version> 
  15.  </dependency> 

目前最新的版本號為1.5.2.Final。

聲明配置

然后聲明需要的配置:

  1. /** 
  2.      * Debezium 配置. 
  3.      * 
  4.      * @return configuration 
  5.      */ 
  6.     @Bean 
  7.     io.debezium.config.Configuration debeziumConfig() { 
  8.         return io.debezium.config.Configuration.create() 
  9. //            連接器的Java類名稱 
  10.                 .with("connector.class", MySqlConnector.class.getName()) 
  11. //            偏移量持久化,用來容錯 默認(rèn)值 
  12.                 .with("offset.storage""org.apache.kafka.connect.storage.FileOffsetBackingStore"
  13. //                偏移量持久化文件路徑 默認(rèn)/tmp/offsets.dat  如果路徑配置不正確可能導(dǎo)致無法存儲偏移量 可能會導(dǎo)致重復(fù)消費變更 
  14. //                如果連接器重新啟動,它將使用最后記錄的偏移量來知道它應(yīng)該恢復(fù)讀取源信息中的哪個位置。 
  15.                 .with("offset.storage.file.filename""C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/offsets.dat"
  16. //                捕獲偏移量的周期 
  17.                 .with("offset.flush.interval.ms""6000"
  18. //               連接器的唯一名稱 
  19.                 .with("name""mysql-connector"
  20. //                數(shù)據(jù)庫的hostname 
  21.                 .with("database.hostname""localhost"
  22. //                端口 
  23.                 .with("database.port""3306"
  24. //                用戶名 
  25.                 .with("database.user""root"
  26. //                密碼 
  27.                 .with("database.password""123456"
  28. //                 包含的數(shù)據(jù)庫列表 
  29.                 .with("database.include.list""etl"
  30. //                是否包含數(shù)據(jù)庫表結(jié)構(gòu)層面的變更,建議使用默認(rèn)值true 
  31.                 .with("include.schema.changes""false"
  32. //                mysql.cnf 配置的 server-id 
  33.                 .with("database.server.id""123454"
  34. //                 MySQL 服務(wù)器或集群的邏輯名稱 
  35.                 .with("database.server.name""customer-mysql-db-server"
  36. //                歷史變更記錄 
  37.                 .with("database.history""io.debezium.relational.history.FileDatabaseHistory"
  38. //                歷史變更記錄存儲位置  
  39.                 .with("database.history.file.filename""C:/Users/n1/IdeaProjects/spring-boot-debezium/tmp/dbhistory.dat"
  40.                 .build(); 
  41.     } 

配置分為兩部分:

  • 一部分是Debezium Engine的配置屬性,參見Debezium Engine配置[1]。
  • 一部分是Mysql Connector的配置屬性,參見Mysql Connector配置[2]。

實例化Debezium Engine

應(yīng)用程序需要為運行的Mysql Connector啟動一個Debezium引擎,這個引擎會以異步線程的形式運行,它包裝了整個Mysql Connector連接器的生命周期。聲明一個引擎需要以下幾步:

聲明收到數(shù)據(jù)變更捕獲信息的格式,提供了JSON、Avro、Protobuf、Connect、CloudEvents等格式。

加載上面定義的配置。

聲明消費數(shù)據(jù)更改事件的函數(shù)方法。

聲明的偽代碼:

  1. DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) 
  2.         .using(configuration.asProperties()) 
  3.         .notifying(this::handlePayload) 
  4.         .build(); 

handlePayload方法為:

  1. private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) { 
  2.     recordChangeEvents.forEach(r -> { 
  3.         SourceRecord sourceRecord = r.record(); 
  4.         Struct sourceRecordChangeValue = (Struct) sourceRecord.value(); 
  5.  
  6.         if (sourceRecordChangeValue != null) { 
  7.             // 判斷操作的類型 過濾掉讀 只處理增刪改   這個其實可以在配置中設(shè)置 
  8.             Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION)); 
  9.  
  10.             if (operation != Envelope.Operation.READ) { 
  11.                 String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER
  12.                 // 獲取增刪改對應(yīng)的結(jié)構(gòu)體數(shù)據(jù) 
  13.                 Struct struct = (Struct) sourceRecordChangeValue.get(record); 
  14.                 // 將變更的行封裝為Map 
  15.                 Map<String, Object> payload = struct.schema().fields().stream() 
  16.                         .map(Field::name
  17.                         .filter(fieldName -> struct.get(fieldName) != null
  18.                         .map(fieldName -> Pair.of(fieldName, struct.get(fieldName))) 
  19.                         .collect(toMap(Pair::getKey, Pair::getValue)); 
  20.                 // 這里簡單打印一下 
  21.                 System.out.println("payload = " + payload); 
  22.             } 
  23.         } 
  24.     }); 

引擎的啟動和關(guān)閉正好契合Spring Bean的生命周期:

  1. @Data 
  2. public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle { 
  3.  
  4.     private final Executor executor = Executors.newSingleThreadExecutor(); 
  5.     private DebeziumEngine<?> debeziumEngine; 
  6.  
  7.     @Override 
  8.     public void start() { 
  9.         executor.execute(debeziumEngine); 
  10.     } 
  11.  
  12.     @SneakyThrows 
  13.     @Override 
  14.     public void stop() { 
  15.         debeziumEngine.close(); 
  16.     } 
  17.  
  18.     @Override 
  19.     public boolean isRunning() { 
  20.         return false
  21.     } 
  22.  
  23.     @Override 
  24.     public void afterPropertiesSet() throws Exception { 
  25.         Assert.notNull(debeziumEngine, "debeziumEngine must not be null"); 
  26.     } 

啟動

啟動該Spring Boot項目,你可以采用各種手段往數(shù)據(jù)庫增刪改數(shù)據(jù),觀察會有類似下面的打印:

  1. payload = {user_id=1123213, username=felord.cn, age=11 , gender=0, enabled=1} 

說明Debezium監(jiān)聽到了數(shù)據(jù)庫的變更。你可以想想這種技術(shù)在哪些場景有用武之地。好了今天的分享就到這里,感謝大家的支持,我是:碼農(nóng)小胖哥。原創(chuàng)不易,請多多關(guān)注、點贊、轉(zhuǎn)發(fā)、再看。

參考資料

[1]Debezium Engine配置: https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties

[2]Mysql Connector配置: https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties

本文轉(zhuǎn)載自微信公眾號「碼農(nóng)小胖哥」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系碼農(nóng)小胖哥公眾號。

 

責(zé)任編輯:武曉燕 來源: 碼農(nóng)小胖哥
相關(guān)推薦

2023-04-18 18:22:31

開源工具數(shù)據(jù)庫

2020-09-21 11:30:28

CanalMySQL數(shù)據(jù)庫

2022-07-08 10:09:47

SPLSQL數(shù)據(jù)庫

2010-05-17 14:00:07

MySql數(shù)據(jù)庫

2020-08-13 07:42:15

數(shù)據(jù)庫Flyway代碼

2020-08-31 07:00:00

數(shù)據(jù)庫數(shù)據(jù)庫同步

2017-05-25 08:52:08

SQL Server數(shù)據(jù)庫

2019-07-23 10:43:28

MariaDB數(shù)據(jù)庫MySQL

2019-10-08 15:54:42

SQL數(shù)據(jù)庫技術(shù)

2009-04-22 09:42:07

SQL Server監(jiān)視鏡像

2024-05-22 12:07:12

向量數(shù)據(jù)庫AI

2009-03-25 18:11:57

監(jiān)視鏡像數(shù)據(jù)庫

2024-10-30 08:15:18

2024-12-06 08:29:29

2011-08-25 13:41:50

SQL Server 變更跟蹤

2010-07-01 15:44:22

SQL Server數(shù)

2025-04-01 08:38:41

2019-08-13 15:52:34

數(shù)據(jù)庫同步遷移

2010-06-02 16:57:50

MySQL數(shù)據(jù)庫同步

2010-08-27 09:59:51

SQL Server
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: www四虎com | 亚洲成人一级片 | 国产一级免费视频 | 亚洲一区毛片 | 久久日韩精品 | 一级黄色片美国 | 日日噜噜夜夜爽爽狠狠 | 99re在线视频 | 韩国毛片一区二区三区 | 精品国产一区二区三区久久 | 爱爱视频日本 | 午夜视频精品 | 国产色| 婷婷五月色综合香五月 | 一级全黄视频 | 亚洲视频免费播放 | 婷婷综合五月天 | 日韩一区二区三区视频 | 免费黄视频网站 | 国产日韩欧美电影 | 久久久久久久久久久久久九 | 国产小视频自拍 | 成人国产精品 | 最新日韩在线视频 | 国产精品日韩在线观看一区二区 | 91在线看片| 国产激情精品 | 久久不卡区| 在线欧美激情 | 色播视频在线观看 | 99re6在线视频| 成人免费观看男女羞羞视频 | 玖玖综合网| 精品国产鲁一鲁一区二区张丽 | 亚洲天堂中文字幕 | 欧洲在线视频 | 午夜影院在线免费观看视频 | 亚州精品天堂中文字幕 | 欧美日韩国产一区二区三区 | 亚洲欧美aⅴ | 91大神xh98xh系列全部 |