Debezium的基本使用(以MySQL为例)
- GreatSQL社區原創內容未經授權不得隨意使用,轉載請聯系小編并注明來源。
- GreatSQL是MySQL的國產分支版本,使用上與MySQL一致。
一、Debezium介紹
摘自官網:Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
簡單理解就是Debezium可以捕獲數據庫中所有行級的數據變化并包裝成事件流順序輸出。
二、基本使用
下面以MySQL為例介紹Debezium的基本使用。
1. MySQL的準備工作
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'dbz' IDENTIFIED BY 'dbzpwd';
2. 檢查MySQL是否開啟`log-bin` ```sql SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';-- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled... -- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;如果是OFF則需要修改MySQL配置文件,類似下面這樣:
server-id = 223344 #必須有 log_bin = mysql-bin #log_bin的值是binlog文件序列的基本名稱 binlog_format = ROW #必須是ROW binlog_row_image = FULL #必須是FULL expire_logs_days = 10 #依據實際情況而定2. 編寫程序
2.1. 工程依賴(Maven)
pom.xml
<dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${version.debezium}</version> </dependency> <dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${version.debezium}</version> </dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${version.debezium}</version> </dependency>目前Debezium最新穩定版本為:1.9.5.Final
2.2. 準備數據庫&表
create database inventory; create table inventory.a (id bigint primary key, name varchar(32)); insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');2.3. 代碼編寫
package com.greatdb.dbzdemo;import java.io.IOException; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json;/*** @author wang.jianwen* @version 1.0* @date 2022/07/29*/ public class DebeziumTest {private static DebeziumEngine<ChangeEvent<String, String>> engine;public static void main(String[] args) throws Exception {final Properties props = new Properties();props.setProperty("name", "dbz-engine");props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");//offset config begin - 使用文件來存儲已處理的binlog偏移量props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");props.setProperty("offset.flush.interval.ms", "0");//offset config endprops.setProperty("database.server.name", "mysql-connector");props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");props.setProperty("database.server.id", "122112"); //需要與MySQL的server-id不同props.setProperty("database.hostname", "tmg");props.setProperty("database.port", "3306");props.setProperty("database.user", "mysqluser");props.setProperty("database.password", "mysqlpw");props.setProperty("database.include.list", "inventory");//要捕獲的數據庫名props.setProperty("table.include.list", "inventory.a");//要捕獲的數據表props.setProperty("snapshot.mode", "initial");//全量+增量// 使用上述配置創建Debezium引擎,輸出樣式為Json字符串格式engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);//輸出到控制臺}).using((success, message, error) -> {if (error != null) {// 報錯回調System.out.println("------------error, message:" + message + "exception:" + error);}closeEngine(engine);}).build();ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);addShutdownHook(engine);awaitTermination(executor);System.out.println("------------main finished.");}private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {try {engine.close();} catch (IOException ignored) {}}private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));}private static void awaitTermination(ExecutorService executor) {if (executor != null) {try {executor.shutdown();while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}} }3. 測試
程序跑起來后,可以看到控制臺輸出:
...(省略) EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] ...(省略)可以看到全量的數據已經輸出,關鍵的數據如下:
..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"... ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"... ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...接下來新增一條數據:
insert into inventory.a values (4, 'n4');控制臺輸出:
..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...修改一條數據:
update inventory.a set name = 'n4-upd' where id = 4;控制臺輸出:
..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...刪除一條數據:
delete from inventory.a where id = 1;控制臺輸出:
..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...三、總結
本文以MySQL為例介紹了Debezium在代碼中基本使用流程,對MySQL的數據進行常見的增刪改操作,Debezium將捕獲這些數據行的變化,并記錄了數據行變化前后的數據,并對外提供事件流,外部可以獲取并對事件進行相應處理。
參考:https://debezium.io/documentation/reference/1.8/index.html
總結
以上是生活随笔為你收集整理的Debezium的基本使用(以MySQL为例)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBoot整合RabbitMQ
- 下一篇: 【安全资讯】incaseformat蠕虫