詳解 canal 同步 MySQL 增量數(shù)據(jù)到 ES
canal 是阿里知名的開源項(xiàng)目,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。
這篇文章,我們手把手向同學(xué)們展示使用 canal 將 MySQL 增量數(shù)據(jù)同步到 ES 。
1 集群模式
圖中 server 對(duì)應(yīng)一個(gè) canal 運(yùn)行實(shí)例 ,對(duì)應(yīng)一個(gè) JVM 。
server 中包含 1..n 個(gè) instance , 我們可以將 instance 理解為配置任務(wù)。
instance 包含如下模塊 :
- eventParser數(shù)據(jù)源接入,模擬 slave 協(xié)議和 master 進(jìn)行交互,協(xié)議解析
- eventSinkParser 和 Store 鏈接器,進(jìn)行數(shù)據(jù)過(guò)濾,加工,分發(fā)的工作
- eventStore數(shù)據(jù)存儲(chǔ)
- metaManager增量訂閱 & 消費(fèi)信息管理器
真實(shí)場(chǎng)景中,canal 高可用依賴 zookeeper ,筆者將客戶端模式可以簡(jiǎn)單劃分為:TCP 模式 和 MQ 模式 。
實(shí)戰(zhàn)中我們經(jīng)常會(huì)使用 MQ 模式 。因?yàn)?MQ 模式的優(yōu)勢(shì)在于解耦 ,canal server 將數(shù)據(jù)變更信息發(fā)送到消息隊(duì)列 kafka 或者 RocketMQ ,消費(fèi)者消費(fèi)消息,順序執(zhí)行相關(guān)邏輯即可。
順序消費(fèi):
對(duì)于指定的一個(gè) Topic ,所有消息根據(jù) Sharding Key 進(jìn)行區(qū)塊分區(qū),同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的先進(jìn)先出(FIFO)原則進(jìn)行發(fā)布和消費(fèi)。同一分區(qū)內(nèi)的消息保證順序,不同分區(qū)之間的消息順序不做要求。
2 MySQL配置
1、對(duì)于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù)
注意:針對(duì)阿里云 RDS for MySQL , 默認(rèn)打開了 binlog , 并且賬號(hào)默認(rèn)具有 binlog dump 權(quán)限 , 不需要任何權(quán)限或者 binlog 設(shè)置,可以直接跳過(guò)這一步。
2、授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant 。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3、創(chuàng)建數(shù)據(jù)庫(kù)商品表 t_product 。
CREATE TABLE `t_product` (
`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
`name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL ( 10, 2 ) NOT NULL,
`status` TINYINT ( 4 ) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin
3 Elasticsearch配置
使用 Kibana 創(chuàng)建商品索引 。
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type":"keyword"
},
"name": {
"type":"text"
},
"price": {
"type":"double"
},
"status": {
"type":"integer"
},
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"updateTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
執(zhí)行完成,如圖所示 :
4 RocketMQ 配置
創(chuàng)建主題:product-syn-topic ,canal 會(huì)將 Binlog 的變化數(shù)據(jù)發(fā)送到該主題。
5 canal 配置
我們選取 canal 版本 1.1.6 ,進(jìn)入 conf 目錄。
1、配置 canal.properties
#集群模式 zk地址
canal.zkServers = localhost:2181
#本質(zhì)是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的組件文件 生產(chǎn)環(huán)境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
###### 以下部分是默認(rèn)值 展示出來(lái)
# Canal的batch size, 默認(rèn)50K, 由于kafka最大消息體限制請(qǐng)勿超過(guò)1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數(shù)據(jù)的超時(shí)時(shí)間, 單位: 毫秒, 空為不限超時(shí)
canal.mq.canalGetTimeout = 100
# 是否為 flat json格式對(duì)象
canal.mq.flatMessage = true
2、instance 配置文件
在 conf 目錄下創(chuàng)建實(shí)例目錄 product-syn , 在 product-syn 目錄創(chuàng)建配置文件 :instance.properties。
# 按需修改成自己的數(shù)據(jù)庫(kù)信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數(shù)據(jù)庫(kù)的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# table regex
canal.instance.filter.regex=mytest.t_product
# mq config
canal.mq.topic=product-syn-topic
# 針對(duì)庫(kù)名或者表名發(fā)送動(dòng)態(tài)topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partitinotallow=0
# hash partition config
#canal.mq.partitinotallow=3
#庫(kù)名.表名: 唯一主鍵,多個(gè)表之間用逗號(hào)分隔
#canal.mq.partitinotallow=mytest.person:id,mytest.role:id
#################################################
3、服務(wù)啟動(dòng)
啟動(dòng)兩個(gè) canal 服務(wù),我們從 zookeeper gui 中查看服務(wù)運(yùn)行情況 。
修改一條 t_product 表記錄,可以從 RocketMQ 控制臺(tái)中觀測(cè)到新的消息。
6 消費(fèi)者
1、產(chǎn)品索引操作服務(wù)
2、消費(fèi)監(jiān)聽器
消費(fèi)者邏輯重點(diǎn)有兩點(diǎn):
- 順序消費(fèi)監(jiān)聽器
- 將消息數(shù)據(jù)轉(zhuǎn)換成 JSON 字符串,從 data 節(jié)點(diǎn)中獲取表最新數(shù)據(jù)(批量操作可能是多條)。然后根據(jù)操作類型 UPDATE、 INSERT、DELETE 執(zhí)行產(chǎn)品索引操作服務(wù)的方法。
7 寫到最后
canal 是一個(gè)非常有趣的開源項(xiàng)目,很多公司使用 canal 構(gòu)建數(shù)據(jù)傳輸服務(wù)( Data Transmission Service ,簡(jiǎn)稱 DTS ) 。
推薦大家閱讀這個(gè)開源項(xiàng)目,你可以從中學(xué)習(xí)到網(wǎng)絡(luò)編程、多線程模型、高性能隊(duì)列 Disruptor、 流程模型抽象等。
這篇文章涉及到的代碼已收錄到下面的工程中,有興趣的同學(xué)可以一看。
https://github.com/makemyownlife/rocketmq4-learning