OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频)
《OpenShift / RHEL / DevSecOps 匯總目錄》
說明:本文已經在OpenShift 4.10環境中驗證
文章目錄
- 場景說明
- 部署環境
- 安裝CDC源和目標數據庫
- 安裝 MySQL
- 安裝 PostgreSQLSQL
- 安裝 AMQ Stream 環境
- 安裝 AMQ Stream Opeartor
- 創建 Kafka 實例
- 創建 KafkaConnect 用到的 Image
- 配置 KafkaConnect
- 配置 KafkaConnector
- MySqlConnector
- JdbcSinkConnector
- 環境檢查
- CDC 驗證
- 數據同步
- 添加數據
- 更新數據
- 刪除數據
- 演示視頻
- 參考
場景說明
本文使用 OpenShift 的 AMQ Steams(即企業版 Kafka)和 Redhat 主導的 CDC 開源項目 Debezium 來實現從 MySQL 到 PostgreSQL 數據庫的數據同步。
上圖中的 Kafka Connector 提供了訪問源或目標的參數, 而 Kafka Connect 為訪問源或目標的實際運行環境,該環境運行在相關容器中。
注意:本文操作需要用到 access.redhat.com 賬號,另外還需有一個鏡像 Registry 服務的賬號,本文使用的是 quay.io Registry 服務。
部署環境
首先創建一個項目
$ oc project db-cdc安裝CDC源和目標數據庫
安裝 MySQL
安裝 PostgreSQLSQL
安裝 AMQ Stream 環境
安裝 AMQ Stream Opeartor
在 OpenShift 中使用默認配置安裝 AMQ Stream Opeartor,步驟略。
創建 Kafka 實例
在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 kafka 服務。
apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata:name: my-cluster spec:kafka:config:offsets.topic.replication.factor: 3transaction.state.log.replication.factor: 3transaction.state.log.min.isr: 2default.replication.factor: 3min.insync.replicas: 2inter.broker.protocol.version: '3.1'storage:type: ephemerallisteners:- name: plainport: 9092type: internaltls: false- name: tlsport: 9093type: internaltls: trueversion: 3.1.0replicas: 3entityOperator:topicOperator: {}userOperator: {}zookeeper:storage:type: ephemeralreplicas: 3創建后會在 OpenShift 中看到部署的相關資源。
創建 KafkaConnect 用到的 Image
配置 KafkaConnect
在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 KafkaConnect 對象,其中使用了前面生成的 “quay.io/dawnskyliu/connect-debezium:v1” 鏡像。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata:name: my-connect-clusterannotations:strimzi.io/use-connector-resources: "true" spec:version: 3.1.0replicas: 1image: 'quay.io/dawnskyliu/connect-debezium:v1'bootstrapServers: 'my-cluster-kafka-bootstrap:9093'tls:trustedCertificates:- secretName: my-cluster-cluster-ca-certcertificate: ca.crtconfig:group.id: connect-clusteroffset.storage.topic: connect-cluster-offsetsconfig.storage.topic: connect-cluster-configsstatus.storage.topic: connect-cluster-statusconfig.storage.replication.factor: 1offset.storage.replication.factor: 1status.storage.replication.factor: 1config.storage.min.insync.replicas: 1offset.storage.min.insync.replicas: 1status.storage.min.insync.replicas: 1配置 KafkaConnector
MySqlConnector
在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 KafkaConnector 對象。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata:name: mysql-source-connectorlabels:strimzi.io/cluster: my-connect-cluster spec:class: io.debezium.connector.mysql.MySqlConnectortasksMax: 1config:"database.hostname": "mysql""database.ssl.mode": "disabled""database.allowPublicKeyRetrieval": "true""database.port": "3306""database.user": "debezium""database.password": "dbz""database.server.id": "1""database.server.name": "dbserver1""database.include": "inventory""database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092""database.history.kafka.topic": "schema-changes.inventory""transforms": "route""transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter""transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)""transforms.route.replacement": "$3"JdbcSinkConnector
在安裝好的 AMQ Stream Opeartor 中根據以下配置創建 KafkaConnector 對象。
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata:name: postgresql-sink-connectorlabels:strimzi.io/cluster: my-connect-cluster spec:class: io.confluent.connect.jdbc.JdbcSinkConnectortasksMax: 1config:"topics": "customers""connection.url": "jdbc:postgresql://postgresql:5432/inventory?user=postgresuser&password=postgrespw""transforms": "unwrap""transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState""transforms.unwrap.drop.tombstones": "false""auto.create": "true""insert.mode": "upsert""delete.enabled": "true""pk.fields": "id""pk.mode": "record_key"環境檢查
在 AMQ Streams 的 Operator 中確認 Kafka,Kafka Connect 和 Kafka Connector 的運行狀態。
CDC 驗證
數據同步
確認 customers 表和數據已經從 MySQL 同步到 PostgreSQL 中。
$ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name}) $ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory inventory=> select * from customers;last_name | id | first_name | email -----------+------+------------+-----------------------Thomas | 1001 | Sally | sally.thomas@acme.comBailey | 1002 | George | gbailey@foobar.comWalker | 1003 | Edward | ed@walker.comKretchmar | 1004 | Anne | annek@noanswer.org (4 rows)添加數據
在 MySQL 中執行命令添加新數據,然后在 PostgreSQL 確認變化數據已同步。
mysql> INSERT INTO customers VALUES (default,"test1","test1","test1@acme.com");更新數據
在 MySQL 中執行命令更新數據,然后在 PostgreSQL 確認變化數據已同步。
mysql> update customers set first_name='Test' where id = 1001;刪除數據
在 MySQL 中執行命令刪除數據,然后在 PostgreSQL 確認變化數據已同步。
mysql> delete from customers where first_name='test1';演示視頻
視頻
參考
https://github.com/liuxiaoyu-git/debezium_openshift
https://debezium.io/documentation/reference/1.9/operations/openshift.html
https://github.com/debezium/debezium-examples/tree/main/openshift
https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/
總結
以上是生活随笔為你收集整理的OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步(附视频)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Plot 绘制点图
- 下一篇: json与xml的相互转换