成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

新選擇!基于Spring Boot監聽MySQL日志Binlog實現數據實時同步

數據庫 MySQL
MySQL與Redis數據實時同步的主要目的是優化性能和保持數據一致性。通過將熱點數據存儲在Redis中,可以大大提高系統的訪問速度,同時確保MySQL中的數據變化能夠實時反映到Redis中,避免數據不一致的問題。

1. 簡介

MySQL與Redis數據實時同步是將MySQL數據庫中的數據變化實時地反映到Redis緩存系統中的過程。MySQL是一款穩定的關系型數據庫,適合做持久化存儲;而Redis是一個高性能的內存數據庫,適合做緩存和實時數據處理。將兩者結合使用,可以充分發揮各自的優勢,提升系統性能和穩定性。

實現MySQL與Redis數據實時同步有多種方法,如使用MySQL的二進制日志(Binlog)配合Canal或Debezium等工具。此外,還可以在應用層進行雙寫操作或使用消息隊列實現數據同步。

MySQL與Redis數據實時同步的主要目的是優化性能和保持數據一致性。通過將熱點數據存儲在Redis中,可以大大提高系統的訪問速度,同時確保MySQL中的數據變化能夠實時反映到Redis中,避免數據不一致的問題。這種同步機制在電商、社交等需要高并發訪問和實時數據更新的場景中尤為重要。

本篇文章我將介紹另外一款非常不錯開源的組件mysql-binlog-connector-java。通過名稱就能知道他是通過連接MySQL binlog日志來實現數據監聽的。該組件不僅僅是能夠實時監聽binlog的變化,而且你還可以直接去讀取binlog日志文件解析其內容。

該組件具備以下特性:

  • 自動解析二進制日志文件名/位置 | GTID 解析
  • 斷開連接可恢復
  • 插件化的故障轉移策略
  • 支持 binlog_checksum=CRC32(適用于 MySQL 5.6.2+ 用戶)
  • 通過 TLS 進行安全通信
  • 友好的Java管理擴展(JMX)
  • 實時統計
  • 在 Maven Central 上可用
  • 無第三方依賴,跨不同版本的 MySQL 發行版的測試套件

接下來,我將通過如下幾方面介紹該組件在項目中的使用:

  • 編程解析binlog日志
  • 實時監聽binlog日志
  • 通過JMX暴露binlog客戶端 

2. 實戰案例

環境準備

<dependency>
  <groupId>com.zendesk</groupId>
  <artifactId>mysql-binlog-connector-java</artifactId>
  <version>0.30.1</version>
</dependency>


<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

當前mysql-binlog-connector-java最新版本為0.30.1。你可以通過下面地址查看倉庫版本情況:

https://mvnrepository.com/artifact/com.zendesk/mysql-binlog-connector-java

注意:你的確定你開啟了binlog日志

SHOW VARIABLES LIKE '%log_bin%'

圖片圖片

通過上面的命令查看狀態。

2.1 編程讀取binlog日志

