mysql数据实时同步:Canal安装部署、kafka安装、zk安装、mysql安装、Canal Server+Canal Client HA,Canal+mysql+Kafka,相关验证(学习笔记)
目錄
1.1. 服務器準備
1.2. 設置主機名并配置hosts
1.3. 免密設置
1.4. 設置ntp時間
1.5. 關閉防火墻
1.6. 關閉selinux
1.7. 安裝JDK
1.8. 安裝zookeeper
1.9. 安裝scala
2.1. 解壓
2.2. 配置環境變量
2.3. 修改配置文件
2.4. 再次修改server.properties
2.5. 創建日志目錄
2.6. Kafka集群啟動與測試
2.7. topic數據發送與消費
2.8. Kafka集群監控–KafkaOffsetMonitor(老的方式)
2.9. Kafka集群監控–KafkaCenter
2.9.1. 下載
2.9.2. 初始化
2.9.3. 編輯 application.properties屬性文件
2.9.4. 編譯和運行
3.1. 卸載原來的mysql
3.2. 創建canal賬號
3.3. 開啟Binlog寫入功能
4.1. 機器準備
4.2. 下載canal
4.3. 解壓縮
4.4. 修改配置文件
4.5. 創建example的topic
4.6. 啟動canal服務
4.7. 驗證功能
4.8. 準備數據庫測試數據
4.9. ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
4.10. 數據監控微服務
5.1. 機器準備
5.2. 下載canal
5.3. 解壓縮
5.4. 修改配置文件
5.4.1. 修改 canal.properties
5.4.2. 修改 instance.properties
5.4.3. 另外一臺canal server配置
5.4.4. 啟動Zookeeper服務
5.4.5. 啟動canal服務(兩個canal同時啟動)
5.4.6. 客戶端鏈接消費數據
6.1. 機器準備
6.2. 下載canal
6.3. 解壓縮
6.4. 修改配置文件
6.4.1. 修改instance.properties
6.4.2. 修改canal.properties
6.5. 啟動相關服務
6.5.1. 啟動zookeeper服務
6.5.2. 啟動Kafka服務
6.5.3. 打開Kafka消費者
6.5.4. 啟動Canal服務
6.5.5. 觀察Kafka消費者
1.Canal安裝部署
1.1.服務器準備
| 192.168.106.103 | node1 | CentOS Linux release 7.4.1708 (Core) | Zookeeper,kafka (master),canal單集 |
| 192.168.106.104 | node2 | CentOS Linux release 7.4.1708 (Core) | Zookeeper,kafka (slave),canal-ha(master) |
| 192.168.106.105 | node3 | CentOS Linux release 7.4.1708 (Core) | Zookeeper,kafka (slave),canal-ha(slave) |
1.2.設置主機名并配置hosts
四臺機器分別執行:vim /etc/hostname ,分別修改為:node1,node2,node3
然后配置hosts,具體內容如下:
[root@node1 ~]# vim /etc/hosts192.168.106.103 node1 192.168.106.104 node2 192.168.106.105 node31.3.免密設置
四臺機器上分別執行:
ssh-keygen -t rsa ssh-copy-id node1 ssh-copy-id node2 ssh-copy-id node31.4.設置ntp時間
參考文檔:https://blog.csdn.net/tototuzuoquan/article/details/108900206
1.5.關閉防火墻
systemctl status firewalld.service # 查看防火墻的狀態 systemctl stop firewalld.service # 關閉防火墻 systemctl disable firewalld.service # 設置開機不啟動 systemctl is-enabled firewalld.service # 查看防火墻服務是否設置開機啟動1.6.關閉selinux
https://www.linuxidc.com/Linux/2016-11/137723.htm
1.7.安裝JDK
四臺機器中解壓jdk,然后配置環境變量,例如:
export JAVA_HOME=/root/jdk1.8.0_161 export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib/rt.jar export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin然后在每臺機器上執行:source /etc/profile
1.8.安裝zookeeper
參考文章:https://blog.csdn.net/tototuzuoquan/article/details/54003140
其中zoo.cfg的內容如下:
將上面zookeeper遠程拷貝到node1、node2、node3上。
進入node1、node2、node3的/root/apache-zookeeper-3.6.2-bin/data,分別執行:
echo 1 > myid # node1上執行 echo 2 > myid # node2上執行 echo 3 > myid # node3上執行然后分別進入node1、node2、node3上執行:
# 啟動zk $ZOOKEEPER_HOME/bin/zkServer.sh start # 查看zk的狀態 $ZOOKEEPER_HOME/bin/zkServer.sh status1.9.安裝scala
此部分略。
配置環境變量
export SCALA_HOME=/root/scala-2.12.12 export PATH=$PATH:$SCALA_HOME/bin2.安裝Kafka
2.1.解壓
使用如下命令,解壓kafka安裝包:
tar -zxvf kafka_2.12-2.6.0.tgz刪除Kafka安裝包:
rm -rf kafka_2.12-2.6.0.tgz2.2.配置環境變量
環境變量如下:
export SCALA_HOME=/root/scala-2.12.12 export PATH=$PATH:$SCALA_HOME/binexport KAFKA_HOME=/root/kafka_2.12-2.6.0 export PATH=$PATH:$KAFKA_HOME/bin然后執行:source /etc/profile
2.3.修改配置文件
cd $KAFKA_HOME/config1、修改zookeeper.properties文件 [root@node1 config]# vim zookeeper.properties # ZooKeeper數據存儲路徑與Zookeeper配置文件保持一致 dataDir=/root/apache-zookeeper-3.6.2-bin/data2、修改consumer.properties [root@node1 config]# vim consumer.properties # 配置 Zookeeper 集群連接地址 zookeeper.connect=node1:2181,node2:2181,node3:21813 修改producer.properties [root@node1 config]# vim producer.properties # 修改kafka集群配置地址 bootstrap.servers=node1:9092,node2:9092,node3:90924 修改server.properties [root@node1 config]# vim server.properties # 配置ZooKeeper集群地址 zookeeper.connect=node1:2181,node2:2181,node3:2181 # 存儲日志文件目錄 log.dirs=/tmp/kafka-logs # 這個路徑可以修改將kafka等同步到各機器節點(在node1節點上執行) [root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node2:$PWD [root@node1 ~]# scp -r kafka_2.12-2.6.0 root@node3:$PWD2.4.再次修改server.properties
在各個節點分別修改server.properties
# 修改node1節點 broker.id=1 #修改node2 節點 broker.id=2 #修改node3節點 broker.id=32.5.創建日志目錄
三臺機器上分別執行:
mkdir -p /tmp/kafka-logs (這里的/tmp/kafka-logs就是上面配置的kafka的日志目錄)2.6.Kafka集群啟動與測試
1、啟動zookeeper集群(3個節點上執行)
$ZOOKEEPER_HOME/bin/zkServer.sh start2、啟動kafka集群
# 啟動kafka cd $KAFKA_HOME bin/kafka-server-start.sh -daemon config/server.properties3、查看topic列表
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list4、創建topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 2 --partitions 2然后在看topic列表
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list test5、查看topic詳情
[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test test2.7.topic數據發送與消費
1.新api使用
node2使用自帶腳本消費topic數據
node1使用自帶腳本向topic發送數據
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testnode3使用自帶腳本消費topic數據(此時消費最新數據)
[root@node3 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testnode3使用自帶腳本消費topic數據(從頭消費數據)
[root@node2 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 輸入數據 adfasdasfd 輸入測試3 shuru 輸入測試2查看消費數據,必須要指定組。查看kafka組使用以下命令
[root@node2 kafka_2.12-2.6.0]# bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list console-consumer-21382查看topic每個partition數據消費情況
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group xxx --describebin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group xxx參數說明:
Group 消費者組
TOPIC:曾經消費或正在消費的 topic
PARTITION:分區編號
CURRENT-OFFSET:consumer group 最后一次提交的 offset
LOG-END-OFFSET: 最后提交的生產消息 offset
LAG:消費 offset 與生產 offset 之間的差值
CONSUMER-ID:當前消費 topic-partition 的 group 成員 id
HOST:消費節點的 ip 地址
CLIENT-ID:客戶端 id
2.8.Kafka集群監控–KafkaOffsetMonitor(老的方式)
KafkaOffsetMonitor 是一個可以用于監控 Kafka 的 Topic 及 Consumer 消費狀況的工具。以程
序一個 jar 包的形式運行,部署較為方便。只有監控功能,使用起來也較為安全。
作用:
1)監控 Kafka 集群狀態,Topic、Consumer Group 列表。
2)圖形化展示 topic 和 Consumer 之間的關系。
3)圖形化展示 Consumer 的 offset、Lag 等信息。
1.下載
下載地址:https://github.com/quantifind/KafkaOffsetMonitor(可以使用已經修改版本)
目前 kafka Monitor 必須使用舊 api 才可以監控到,新 api 目前還沒有實現。
2.腳本參數格式
zk: Zookeeper 集群地址
port: 為開啟 web 界面的端口號
refresh: 刷新時間
retain: 數據保留時間(單位 seconds, minutes, hours, days) 3.開發 kafkamonitor.sh 執行腳本
vi kafkamonitor.sh
4.腳本授權
給腳本 kafkamonitor.sh 賦予可執行權限
5.啟動監控腳本
bin/kafkamonitor.sh6.web 可視化
node1:80902.9.Kafka集群監控–KafkaCenter
github地址: https://github.com/xaecbd/KafkaCenter,下載KafkaCenter的包。
碼云的地址: https://gitee.com/newegg/KafkaCenter
2.9.1.下載
git clone https://github.com/xaecbd/KafkaCenter.git2.9.2.初始化
執行:KafkaCenter-master\KafkaCenter-Core\sql\table_script.sql。
2.9.3.編輯 application.properties屬性文件
具體位置是:KafkaCenter/KafkaCenter-Core/src/main/resources/application.properties
主要是修改數據庫的密碼。
2.9.4.編譯和運行
注意的是:確保你安裝的JDK是JDK8+
$ git clone https://github.com/xaecbd/KafkaCenter.git (上面已經執行過了) $ cd KafkaCenter $ mvn clean package -Dmaven.test.skip=true $ cd KafkaCenter\KafkaCenter-Core\target $ java -jar KafkaCenter-Core-2.3.0-SNAPSHOT.jar3.安裝mysql
3.1.卸載原來的mysql
mysql的安裝方式可以按照https://blog.csdn.net/tototuzuoquan/article/details/104210148中的方式進行安裝。
3.2.創建canal賬號
mysql -uroot -p 輸入:123456內容是: mysql> create user 'canal' identified by 'canal'; Query OK, 0 rows affected (0.00 sec)mysql> grant all privileges on *.* to 'canal'@'%' identified by 'canal'; Query OK, 0 rows affected (0.00 sec)mysql> flush privileges; Query OK, 0 rows affected (0.00 sec)mysql>3.3.開啟Binlog寫入功能
對于自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf
中配置如下:
重啟mysql
[root@node1 etc]# systemctl restart mysqld并創建數據庫test
create database test default character set utf8;4.Canal快速安裝部署
官網地址:https://github.com/alibaba/canal
4.1.機器準備
Canal服務端:node1
MySQL地址:node1
4.2.下載canal
下載地址:https://github.com/alibaba/canal/releases
主要有:
4.3.解壓縮
mkdir -p /root/canal tar zxvf canal.deployer-1.1.4.tar.gz -C /root/canal解壓完成后,進入/root/canal,可以看到如下結構:
[root@node1 canal]# pwd /root/canal [root@node1 canal]# ls bin conf lib logs [root@node1 canal]#4.4.修改配置文件
[root@node1 canal]# cd conf/example/ [root@node1 example]# ls instance.properties[root@node1 example]# vim instance.properties 內容是: ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=1234## position info需要改成自己的數據庫信息 canal.instance.master.address=node1:3306# username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.instance.defaultDatabaseName=test #此處不加的時候,表示的是所有庫4.5.創建example的topic
[root@node1 example]# cd $KAFKA_HOME [root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic example --replication-factor 1 --partitions 1 Created topic example.4.6.啟動canal服務
cd /root/canal bin/startup.sh觀察canal日志
[root@node1 canal]# cd /root/canal/logs/canal [root@node1 canal]# tail -f canal.log 2020-12-18 22:50:52.994 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler 2020-12-18 22:50:53.083 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2020-12-18 22:50:53.112 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server. 2020-12-18 22:50:53.192 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.106.103(192.168.106.103):11111] 2020-12-18 22:50:55.369 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......4.7.驗證功能
下載canal源碼,在idea中打開:canal-canal-1.1.4.zip。導入之后的效果如下:
打開類:com.alibaba.otter.canal.example.SimpleCanalClientPermanceTest,修改ip地址為:192.168.106.103。
4.8.準備數據庫測試數據
向mysql節點的數據庫中導入stu.sql表數據,然后可以對stu表進行插入、刪除或者修改操作。其中stu的內容如下:
create table `stu` (`name` varchar (60),`speciality` varchar (60) ); insert into `stu` (`name`, `speciality`) values('張三','美術'); insert into `stu` (`name`, `speciality`) values('張三','音樂'); insert into `stu` (`name`, `speciality`) values('李四','籃球'); insert into `stu` (`name`, `speciality`) values('小明','美術'); insert into `stu` (`name`, `speciality`) values('李四','美術'); insert into `stu` (`name`, `speciality`) values('小明','音樂');在插入數據,修改,刪除等操作后,查看數據變化。(也可以通過下面的”數據監控微服務”來查看數據)。
4.9.ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x106d73f2, /192.168.106.1:1312 :> /192.168.106.103:11111], exception=java.nio.channels.ClosedChannelException
在這個過程中可能出現類似上面這個問題,解決辦法是,參考:https://blog.csdn.net/woainimax/article/details/105991825 所說
4.10.數據監控微服務
當用戶執行數據庫的操作的時候,binlog 日志會被canal捕獲到,并解析出數據。我們就可以將解析出來的數據進行相應的邏輯處理。
我們在這里使用的一個開源的項目,它實現了springboot與canal的集成。比原生的canal更加優雅。
https://github.com/chenqian56131/spring-boot-starter-canal
使用前需要將starter-canal安裝到本地倉庫
我們可以參照它提供的canal-test,進行代碼實現。
(1)創建工程模塊changgou_canal,pom引入依賴(注意:也可以在canal-test工程中直接寫,并把下面的依賴添加進去)
<dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version> </dependency>(2)創建包com.changgou.canal ,包下創建啟動類
@SpringBootApplication @EnableCanalClient public class CanalApplication { ?public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);} }(3)添加配置文件application.properties
# 在在canal-test中,此處開始是注釋的 canal.client.instances.example.host=192.168.106.103 # 在canal-test中,此處為2181 canal.client.instances.example.port=11111 canal.client.instances.example.batchSize=1000 # canal.client.instances.example.zookeeperAddress=192.168.0.59:8080,192.168.0.59:8081 # canal.client.instances.example.clusterEnabled=true(4)創建com.changgou.canal.listener包,包下創建類
@CanalEventListener public class BusinessListener {@ListenPoint(schema = "test", table = {"stu"})public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.err.println("監聽test庫,stu表數據的變化");rowData.getBeforeColumnsList().forEach((c) -> System.err.println("更改前數據: " + c.getName() + " :: " + c.getValue()));rowData.getAfterColumnsList().forEach((c) -> System.err.println("更改后數據: " + c.getName() + " :: " + c.getValue()));} }測試:啟動數據監控微服務,修改test的stu表,觀察控制臺輸出。
執行后的效果如下:
5.Canal Server+Canal Client HA
Canal Server和client端的高可用方案依賴zookeer,啟動canal server和client的時候,都會向zookeeper讀取信息。Canal在zookeeper存儲的數據結構如下:
/otter └── canal└── destinations└── example # canal 實例名稱├── 1001 # canal client 信息│ ├── cursor # 當前消費的 mysql binlog 位點│ ├── filter # binlog 過濾條件│ └── running # 當前正在運行的 canal client 服務器├── cluster # canal server 列表│ └── ip:11111 └── running # 當前正在運行的 canal server 服務器Canal server 和 client 啟動的時候都會去搶占 zk 對應的 running 節點, 保證只有一個server 和 client 在運行, 而 server 和 client 的高可用切換也是基于監聽 running 節點進行的。
5.1.機器準備
3個節點zookeeper集群:node1,node2,node3
2個節點Canal服務端節點:node2,node3
MySQL節點:node1
5.2.下載canal
此處略
5.3.解壓縮
[root@node2 ~]# mkdir /root/canal-ha (node2,node3)上一樣。 [root@node2 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-ha/在node2上執行:
[root@node2 ~]# scp -r canal-ha root@node3:$PWD[root@node2 canal-ha]# pwd /root/canal-ha [root@node2 canal-ha]# ls bin conf lib logs [root@node2 canal-ha]#5.4.修改配置文件
5.4.1.修改 canal.properties
[root@node2 conf]# pwd /root/canal-ha/conf [root@node2 conf]# vim canal.properties # zk集群地址 canal.zkServers = node1:2181,node2:2181,node3:2181# 全局的spring配置方式的組件文件 canal.instance.global.spring.xml = classpath:spring/default-instance.xml備注:
default-instance.xml 介紹:store 選擇了內存模式,其余的 parser/sink依賴的位點管理選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集群共享。
特點:支持HA
場景:生產環境,集群化部署。
5.4.2.修改 instance.properties
# Canal偽裝的mysql slave的編號,不能與mysql數據庫與其他的slave重復。 canal.instance.mysql.slaveId = 1234 (兩臺canal不能一樣)# 要監聽的數據庫的地址和端口號 canal.instance.master.address=node1:3306# username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal5.4.3.另外一臺canal server配置
配置同上
注意:兩臺機器上的instance目錄的名字需要保證完全一致,HA模式是依賴于instance name進行管理,同時必須都選擇default-instance.xml配置。
5.4.4.啟動Zookeeper服務
此部分略
5.4.5.啟動canal服務(兩個canal同時啟動)
兩個節點分別執行如下命令啟動canal服務:
bin/startup.sh啟動后,你可以查看logs/example/example.log,只會看到一臺機器上出現了啟動成功的日志。
node2上可以看到:
node3上可以看到:
[root@node3 logs]# pwd /root/canal-ha/logs [root@node3 logs]# ls canal example [root@node3 logs]# cd example/ [root@node3 example]# ls example.log [root@node3 example]# tail -f example.log -n 500 2020-12-27 03:42:07.860 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2020-12-27 03:42:07.910 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2020-12-27 03:42:08.708 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)] 2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2020-12-27 03:42:08.983 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2020-12-27 03:42:10.851 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$ 2020-12-27 03:42:10.864 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 2020-12-27 03:42:10.907 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful.... 2020-12-27 03:42:11.634 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position 2020-12-27 03:42:11.636 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status 2020-12-27 03:42:26.160 [destination = example , address = node1/192.168.106.103:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=3802,serverId=1,gtid=,timestamp=1608982052000] cost : 14301ms , the next step is binlog dump查看一下zookeeper中節點信息,也可以知道當前工作的節點。
[root@node2 bin]# pwd /root/apache-zookeeper-3.6.2-bin/bin [root@node2 bin]# ./zkCli.sh [zk: localhost:2181(CONNECTED) 6] get /otter/canal/destinations/example/running {"active":true,"address":"192.168.106.105:11111"} [zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster [192.168.106.104:11111, 192.168.106.105:11111]5.4.6.客戶端鏈接消費數據
可以直接指定zookeeper地址和instance name,canal client會自動從zookeeper中的running節點,獲取當前服務的工作節點,然后與其建立鏈接(這里還是使用官方提供的例子),要修改的類是com.alibaba.otter.canal.example.ClusterCanalClientTest:
package com.alibaba.otter.canal.example;import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors;/*** 集群模式的測試例子* * @version 1.0.4*/ public class ClusterCanalClientTest extends AbstractCanalClientTest {public ClusterCanalClientTest(String destination){super(destination);}public static void main(String args[]) {String destination = "example";// 基于固定canal server的地址,建立鏈接,其中一臺server發生crash,可以支持failover// CanalConnector connector = CanalConnectors.newClusterConnector(// Arrays.asList(new InetSocketAddress(// AddressUtils.getHostIp(),// 11111)),// "stability_test", "", "");// 基于zookeeper動態獲取canal server的地址,建立鏈接,其中一臺server發生crash,可以支持failoverCanalConnector connector = CanalConnectors.newClusterConnector("192.168.106.104:2181", destination, "canal", "canal");final ClusterCanalClientTest clientTest = new ClusterCanalClientTest(destination);clientTest.setConnector(connector);clientTest.start();Runtime.getRuntime().addShutdownHook(new Thread() {public void run() {try {logger.info("## stop the canal client");clientTest.stop();} catch (Throwable e) {logger.warn("##something goes wrong when stopping canal:", e);} finally {logger.info("## canal client is down.");}}});} }輸出的內容是:
**************************************************** * Batch Id: [1] ,count : [2] , memsize : [81] , Time : 2020-12-27 04:26:06 * Start : [mysql-bin.000001:3853:1608982052000(2020-12-26 19:27:32)] * End : [mysql-bin.000001:3903:1608982052000(2020-12-26 19:27:32)] ****************************************************----------------> binlog[mysql-bin.000001:3853] , name[test,stu] , eventType : INSERT , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314134 ms name : 小明 type=varchar(60) update=true speciality : 音樂 type=varchar(60) update=true ----------------END ----> transaction id: 730 ================> binlog[mysql-bin.000001:3903] , executeTime : 1608982052000(2020-12-26 19:27:32) , gtid : () , delay : 32314141ms連接成功后,canal server會記錄當前正在工作的canal client信息,比如客戶端ip,連接的端口信息等。
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/running {"active":true,"address":"192.168.106.1:3222","clientId":1001} [zk: localhost:2181(CONNECTED) 23] get /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4172,"serverId":1,"timestamp":1609019073000}}6.MySQL+Canal+Kafka集成開發
官網地址:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
6.1.機器準備
Zookeeper集群:node1,node2,node3
Kafka集群:node1,node2,node3
MySQL節點:node1
Canal服務端:node1
6.2.下載canal
下載地址:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
6.3.解壓縮
[root@node1 ~]# pwd /root [root@node1 ~]# mkdir canal-kafka [root@node1 ~]# tar -zxvf canal.deployer-1.1.4.tar.gz -C canal-kafka解壓完成后,進入/root/canal-kafka
[root@node1 ~]# cd canal-kafka/ [root@node1 canal-kafka]# ls bin conf lib logs [root@node1 canal-kafka]#6.4.修改配置文件
6.4.1.修改instance.properties
/root/canal-kafka/conf/example/instance.properties # position info canal.instance.master.address=192.168.106.103:3306# username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal# mq config canal.mq.topic=test6.4.2.修改canal.properties
/root/canal-kafka/conf/canal.properties
# tcp, kafka, RocketMQ canal.serverMode = kafka# zk集群地址 canal.zkServers = node1:2181,node2:2181,node3:2181# kafka集群地址 canal.mq.servers = node1:9092,node2:9092,node3:90926.5.啟動相關服務
6.5.1.啟動zookeeper服務
source /etc/profile $ZOOKEEPER_HOME/bin/zkServer.sh start6.5.2.啟動Kafka服務
# 啟動kafka cd $KAFKA_HOME bin/kafka-server-start.sh -daemon config/server.properties6.5.3.打開Kafka消費者
查看kafka-topic列表
cd $KAFKA_HOME[root@node1 kafka_2.12-2.6.0]# bin/kafka-topics.sh --zookeeper node1:2181 -list test [root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test6.5.4.啟動Canal服務
[root@node1 ~]# cd canal-kafka/ [root@node1 canal-kafka]# bin/startup.sh cd to /root/canal-kafka/bin for workaround relative path LOG CONFIGURATION : /root/canal-kafka/bin/../conf/logback.xml canal conf : /root/canal-kafka/bin/../conf/canal.properties CLASSPATH :/root/canal-kafka/bin/../conf:/root/canal-kafka/bin/../lib/zookeeper-3.4.5.jar:/root/canal-kafka/bin/../lib/zkclient-0.10.jar:/root/canal-kafka/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-core-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-context-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/root/canal-kafka/bin/../lib/snappy-java-1.1.7.1.jar:/root/canal-kafka/bin/../lib/snakeyaml-1.19.jar:/root/canal-kafka/bin/../lib/slf4j-api-1.7.12.jar:/root/canal-kafka/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_httpserver-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_hotspot-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient_common-0.4.0.jar:/root/canal-kafka/bin/../lib/simpleclient-0.4.0.jar:/root/canal-kafka/bin/../lib/scala-reflect-2.11.12.jar:/root/canal-kafka/bin/../lib/scala-logging_2.11-3.8.0.jar:/root/canal-kafka/bin/../lib/scala-library-2.11.12.jar:/root/canal-kafka/bin/../lib/rocketmq-srvutil-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-remoting-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-logging-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-common-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-client-4.5.2.jar:/root/canal-kafka/bin/../lib/rocketmq-acl-4.5.2.jar:/root/canal-kafka/bin/../lib/protobuf-java-3.6.1.jar:/root/canal-kafka/bin/../lib/oro-2.0.8.jar:/root/canal-kafka/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/root/canal-kafka/bin/../lib/netty-all-4.1.6.Final.jar:/root/canal-kafka/bin/../lib/netty-3.2.2.Final.jar:/root/canal-kafka/bin/../lib/mysql-connector-java-5.1.47.jar:/root/canal-kafka/bin/../lib/metrics-core-2.2.0.jar:/root/canal-kafka/bin/../lib/lz4-java-1.4.1.jar:/root/canal-kafka/bin/../lib/logback-core-1.1.3.jar:/root/canal-kafka/bin/../lib/logback-classic-1.1.3.jar:/root/canal-kafka/bin/../lib/kafka-clients-1.1.1.jar:/root/canal-kafka/bin/../lib/kafka_2.11-1.1.1.jar:/root/canal-kafka/bin/../lib/jsr305-3.0.2.jar:/root/canal-kafka/bin/../lib/jopt-simple-5.0.4.jar:/root/canal-kafka/bin/../lib/jctools-core-2.1.2.jar:/root/canal-kafka/bin/../lib/jcl-over-slf4j-1.7.12.jar:/root/canal-kafka/bin/../lib/javax.annotation-api-1.3.2.jar:/root/canal-kafka/bin/../lib/jackson-databind-2.9.6.jar:/root/canal-kafka/bin/../lib/jackson-core-2.9.6.jar:/root/canal-kafka/bin/../lib/jackson-annotations-2.9.0.jar:/root/canal-kafka/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/root/canal-kafka/bin/../lib/httpcore-4.4.3.jar:/root/canal-kafka/bin/../lib/httpclient-4.5.1.jar:/root/canal-kafka/bin/../lib/h2-1.4.196.jar:/root/canal-kafka/bin/../lib/guava-18.0.jar:/root/canal-kafka/bin/../lib/fastsql-2.0.0_preview_973.jar:/root/canal-kafka/bin/../lib/fastjson-1.2.58.jar:/root/canal-kafka/bin/../lib/druid-1.1.9.jar:/root/canal-kafka/bin/../lib/disruptor-3.4.2.jar:/root/canal-kafka/bin/../lib/commons-logging-1.1.3.jar:/root/canal-kafka/bin/../lib/commons-lang3-3.4.jar:/root/canal-kafka/bin/../lib/commons-lang-2.6.jar:/root/canal-kafka/bin/../lib/commons-io-2.4.jar:/root/canal-kafka/bin/../lib/commons-compress-1.9.jar:/root/canal-kafka/bin/../lib/commons-codec-1.9.jar:/root/canal-kafka/bin/../lib/commons-cli-1.2.jar:/root/canal-kafka/bin/../lib/commons-beanutils-1.8.2.jar:/root/canal-kafka/bin/../lib/canal.store-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.sink-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.server-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.protocol-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.prometheus-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse.driver-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse.dbsync-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.parse-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.meta-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.spring-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.manager-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.instance.core-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.filter-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.deployer-1.1.4.jar:/root/canal-kafka/bin/../lib/canal.common-1.1.4.jar:/root/canal-kafka/bin/../lib/aviator-2.2.1.jar:/root/canal-kafka/bin/../lib/aopalliance-1.0.jar:.:/root/jdk1.8.0_161/lib/dt.jar:/root/jdk1.8.0_161/lib/tools.jar:/root/jdk1.8.0_161/jre/lib/rt.jar cd to /root/canal-kafka for continue [root@node1 canal-kafka]#6.5.5.觀察Kafka消費者
第一次啟動canal,如果mysql binlog有數據的話,可以直接采集到Kafka集群,打印到Kafka消費者控制臺。
[root@node1 kafka_2.12-2.6.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test {"data":null,"database":"`kafka_center`","es":1609021630000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping database structure for kafka_center\r\nCREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */","sqlType":null,"table":"","ts":1609079707068,"type":"QUERY"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Dumping structure for table kafka_center.alert_group\r\nCREATE TABLE IF NOT EXISTS `alert_group` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `threshold` int(11) DEFAULT NULL,\r\n `dispause` int(11) DEFAULT NULL,\r\n `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_date` datetime DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `disable_alerta` tinyint(1) DEFAULT 0,\r\n `enable` tinyint(1) NOT NULL DEFAULT 1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"alert_group","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.cluster_info\r\nCREATE TABLE IF NOT EXISTS `cluster_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL,\r\n `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `enable` int(11) DEFAULT NULL,\r\n `broker_size` int(4) DEFAULT 0,\r\n `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',\r\n `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"cluster_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.ksql_info\r\nCREATE TABLE IF NOT EXISTS `ksql_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) DEFAULT NULL,\r\n `cluster_name` varchar(255) DEFAULT NULL,\r\n `ksql_url` varchar(255) DEFAULT NULL,\r\n `ksql_serverId` varchar(255) DEFAULT NULL,\r\n `version` varchar(255) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"ksql_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.task_info\r\nCREATE TABLE IF NOT EXISTS `task_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `message_rate` int(50) DEFAULT NULL,\r\n `ttl` int(11) DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `approved` int(11) DEFAULT NULL,\r\n `approved_id` int(11) DEFAULT NULL,\r\n `approved_time` datetime DEFAULT NULL,\r\n `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"task_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.team_info\r\nCREATE TABLE IF NOT EXISTS `team_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `alarm_group` varchar(255) COLLATE utf8_bin DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"team_info","ts":1609079707071,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_collection\r\nCREATE TABLE IF NOT EXISTS `topic_collection` (\r\n `id` int(11) unsigned NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `user_id` int(11) NOT NULL,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_collection","ts":1609079707072,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.topic_info\r\nCREATE TABLE IF NOT EXISTS `topic_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `cluster_id` int(11) NOT NULL,\r\n `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `partition` int(11) DEFAULT NULL,\r\n `replication` int(11) DEFAULT NULL,\r\n `ttl` bigint(11) DEFAULT NULL,\r\n `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,\r\n `owner_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `create_time` datetime DEFAULT NULL,\r\n `file_size` bigint(20) NOT NULL DEFAULT -1,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"topic_info","ts":1609079707072,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_info\r\nCREATE TABLE IF NOT EXISTS `user_info` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `real_name` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',\r\n `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',\r\n `create_time` datetime DEFAULT NULL,\r\n `password` varchar(255) COLLATE utf8_bin DEFAULT '',\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_info","ts":1609079707072,"type":"CREATE"} {"data":null,"database":"kafka_center","es":1609021630000,"id":1,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 7.2.5 - SQLEditor <Script-14.sql> */ -- Data exporting was unselected.\r\n\r\n\r\n-- Dumping structure for table kafka_center.user_team\r\nCREATE TABLE IF NOT EXISTS `user_team` (\r\n `id` int(11) NOT NULL AUTO_INCREMENT,\r\n `user_id` int(11) DEFAULT NULL,\r\n `team_id` int(11) DEFAULT NULL,\r\n PRIMARY KEY (`id`)\r\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin","sqlType":null,"table":"user_team","ts":1609079707072,"type":"CREATE"}可以往mysql刪除、更新、插入數據,kafka消費者控制臺可以實時消費到binlog日志數據。
往stu表中插入數據:
觀察日志,新增的內容如下:
{"data":[{"name":"田七","speciality":"語文"}],"database":"test","es":1609080224000,"id":2,"isDdl":false,"mysqlType":{"name":"varchar(60)","speciality":"varchar(60)"},"old":null,"pkNames":null,"sql":"","sqlType":{"name":12,"speciality":12},"table":"stu","ts":1609080224938,"type":"INSERT"}打個賞唄,您的支持是我堅持寫好博文的動力
總結
以上是生活随笔為你收集整理的mysql数据实时同步:Canal安装部署、kafka安装、zk安装、mysql安装、Canal Server+Canal Client HA,Canal+mysql+Kafka,相关验证(学习笔记)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 股票阿尔法和贝塔什么意思
- 下一篇: 住房公积金缴存比例怎么算 公积金由这些