使用canal同步MySQL数据到Elasticsearch(ES)
目錄
- 1、功能及使用場景
- 1.1、功能介紹
- 1.2、使用場景
- 2、需求引入
- 3、canal文件下載及準備
- 3.1 下載文件
- 3.2 準備文件
- 4、deployer安裝及效果測試
- 4.1、deployer 配置修改
- 4.1.1 準備
- 4.1.2 修改連接數據庫信息
- 4.2 啟動deployer
- 4.3 測試deployer效果
- 4.3.1 在本地電腦新建普通maven工程
- 4.3.2 新建ClientSimple類
- 5、adapter安裝及效果測試
- 5.1 修改配置
- 5.1.1 修改啟動器配置: application.yml
- 5.1.2 修改 conf/es/mytest_user.yml文件
- 5.2 啟動adapter
- 5.3 效果測試
- 6、全量數據導入
1、功能及使用場景
1.1、功能介紹
canal是阿里巴巴開源的mysql數據傳輸組件,基于mysql binlog,提供了準確、實時的數據傳輸服務。有關binlog介紹,參見binlog介紹。
以下來自官方GitHub介紹。GitHub地址
canal [k?’n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。
基于日志增量訂閱和消費的業務包括
- 數據庫鏡像
- 數據庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業務 cache 刷新
- 帶業務邏輯的增量數據處理
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
1.2、使用場景
根據官方文檔,目標系統支持MySQL、kafka、elasticsearch、hbase、rocketMQ、pulsar等。
本文主要介紹MySQL同步到Elasticsearch的使用。
2、需求引入
整個過程用一個示例來介紹canal的安裝使用。
存在如下兩張表(一對多關系),用戶信息表和用戶權限表,需要將用戶信息以及權限同步到es
user_info(用戶信息表):
| 1 | 張三 | 1 |
| 2 | 李四 | 2 |
| 3 | 王大錘 | 2 |
role(權限表):
| 1 | 管理員 |
| 2 | 測試員 |
查詢sql:
SELECT a.id AS _id, a.name, a.role_id, b.role_name FROM user_info a LEFT JOIN role b ON b.role_id = a.role_id查詢結果:
| 3 | 王大錘 | 2 | 測試員 |
| 1 | 張三 | 1 | 管理員 |
| 2 | 李四 | 1 | 管理員 |
對應的es索引結構:
{"user_index": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"role_name": {"type": "text"}}}}} }3、canal文件下載及準備
3.1 下載文件
官方GitHub下載地址:https://github.com/alibaba/canal/releases
下載canal服務端(canal.deployer-1.1.4.tar.gz)和客戶端(canal.adapter-1.1.4.tar.gz),如下圖。
下載完成如下
3.2 準備文件
命令運行完成后,進入adapter和deployer可以看到如下結構(忽略我本機的.DS_Store)
adapter:
deployer:
4、deployer安裝及效果測試
4.1、deployer 配置修改
4.1.1 準備
(針對阿里云 RDS for MySQL , 默認打開了 binlog , 并且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步)
-
對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復 -
授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant,下面新建了canal賬號
4.1.2 修改連接數據庫信息
vi conf/example/instance.properties
修改如下標紅信息
4.2 啟動deployer
在deployer目錄運行啟動腳本
sh bin/startup.sh- 查看 server 日志
tail -f logs/canal/canal.log
- 查看 instance 的日志
tail -f logs/example/example.log
看到如上日志,標志啟動成功
4.3 測試deployer效果
(4.3 步驟可跳過,主要為驗證deployer端效果)
4.3.1 在本地電腦新建普通maven工程
pom文件,依賴如下pom
4.3.2 新建ClientSimple類
粘貼測試代碼(下面鏈接頁面上的ClientSimple代碼)
https://github.com/alibaba/canal/wiki/ClientExample
將圈紅ip改為部署adapter的ip,然后直接啟動此main方法
啟動完成看到如下日志:
日志會循環打印count
此時觸發數據庫變更
user_info表變更前
user_info表變更后,新加了一條名叫邏輯的記錄
可在日志處觀察到變更信息,標志著deployer監聽mysql binlog變更成功
5、adapter安裝及效果測試
canal adapter 的 Elastic Search 版本支持6.x.x以上
官方文檔地址
https://github.com/alibaba/canal/wiki/Sync-ES
5.1 修改配置
5.1.1 修改啟動器配置: application.yml
進adapter/conf目錄
vi application.yml
修改如下配置,注意縮進格式,yml文件嚴格縮進格式,格式錯誤會引起啟動失敗問題。
mode使用rest模式,測試使用transport會出問題,目前沒找到原因,下圖mode還沒更改
5.1.2 修改 conf/es/mytest_user.yml文件
adapter將會自動加載 conf/es 下的所有.yml結尾的配置文件
不需要的文件可刪除,只配置需要的yml文件
修改如下配置,esMapping信息,包括 index,type,sql,其中sql盡量保證不要換行,在文本編輯器中編輯成一行,在粘貼進去,否則可能會出問題(還是由于yml格式問題)
此處的sql的字段別名即是es字段名,不寫別名,默認原名即是es字段名,有關詳細說明,可參見官方文檔https://github.com/alibaba/canal/wiki/Sync-ES
etlCondition 可以注釋掉,我們默認任何條件都同步
5.2 啟動adapter
啟動命令 sh bin/startup.sh
查看日志:
tail -f logs/adapter/adapter.log
觀察如下日志,即表示啟動成功
5.3 效果測試
canal是一個MySQL增量訂閱組件,所以不支持數據的初始化
我們需要在數據庫觸發變更,才能將數據同步到es
變更前:
es數據->在kibana查詢對應數據,可以看到右側數據為空
mysql數據→使用查詢sql,查詢到數據如下
下面,我們進行數據變更:
比如將張三名字變成張六,MySQL數據如下:
此時,es數據在MySQL數據變更同時,es數據相應變更,如下,可以觀察到,數據變更已經成功從MySQL同步到elasticsearch
需要注意的是,由于我們只變更了user_info表,所以此處只同步了user_info表的name和id字段,role_name字段,并沒有同步,只有role_name字段變更時,才會被同步,所以實際使用時,要先做好數據初始化工作。
同時,可在logs/adapter/adapter.log觀察到數據變更日志
至此,全部操作完成
6、全量數據導入
由于canal只支持增量導入,所以官方adapter提供了全量導入手動觸發的功能,
canal全表同步(etl功能,手動觸發)
參見源碼:
只需要發送http命令即可
例:如下,ip和端口是部署canal adapter的IP和端口,然后類型選es,后面在跟上對應的yml配置文件即可
curl http://127.0.0.1:8081/etl/es/mytest_person2.yml -X POST導入成功提示
總結
以上是生活随笔為你收集整理的使用canal同步MySQL数据到Elasticsearch(ES)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 远程debug服务器配置
- 下一篇: const_cast