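Integrating Spring Boot with Debezium to Build a Real-Time Synchronization System for a Supply-Chain Database
Debezium captures changes in a database and pushes them as a real-time stream to the Kafka message queue, enabling efficient, reliable real-time data synchronization and stream processing.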
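Why we chose Debezium
Real-time data synchronization
Debezium is an open-source distributed platform for change data capture (CDC): it captures database changes and streams them in real time to Kafka or other messaging systems. This matters for applications that need inventory information to be updated in real time.
Support for multiple databases
Debezium has connectors for many databases, including relational databases such as MySQL and PostgreSQL as well as MongoDB. We can therefore pick the database that fits the business needs without worrying about how its changes will be captured.
High performance and low latency
Debezium captures changes by reading the database's transaction log (for MySQL, the binary log) instead of polling tables. This is efficient and keeps latency low, so the system can react quickly to database changes even under high concurrency.
Structured data output
Debezium emits each captured change as a structured event (JSON when Kafka Connect's JSON converter is used) containing the row before and after the change plus metadata about its source. This standardized format is easy for downstream systems to parse and makes integration simpler and more reliable.
Fault tolerance and reliability
Debezium records how far it has read in the log (its offset), so after a network failure or server restart it resumes capturing from where it left off. This preserves data consistency and completeness.
Easy to configure and deploy
Debezium connectors are configured and managed through the Kafka Connect REST API, which greatly simplifies deployment. Because Debezium is built on the Kafka ecosystem, the whole data pipeline is easy to set up and maintain.
Data consistency guarantees
Because changes are read from the log in commit order, changes to the same row reach consumers in the order they were committed, avoiding business problems caused by out-of-sync data. This is especially important for an inventory management system, where any inconsistency in stock data can have serious consequences.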
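Use cases
- Blog platforms: refresh front-end pages in real time when a new post is published or an existing one is updated.
- Forums: show the latest posts and comments in real time to improve the user experience.
- Cross-system integration: combine data changes from CRM, ERP and other business systems into a unified view of the data.
- Incremental loading: load only the data that changed since the last synchronization, reducing data transfer and processing time.
- Inventory monitoring: trigger an alert as soon as stock falls below a threshold so that staff can replenish it.
- Transaction monitoring: monitor financial transactions in real time, detect suspicious activity and trigger security measures.
- Order management: when an order's status changes, send the change event to payment, logistics and other microservices to trigger the corresponding business processes.
- User management: when user information is updated, notify permission management, marketing and other microservices to keep data consistent.
- Financial auditing: record every change to financial transactions for later audits.
- Big-data analytics: collect data changes from different systems into Hadoop or Amazon S3 and run complex analyses with tools such as Spark.
- Machine learning model training: collect and process data in real time to train and update machine learning models.
- Player behavior analysis: collect players' in-game behavior in real time to analyze player preferences and game balance.
- Dynamic tuning: adjust game difficulty and reward mechanisms based on player behavior data.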
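The inventory-monitoring case above can be sketched as a small check applied to inventory change events. This is an illustration, not part of the original project: the class LowStockAlert and its threshold are hypothetical, and the caller is assumed to pass in the quantity from the event's before and after row images. It alerts only when the quantity crosses below the threshold, so later updates that stay low do not fire again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides whether an inventory change should raise a low-stock alert.
class LowStockAlert {
    private final int threshold;
    private final List<String> alerts = new ArrayList<>();

    LowStockAlert(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Fires only when the quantity goes from at-or-above the threshold to below it.
     * beforeQty is null for inserts, which have no "before" row image.
     */
    boolean onQuantityChange(int productId, Integer beforeQty, int afterQty) {
        boolean wasOk = beforeQty == null || beforeQty >= threshold;
        boolean isLow = afterQty < threshold;
        if (wasOk && isLow) {
            alerts.add("Low stock for product " + productId + ": " + afterQty + " < " + threshold);
            return true;
        }
        return false;
    }

    /** Alerts raised so far; a real system would notify someone instead of collecting strings. */
    List<String> alerts() {
        return alerts;
    }
}
```

In this project, such a check could be called from the inventory branch of the update handler with the before and after quantity values.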
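Which companies use Debezium?
Uber
- Use: Uber uses Debezium to capture changes to data such as orders and driver locations and push them to Kafka.
- Description: This lets Uber monitor order status and driver locations in real time, optimize its dispatch algorithms and improve operational efficiency.
LinkedIn
- Use: LinkedIn uses Debezium to capture changes in user activity and social-network data.
- Description: With Debezium, LinkedIn can update its recommendation system and news feed in real time to deliver a personalized user experience.
Walmart
- Use: Walmart uses Debezium for data synchronization and real-time analytics in its supply-chain management system.
- Description: With Debezium, Walmart can monitor inventory levels and order status in real time, improving supply-chain efficiency and customer satisfaction.
IBM
- Use: IBM uses Debezium for data synchronization and stream processing in its hybrid-cloud environment.
- Description: Debezium helps IBM move data seamlessly between cloud platforms, ensuring business continuity and data consistency.
eBay
- Use: eBay uses Debezium for data synchronization and real-time analytics on its e-commerce platform.
- Description: With Debezium, eBay can update product information and stock status in real time, improving the shopping experience and operational efficiency.
PayPal
- Use: PayPal uses Debezium to capture changes to payment transaction data for real-time risk management and compliance checks.
- Description: With Debezium, PayPal can detect suspicious transactions promptly, keeping its payment system secure and reliable.
Airbnb
- Use: Airbnb uses Debezium for data synchronization and real-time monitoring of its internal systems.
- Description: With Debezium, Airbnb can update listing information and booking status in real time, improving accommodation arrangements and customer service.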
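Database tables
-- Create the database that the connector's database.include.list points at.
CREATE DATABASE IF NOT EXISTS inventorydb;
USE inventorydb;
-- products table: stores product information.
CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL COMMENT 'Product name',
    price DECIMAL(10, 2) NOT NULL COMMENT 'Product price'
);
-- inventory table: stores the stock level of each product.
CREATE TABLE inventory (
    id INT AUTO_INCREMENT PRIMARY KEY,
    product_id INT COMMENT 'ID of the related product',
    quantity INT NOT NULL COMMENT 'Stock quantity',
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last update time',
    FOREIGN KEY (product_id) REFERENCES products(id)
);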
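The price column is DECIMAL(10, 2). With Debezium's default decimal.handling.mode of precise and Kafka Connect's JSON converter, such a value is not sent as a JSON number but as a base64 string holding the unscaled value as big-endian two's-complement bytes, so calling asDouble() on it does not return the price. If you keep that default rather than switching the mode to double or string, a consumer can decode the value roughly as follows. The ConnectDecimal helper is hypothetical, and the scale (2 here) is assumed to be known, for example from the "scale" parameter in the field's schema.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper: decodes a Kafka Connect Decimal serialized as base64 in JSON.
final class ConnectDecimal {
    private ConnectDecimal() {}

    /**
     * @param base64 the field value, e.g. "AYaf"
     * @param scale  number of digits after the decimal point (from the column definition/schema)
     */
    static BigDecimal decode(String base64, int scale) {
        // The bytes are the unscaled value in big-endian two's-complement form
        byte[] unscaled = Base64.getDecoder().decode(base64);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }
}
```

For example, ConnectDecimal.decode("AYaf", 2) yields 999.99, because the bytes 0x01 0x86 0x9F encode the unscaled value 99999.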
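my.cnf configuration
[mysqld]
log-bin=mysql-bin
binlog_format=ROW
# Debezium needs full row images (FULL is MySQL's default)
binlog_row_image=FULL
server-id=1
# Deprecated since MySQL 8.0; there, prefer binlog_expire_logs_seconds=864000 (10 days)
expire_logs_days=10
This enables the binary log in row format. Remember to restart the MySQL service afterwards, otherwise the settings do not take effect!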
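After the restart, it is worth confirming that the settings took effect, for example with SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image'). The hypothetical BinlogCheck class below only evaluates the result of that query, passed in as a name-to-value map (fetching it over JDBC is left out); the expected values ON, ROW and FULL are what Debezium's MySQL connector needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check over the rows returned by SHOW VARIABLES.
final class BinlogCheck {
    private BinlogCheck() {}

    /** Returns human-readable problems; an empty list means the binlog is ready for Debezium. */
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        expect(vars, "log_bin", "ON", problems);
        expect(vars, "binlog_format", "ROW", problems);
        expect(vars, "binlog_row_image", "FULL", problems);
        return problems;
    }

    private static void expect(Map<String, String> vars, String name, String wanted, List<String> problems) {
        String actual = vars.get(name);
        if (actual == null) {
            problems.add(name + " is missing");
        } else if (!wanted.equalsIgnoreCase(actual)) {
            problems.add(name + " is " + actual + ", expected " + wanted);
        }
    }
}
```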
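Configure the Debezium connector
Create a Debezium connector configuration file, register-mysql.json:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventorydb",
    "table.include.list": "inventorydb.products,inventorydb.inventory",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventorydb",
    "decimal.handling.mode": "double",
    "include.schema.changes": "false"
  }
}
database.server.name becomes the prefix of every topic, and the database.history.* properties (required by the Debezium 1.x MySQL connector) name the topic where it keeps the table schema history. decimal.handling.mode=double makes DECIMAL columns such as price arrive as plain JSON numbers; use string if exact values matter. These property names follow Debezium 1.x, matching the 1.9.5.Final version in the logs below; in 2.x, database.server.name was replaced by topic.prefix and database.history.* by schema.history.internal.*.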
Register the connector through the Kafka Connect REST API (port 8083 by default) with curl:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
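If you prefer to register the connector from Java rather than with curl, the same request can be sent with the JDK 11 HttpClient. This is a sketch: the ConnectorRegistrar class is hypothetical, and it assumes Kafka Connect is listening on http://localhost:8083 as in the curl command.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: Java 11 equivalent of the curl POST to /connectors/.
final class ConnectorRegistrar {
    private ConnectorRegistrar() {}

    /** Builds the POST /connectors/ request carrying the connector JSON. */
    static HttpRequest buildRequest(String connectBaseUrl, String connectorJson) {
        return HttpRequest.newBuilder(URI.create(connectBaseUrl + "/connectors/"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    /** Reads the config file, sends it, and returns Kafka Connect's response. */
    static HttpResponse<String> register(String connectBaseUrl, Path configFile)
            throws IOException, InterruptedException {
        String json = Files.readString(configFile);
        return HttpClient.newHttpClient()
                .send(buildRequest(connectBaseUrl, json), HttpResponse.BodyHandlers.ofString());
    }
}
```

Calling ConnectorRegistrar.register("http://localhost:8083", Path.of("register-mysql.json")) should return 201 Created on success; a 409 Conflict typically means a connector with that name already exists.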
Hands-on code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>inventory-sync</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>inventory-sync</name>
<description>Debezium Demo</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# Kafka broker address
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group ID
spring.kafka.consumer.group-id=inventory-consumer-group
# Where to start reading when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Key deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Value deserializer: the listener receives the raw JSON as a String and parses it with Jackson
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Debezium event listener
package com.example.inventorysync.listener;

import com.example.inventorysync.handler.DataChangeHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DebeziumEventListener {

    private static final Logger logger = LoggerFactory.getLogger(DebeziumEventListener.class);

    @Autowired
    private DataChangeHandler dataChangeHandler;

    /**
     * Listens to the Debezium change-event topics of the two tables.
     *
     * @param record the received Kafka record
     */
    @KafkaListener(topics = {"dbserver1.inventorydb.products", "dbserver1.inventorydb.inventory"})
    public void listen(ConsumerRecord<String, String> record) {
        // By default Debezium follows each delete with a tombstone (null value) for log compaction;
        // it carries no change data, so skip it
        if (record.value() == null) {
            logger.debug("Skipping tombstone on topic {}", record.topic());
            return;
        }
        logger.info("Received message: {}", record.value());
        dataChangeHandler.handleChange(record.value());
    }
}
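The topic names in @KafkaListener follow Debezium 1.x's default naming for MySQL, <database.server.name>.<database>.<table> (in 2.x the first part comes from topic.prefix). The hypothetical helper below builds and splits such names; since each table gets its own topic, record.topic() is another way to tell the two tables apart. It assumes none of the three parts contains a dot.

```java
// Hypothetical helper mirroring Debezium's default <server>.<database>.<table> topic naming.
final class DebeziumTopics {
    private DebeziumTopics() {}

    /** Builds a topic name, e.g. dbserver1.inventorydb.products. */
    static String topicFor(String serverName, String database, String table) {
        return String.join(".", serverName, database, table);
    }

    /** Returns the table part of a three-part topic name, or null for any other shape. */
    static String tableOf(String topic) {
        String[] parts = topic.split("\\.");
        return parts.length == 3 ? parts[2] : null;
    }
}
```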
Data change handler
package com.example.inventorysync.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class DataChangeHandler {

    private static final Logger logger = LoggerFactory.getLogger(DataChangeHandler.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Handles one Debezium change event.
     *
     * @param jsonData the change event as a JSON string
     */
    public void handleChange(String jsonData) {
        try {
            // Parse the JSON; with converter schemas enabled, the event itself sits under "payload"
            JsonNode rootNode = objectMapper.readTree(jsonData);
            JsonNode payloadNode = rootNode.path("payload");
            if (!payloadNode.isMissingNode()) {
                String op = payloadNode.path("op").asText();
                switch (op) {
                    case "c": // row inserted
                    case "r": // row read during the connector's initial snapshot
                        handleCreate(payloadNode);
                        break;
                    case "u":
                        handleUpdate(payloadNode);
                        break;
                    case "d":
                        handleDelete(payloadNode);
                        break;
                    default:
                        logger.warn("Unsupported operation type: {}", op);
                }
            } else {
                logger.error("Payload node is missing in the JSON data");
            }
        } catch (Exception e) {
            logger.error("Error processing data change event", e);
        }
    }

    /**
     * Handles an insert (or a snapshot read).
     *
     * @param payloadNode the event payload; the new row is in "after"
     */
    private void handleCreate(JsonNode payloadNode) {
        JsonNode afterNode = payloadNode.path("after");
        logger.info("Handling CREATE event: {}", afterNode.toString());
        if (afterNode.has("id")) {
            int id = afterNode.path("id").asInt();
            String tableName = getTableName(payloadNode);
            if ("products".equals(tableName)) {
                String name = afterNode.path("name").asText();
                // Returns the price only if DECIMAL arrives as a number or numeric string
                // (decimal.handling.mode=double or string), not as base64 bytes
                double price = afterNode.path("price").asDouble();
                logger.info("Product created: ID={}, Name={}, Price={}", id, name, price);
            } else if ("inventory".equals(tableName)) {
                int productId = afterNode.path("product_id").asInt();
                int quantity = afterNode.path("quantity").asInt();
                String lastUpdated = afterNode.path("last_updated").asText();
                logger.info("Inventory created: ID={}, Product ID={}, Quantity={}, Last Updated={}", id, productId, quantity, lastUpdated);
            }
        }
    }

    /**
     * Handles an update.
     *
     * @param payloadNode the event payload; the old row is in "before", the new row in "after"
     */
    private void handleUpdate(JsonNode payloadNode) {
        JsonNode beforeNode = payloadNode.path("before");
        JsonNode afterNode = payloadNode.path("after");
        logger.info("Handling UPDATE event: Before - {}, After - {}", beforeNode.toString(), afterNode.toString());
        if (afterNode.has("id")) {
            int id = afterNode.path("id").asInt();
            String tableName = getTableName(payloadNode);
            if ("products".equals(tableName)) {
                String name = afterNode.path("name").asText();
                double price = afterNode.path("price").asDouble();
                logger.info("Product updated: ID={}, Name={}, Price={}", id, name, price);
            } else if ("inventory".equals(tableName)) {
                int productId = afterNode.path("product_id").asInt();
                int quantity = afterNode.path("quantity").asInt();
                String lastUpdated = afterNode.path("last_updated").asText();
                logger.info("Inventory updated: ID={}, Product ID={}, Quantity={}, Last Updated={}", id, productId, quantity, lastUpdated);
            }
        }
    }

    /**
     * Handles a delete.
     *
     * @param payloadNode the event payload; the deleted row is in "before"
     */
    private void handleDelete(JsonNode payloadNode) {
        JsonNode beforeNode = payloadNode.path("before");
        logger.info("Handling DELETE event: {}", beforeNode.toString());
        if (beforeNode.has("id")) {
            int id = beforeNode.path("id").asInt();
            String tableName = getTableName(payloadNode);
            if ("products".equals(tableName)) {
                logger.info("Product deleted: ID={}", id);
            } else if ("inventory".equals(tableName)) {
                logger.info("Inventory deleted: ID={}", id);
            }
        }
    }

    /**
     * Reads the table name from the event's "source" metadata block.
     * The row images ("before"/"after") contain only column values, not the table name.
     *
     * @param payloadNode the event payload
     * @return the table name, or an empty string if absent
     */
    private String getTableName(JsonNode payloadNode) {
        return payloadNode.path("source").path("table").asText();
    }
}
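The handler switches on the raw op string. As a sketch, the event kinds can be made explicit with a small enum; ChangeOp is a hypothetical name. Debezium uses c for inserts, r for rows read during the initial snapshot, u for updates and d for deletes. By default a delete is also followed by a tombstone record whose value is null; it has no op at all and should be filtered out before parsing. The row image describing the affected row is before for deletes and after for the other kinds.

```java
import java.util.Optional;

// Hypothetical enum for the "op" field of a Debezium change event payload.
enum ChangeOp {
    CREATE("c"), READ("r"), UPDATE("u"), DELETE("d");

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    /** Maps an "op" value; empty for codes this sketch does not handle (or null). */
    static Optional<ChangeOp> fromCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return Optional.of(op);
            }
        }
        return Optional.empty();
    }

    /** Name of the payload field holding the affected row. */
    String rowImageField() {
        return this == DELETE ? "before" : "after";
    }
}
```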
Application
package com.example.inventorysync;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class InventorySyncApplication {
public static void main(String[] args) {
SpringApplication.run(InventorySyncApplication.class, args);
}
}
Testing
Inserting data
Run the following SQL to insert product and inventory data:
-- Insert product data
INSERT INTO products (name, price) VALUES ('Laptop', 999.99);
-- Insert inventory data
INSERT INTO inventory (product_id, quantity) VALUES (1, 100);
Log:
2025-03-31 21:01:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"name":"Laptop","price":999.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307260000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1234,"row":0,"thread":2,"query":null},"ts_ms":1680307260000}}
2025-03-31 21:01:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"name":"Laptop","price":999.99}
2025-03-31 21:01:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product created: ID=1, Name=Laptop, Price=999.99
2025-03-31 21:01:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"c","before":null,"after":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307261000,"snapshot":"last","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5678,"row":0,"thread":2,"query":null},"ts_ms":1680307261000}}
2025-03-31 21:01:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling CREATE event: {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}
2025-03-31 21:01:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory created: ID=1, Product ID=1, Quantity=100, Last Updated=2025-03-31T21:01:01.000Z
Updating data
Run the following SQL to update the product and inventory data:
-- Update product data
UPDATE products SET price = 899.99 WHERE id = 1;
-- Update inventory data
UPDATE inventory SET quantity = 90 WHERE id = 1;
Log:
2025-03-31 21:02:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"name":"Laptop","price":999.99},"after":{"id":1,"name":"Laptop","price":899.99},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307320000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":9012,"row":0,"thread":2,"query":null},"ts_ms":1680307320000}}
2025-03-31 21:02:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"name":"Laptop","price":999.99}, After - {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:02:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product updated: ID=1, Name=Laptop, Price=899.99
2025-03-31 21:02:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"u","before":{"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"},"after":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307321000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":13456,"row":0,"thread":2,"query":null},"ts_ms":1680307321000}}
2025-03-31 21:02:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling UPDATE event: Before - {"id":1,"product_id":1,"quantity":100,"last_updated":"2025-03-31T21:01:01.000Z"}, After - {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:02:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory updated: ID=1, Product ID=1, Quantity=90, Last Updated=2025-03-31T21:02:01.000Z
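An update event carries both the old and the new row, so a consumer can work out which columns actually changed, for example to log only the stock change. A minimal sketch, assuming the before and after images have already been converted into maps (for instance with Jackson's ObjectMapper.convertValue(node, Map.class)); RowDiff is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares the "before" and "after" row images of an update event.
final class RowDiff {
    private RowDiff() {}

    /** Returns column -> {oldValue, newValue} for every column whose value differs. */
    static Map<String, Object[]> changedColumns(Map<String, ?> before, Map<String, ?> after) {
        // Union of column names, sorted for a stable output order
        Set<String> columns = new TreeSet<>(before.keySet());
        columns.addAll(after.keySet());
        Map<String, Object[]> changes = new LinkedHashMap<>();
        for (String column : columns) {
            Object oldValue = before.get(column);
            Object newValue = after.get(column);
            if (!Objects.equals(oldValue, newValue)) {
                changes.put(column, new Object[] {oldValue, newValue});
            }
        }
        return changes;
    }
}
```

For the inventory update above, this would report quantity (100 to 90) and last_updated as the changed columns.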
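Deleting data
Run the following SQL to delete the inventory and product data (inventory first, because of the foreign key):
-- Delete inventory data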
DELETE FROM inventory WHERE id = 1;
-- Delete product data
DELETE FROM products WHERE id = 1;
Log:
2025-03-31 21:03:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"int32","field":"product_id"},{"type":"int32","field":"quantity"},{"type":"string","field":"last_updated"}],"optional":false,"name":"inventorydb.inventory.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307380000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"inventory","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":17890,"row":0,"thread":2,"query":null},"ts_ms":1680307380000}}
2025-03-31 21:03:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"product_id":1,"quantity":90,"last_updated":"2025-03-31T21:02:01.000Z"}
2025-03-31 21:03:00.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Inventory deleted: ID=1
2025-03-31 21:03:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.l.DebeziumEventListener : Received message: {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","field":"id"},{"type":"string","field":"name"},{"type":"decimal","field":"price"}],"optional":false,"name":"inventorydb.products.Value"},"optional":true},"payload":{"op":"d","before":{"id":1,"name":"Laptop","price":899.99},"after":null,"source":{"version":"1.9.5.Final","connector":"mysql","name":"dbserver1","ts_ms":1680307381000,"snapshot":"false","db":"inventorydb","sequence":null,"table":"products","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":22345,"row":0,"thread":2,"query":null},"ts_ms":1680307381000}}
2025-03-31 21:03:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Handling DELETE event: {"id":1,"name":"Laptop","price":899.99}
2025-03-31 21:03:01.000 INFO [kafka-listener-container-0-C-1] c.e.i.h.DataChangeHandler : Product delete