public static void main(String[] args) throws Exception {


  File binlogFile = new File("C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000032") ;
  EventDeserializer eventDeserializer = new EventDeserializer() ;
  // 設置兼容性模式
  eventDeserializer.setCompatibilityMode(
    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
    EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
  BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
  try {
    for (Event event; (event = reader.readEvent()) != null;) {
      EventData data = event.getData() ;
      // 判斷事件的類型
      if (data instanceof WriteRowsEventData ed) {
        List<Serializable[]> rows = ed.getRows() ;
        rows.forEach(row -> {
          for (Serializable s : row) {
            if (s instanceof byte[] bs) {
              System.err.print(new String(bs) + "\t") ;
            } else {
              System.err.print(s + "\t") ;
            }
          }
          System.out.println() ;
        });
      } else if (data instanceof QueryEventData ed) {
        System.out.printf("查詢事件:%s%n", ed.getSql()) ;
      } else if (data instanceof DeleteRowsEventData ed) {
        System.err.println("刪除事件") ;
      } else if (data instanceof TableMapEventData ed) {
        String database = ed.getDatabase() ;
        String table = ed.getTable() ;
        System.out.printf("數據庫: %s, 表名: %s%n", database, table) ;
      }
    }
  } finally {
    reader.close();
  }
}

該組件定義了如下的事件類型

圖片圖片

上面程序輸出結果:

查詢事件:BEGIN
數據庫: testjpa, 表名: t_person
刪除事件
查詢事件:BEGIN
數據庫: testjpa, 表名: t_person
2520  30  姓名 - 30  
查詢事件:BEGIN
數據庫: testjpa, 表名: t_person
2521  44  姓名 - 44  
查詢事件:BEGIN
數據庫: testjpa, 表名: t_person
2522  92  姓名 - 92  
查詢事件:BEGIN
數據庫: testjpa, 表名: t_person
2523  71  姓名 - 71

正確的讀取binlog日志中的信息。

2.2 實時監聽Binlog日志

@Component
public class MySQLToRedisComponent implements CommandLineRunner {


  public void listener() {
    BinaryLogClient client = new BinaryLogClient("118.24.111.33", 3307, "test", "root", "123123");
    EventDeserializer eventDeserializer = new EventDeserializer();
    eventDeserializer.setCompatibilityMode(
        EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
        EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
    );
    client.setEventDeserializer(eventDeserializer);
    client.registerEventListener(new EventListener() {
        @Override
        public void onEvent(Event event) {
          EventHeader header = event.getHeader() ;
          switch(header.getEventType()) {
            case EXT_WRITE_ROWS:
              WriteRowsEventData writeData = event.getData() ;
              List<Serializable[]> rows = writeData.getRows() ;
              for (Serializable row : rows) {
                if (row.getClass().isArray()) {
                  printRow(row);
                }
              }
              break ;
            case EXT_UPDATE_ROWS:
              UpdateRowsEventData updateData = event.getData() ;
              BitSet columns = updateData.getIncludedColumns() ;
              System.err.printf("更新列: %s%n", columns) ;
              List<Entry<Serializable[], Serializable[]>> updateRows = updateData.getRows() ;
              for (Entry<Serializable[], Serializable[]> entry : updateRows) {
                printRow(entry.getKey()) ;
                System.out.println(">>>>>>>>>>>>>>>>>>>>>before") ;
                printRow(entry.getValue()) ;
                System.out.println(">>>>>>>>>>>>>>>>>>>>>after") ;
              }
              break ;
            case EXT_DELETE_ROWS:
              DeleteRowsEventData deleteData = event.getData() ;
              List<Serializable[]> deleteRow = deleteData.getRows() ;
              for (Serializable row : deleteRow) {
                if (row.getClass().isArray()) {
                  printRow(row);
                }
              }
              break ;
            case TABLE_MAP:
              TableMapEventData data = event.getData() ;
              System.out.printf("變更表: %s.%s%n", data.getDatabase(), data.getTable()) ;
              break ;
            default:
              break ;
          }
        }
        private void printRow(Serializable row) {
          Serializable[] ss = (Serializable[]) row ;
          for (Serializable s : ss) {
            if (s.getClass().isArray()) {
              System.out.print(new String((byte[])s) + "\t") ;
            } else {
              System.out.print(s + "\t") ;
            }
          }
          System.out.println() ;
        }
    });
    client.connect();
  }
  public void run(String... args) throws Exception {
    this.listener() ;
  }
}

以上監聽程序,我們僅對部分事件進行了監聽處理。當數據發生變化后,輸出如下:

變更表: test.t_person
更新列: {0, 1, 2}
1  張三  66  
>>>>>>>>>>>>>>>>>>>>>before
1  張三  22  
>>>>>>>>>>>>>>>>>>>>>after

更序列使用了BitSet表示,所以如果你要與具體的列想對應,你還應該執行如下的語句來確定具體的列名:

mysql> describe t_person;
+-------+--------------+------+-----+---------+----------------+
| Field | Type         | Null | Key | Default | Extra          |
+-------+--------------+------+-----+---------+----------------+
| id    | int          | NO   | PRI | NULL    | auto_increment |
| name  | varchar(255) | YES  |     | NULL    |                |
| age   | int          | YES  |     | NULL    |                |
+-------+--------------+------+-----+---------+----------------+

你可以通過JDBC的方式執行該語句獲取對應的列,也可以通過Socket方式發送命令獲取結果。推薦還是JDBC。

2.3 JMX暴露Binlog客戶端

在Spring Boot中,我們可以非常方便的通過JMX暴露binlog客戶端的相關操作,如下示例:

@Bean
BinaryLogClient client() {
  return new BinaryLogClient("118.24.111.33", 3307, "test", "root", "123123") ;
}


@Bean
MBeanExporter exporterClient(BinaryLogClient client) {
  MBeanExporter exporter = new MBeanExporter();
  exporter.setBeans(Map.of("mysql.binlog:type=BinaryLogClient", client)) ;
  return exporter;
}


@Bean
MBeanExporter exporterClientStatistics(BinaryLogClient client) {
  MBeanExporter exporter = new MBeanExporter();
  BinaryLogClientStatistics stats = new BinaryLogClientStatistics(client);
  exporter.setBeans(Map.of("mysql.binlog:type=BinaryLogClientStatistics", stats)) ;
  return exporter;
}

啟動應用后,通過JConsole查看。

圖片 圖片

責任編輯:武曉燕 來源: Spring全家桶實戰案例源碼
相關推薦

2024-07-03 08:02:19

MySQL數據搜索

2020-02-28 16:02:21

MySQL異構同步

2024-08-02 09:36:03

2023-01-31 08:34:19

2016-12-21 14:06:55

日志實現數據實時抽取

2024-10-11 11:32:22

Spring6RSocket服務

2020-09-21 11:30:28

CanalMySQL數據庫

2021-06-04 07:24:14

Flink CDC數據

2022-07-20 23:15:11

Flink數據集CDC

2018-05-14 13:51:39

RDS Binlog架構Kafka集群

2021-02-26 05:21:56

MySQL數據設計

2023-05-03 08:58:46

數據庫開源

2024-06-12 08:46:19

2023-09-01 08:46:44

2014-01-22 11:22:44

華為HANA一體機FusionCube大數據分析

2018-08-21 10:05:59

MySQLbinlog數據庫

2014-08-14 10:52:49

windowslinux

2024-10-18 11:39:55

MySQL數據檢索

2017-01-15 13:45:20

Docker大數據京東

2017-01-04 10:29:37

Spark運維技術
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 成人片免费看 | 日韩激情一区 | 在线午夜 | 9999视频| www.欧美 | 国产精品久久久久久久久久久久 | 色就干| 国产精品成人一区二区 | 91精品观看| 欧美激情一区二区三区 | xxxxx免费视频 | 中文字幕第一页在线 | 91精品国产综合久久久久久丝袜 | aaa综合国产 | 日本精品视频 | 国产精品成人一区二区三区 | 一区中文字幕 | 亚洲一页| 色偷偷噜噜噜亚洲男人 | 天天摸天天干 | 少妇一级淫片免费播放 | 亚洲国产成人精 | 九九热视频这里只有精品 | 亚洲视频一区在线观看 | 91久久久久久久 | 色悠悠久| 成人影院在线观看 | 视频三区 | 国产精品夜间视频香蕉 | 黄色大片视频 | 国产高清在线观看 | 中文字幕乱码一区二区三区 | 黄网免费 | 欧美久久久久久久久 | 欧美一级黄色网 | 亚洲精品在线免费观看视频 | 精品自拍视频在线观看 | 超碰在线97国产 | 黄色毛片在线观看 | 成人性视频免费网站 | 日韩一区二区三区在线视频 |