基于debezium实时数据同步(Oracle为主)
基于debezium實時數(shù)據(jù)同步
- 全部需要下載的內(nèi)容鏈接
- 1、下載zookeeper-3.4.10
- 2、下載kafka_2.13-2.8.0
- 3、下載Kafka Connector:建議使用1.6以上版本可以對ddl進(jìn)行捕獲
- 4、安裝debezium-connector-oracle
- 4.1下載debezium-connector-oracle-1.6.0.Final-plugin.tar.gz并解壓,安裝在自己的服務(wù)器,我的安裝目錄是/home/debezium/
- 4.2、將debezium-connector-oracle 目錄下得jar包都拷貝一份到${KAFKA_HOME}/libs中
- 4.3、Oracle需要下載客戶端并把jar包復(fù)制到${KAFKA_HOME}/libs
- 5、kafka環(huán)境修改,使用集群方式配置,但其實kafka非集群搭建
- 6、啟動zookeeper、kafka,connect-distributed環(huán)境
- 6.1.進(jìn)入zookeeper目錄
- 6.2.進(jìn)入kafka目錄
- 6.3.以環(huán)境配置方式啟動connect-distributed
- 7、提交Oracle-connector,監(jiān)視Oracle數(shù)據(jù)庫
- 8、查看創(chuàng)建的kafka connector列表
- 9、查看創(chuàng)建的kafka connector狀態(tài)
- 10、查看創(chuàng)建的kafka connector配置
- 11、查看kafka中topic
- 12、flinksqlclient創(chuàng)建表并測試
- 附上:Oracle的歸檔開啟
- Oracle 開啟歸檔日志
- 創(chuàng)建 新得表空間與dbzuser,并賦予相應(yīng)得權(quán)限
- 暫時可以不用,官網(wǎng)有做要求,暫時沒明白有什么用
- kafka查看topic和消息內(nèi)容命令
- 1、查詢topic,進(jìn)入kafka目錄:
- 2、查詢topic內(nèi)容:
全部需要下載的內(nèi)容鏈接
https://download.csdn.net/download/u010978399/217334521、下載zookeeper-3.4.10
https://blog.csdn.net/She_lock/article/details/80435176?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control2、下載kafka_2.13-2.8.0
kafka安裝參考:
https://blog.csdn.net/weixin_39984161/article/details/91971731?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161959594516780262520102%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=161959594516780262520102&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-91971731.pc_search_result_before_js&utm_term=linux%E5%AE%89%E8%A3%85kafka3、下載Kafka Connector:建議使用1.6以上版本可以對ddl進(jìn)行捕獲
debezium-connector-mysql-1.6.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.tar.gzdebezium-connector-postgres-1.6.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gzdebezium-connector-oracle-1.6.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.6.0.Final/debezium-connector-oracle-1.6.0.Final-plugin.tar.gz4、安裝debezium-connector-oracle
4.1下載debezium-connector-oracle-1.6.0.Final-plugin.tar.gz并解壓,安裝在自己的服務(wù)器,我的安裝目錄是/home/debezium/
4.2、將debezium-connector-oracle 目錄下得jar包都拷貝一份到${KAFKA_HOME}/libs中
4.3、Oracle需要下載客戶端并把jar包復(fù)制到${KAFKA_HOME}/libs
客戶端下載地址:
https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip5、kafka環(huán)境修改,使用集群方式配置,但其實kafka非集群搭建
kafka安裝目錄:/home/kafka/kafka_2.13-2.8.0/
單機部署修改 [connect-standalone.properties]
集群部署修改 [connect-distributed.properties]
6、啟動zookeeper、kafka,connect-distributed環(huán)境
6.1.進(jìn)入zookeeper目錄
啟動zookeeper
sh zkServer.sh start停止zookeeper
sh zkServer.sh stop6.2.進(jìn)入kafka目錄
啟動kafka
/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &關(guān)閉kafka
/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &6.3.以環(huán)境配置方式啟動connect-distributed
加載環(huán)境
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties啟動
./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties &末尾 一定要加上符號&是為了后臺運行,這樣就不會頁面一關(guān),服務(wù)就沒有了
7、提交Oracle-connector,監(jiān)視Oracle數(shù)據(jù)庫
這個就是在liunx里面,命令直接貼進(jìn)去
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://172.16.50.22:8085/connectors/ -d ' { "name": "debezium-oracle", "config": { "connector.class" : "io.debezium.connector.oracle.OracleConnector", "tasks.max" : "1", "database.server.name" : "XE", "database.hostname" : "172.16.50.239", "database.port" : "1521", "database.user" : "amir", "database.password" : "amir", "database.dbname" : "XE", "database.schema" : "MSCDW", "database.connection.adapter": "logminer", "database.tablename.case.insensitive": "true", "table.include.list" : "MSCDW.*", "snapshot.mode" : "initial", "schema.include.list" : "MSCDW", "database.history.kafka.bootstrap.servers" : "172.16.50.22:9092", "database.history.kafka.topic": "schema-changes.inventory" } }'8、查看創(chuàng)建的kafka connector列表
鏈接:
172.16.50.22:8085/connectors9、查看創(chuàng)建的kafka connector狀態(tài)
鏈接:
172.16.50.22:8085/connectors/debezium-oracle/status這里的debezium-oracle是上一步查出來的名稱
10、查看創(chuàng)建的kafka connector配置
鏈接:
172.16.50.22:8085/connectors/debezium-oracle/config11、查看kafka中topic
當(dāng)環(huán)境搭建好之后,默認(rèn)為每個表創(chuàng)建一個屬于自己的主題,如圖所示,小編這里使用的kafka Tool工具查看,注意這里的主題為XE.SCOTT.DEPT,而非XE.MSCDW.CONFIG,其實按照上述步驟應(yīng)該是MSCDW,但因為在寫文檔的時候忘記放這塊的內(nèi)容,是后來才發(fā)現(xiàn)補的,補的時候配置是監(jiān)聽SCOTT庫的DDL,就懶的換了。
12、flinksqlclient創(chuàng)建表并測試
CREATE TABLE sinkMysqlConfigTable ( ID STRING, CRON STRING ) WITH ( ‘connector.type’ = ‘jdbc’, ‘connector.url’ = ‘jdbc:mysql://IP:3306/admin’, ‘connector.table’ = ‘config’, ‘connector.username’ = ‘root’, ‘connector.password’ = ‘dhcc@2020’, ‘connector.write.flush.max-rows’ = ‘1’ );CREATE TABLE createOracleConfigTable ( id STRING, cron STRING ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ = ‘XE.MSCDW.CONFIG’, ‘properties.bootstrap.servers’ = ‘172.16.50.22:9092’, ‘debezium-json.schema-include’ =‘false’, ‘properties.group.id’ = ‘a(chǎn)mirdebezium’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘value.format’ = ‘debezium-json’ );附上:Oracle的歸檔開啟
#按要求修改,不然會報錯
alter system set db_recovery_file_dest_size=5G;Oracle 開啟歸檔日志
#開啟行模式
alter database add supplemental log data (all) columns;創(chuàng)建 新得表空間與dbzuser,并賦予相應(yīng)得權(quán)限
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/amir/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO dbzuser; GRANT SELECT ON V_$DATABASE TO dbzuser; GRANT FLASHBACK ANY TABLE TO dbzuser; GRANT SELECT ANY TABLE TO dbzuser; GRANT SELECT_CATALOG_ROLE TO dbzuser; GRANT EXECUTE_CATALOG_ROLE TO dbzuser; GRANT SELECT ANY TRANSACTION TO dbzuser; GRANT SELECT ANY DICTIONARY TO dbzuser;GRANT CREATE TABLE TO dbzuser; GRANT ALTER ANY TABLE TO dbzuser; GRANT LOCK ANY TABLE TO dbzuser; GRANT CREATE SEQUENCE TO dbzuser;GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser; GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser; GRANT SELECT ON V_$LOGFILE TO dbzuser; GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;暫時可以不用,官網(wǎng)有做要求,暫時沒明白有什么用
CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS; GRANT CONNECT TO debezium; GRANT CREATE SESSION TO debezium; GRANT CREATE TABLE TO debezium; GRANT CREATE SEQUENCE to debezium; ALTER USER debezium QUOTA 100M on users;kafka查看topic和消息內(nèi)容命令
1、查詢topic,進(jìn)入kafka目錄:
bin/kafka-topics.sh --list --zookeeper localhost:21812、查詢topic內(nèi)容:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning總結(jié)
以上是生活随笔為你收集整理的基于debezium实时数据同步(Oracle为主)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ae:摄像机命令
- 下一篇: Android程序员该如何进阶?,202