整合flink-cdc实现实时读postgrasql
整合flink-cdc實現實時讀postgrasql
什么是wal日志
wal日志即write ahead log預寫式日志,簡稱wal日志。wal日志可以說是PostgreSQL中十分重要的部分,相當于oracle中的redo日志。
當數據庫中數據發生變更時:
change發生時:先要將變更后內容計入wal buffer中,再將變更后的數據寫入data buffer;
commit發生時:wal buffer中數據刷新到磁盤;
checkpoint發生時:將所有data buffer刷新的磁盤。
可以想象,如果沒有wal日志,那么數據庫中將會發生什么?
首先,當我們在數據庫中更新數據時,如果沒有wal日志,那么每次更新都會將數據刷到磁盤上,并且這個動作是隨機i/o,性能可想而知。并且沒有wal日志,關系型數據庫中事務的ACID如何保證呢?
因此wal日志重要性可想而知。其中心思想就是:先寫入日志文件,再寫入數據
什么是復制槽
制槽(replication slot)在postgresql9.4版本中被引入,引入之初是為了防止備庫需要的xlog日志在主庫被刪除,主庫會會根據備庫返回的信息確認哪些xlog已不再需要,,才能進行清理。同時主庫不會移除那些導致恢復沖突的行,關于恢復沖突,前面有一篇文章講到過可以通過設置hot_standby_feedback、max_standby_streaming_delay等參數進行預防,但是這些參數只有在主備關系正常時才能起到作用,而replication slot能夠確保在主備斷連后主庫的xlog仍不被清理。
復制槽分為物理復制槽physical replication slot和邏輯復制槽logic replication slot。物理復制槽一般結合流復制一起使用,能夠很好的保證備庫需要的日志不會在主庫刪除,本文重點討論邏輯復制槽。
Logic replication slots一般被用于邏輯異步復制,一個很好的應用就是用于異構數據庫之間的邏輯復制。大致原理是將源端xlog進行解碼,解析成具體sql,然后到目標端進行回放。支持邏輯解碼需要將wal_level設置為logic,logic會在replica的基礎上增加一些信息以支持邏輯解碼,該模式會增大wal日志的數量,尤其是大量的update,delete操作的庫。
需要關注的問題
對于邏輯復制槽,有下面幾點需要注意:
①一個邏輯復制槽只能解碼一個database,但是一個database可以有多個復制槽進行解碼,同一個復制槽可能同時有多個接收端進行訂閱。
②復制槽的消息只發送一次,同時它不關心接收端的狀態,如果接收端執行失敗,那么復制槽不會向前推進,接收端成功后繼續從上次失敗的位點繼續進行消費。
③不支持DDL、列存、壓縮表的解碼,DDL一般需要需要創建額外的觸發器來進行處理,但可以做到表級訂閱。
④邏輯復制不能保證數據不丟失,不能用作數據容災,但是可以用于數據遷移,在主庫有大事務的情況下延遲較大。
⑤不使用的復制槽一定要及時刪除。
注意
通過復制槽,從庫訂閱主庫,可以保證從庫在沒有收到主庫的日志之前,主庫不會刪除從庫未讀的部分。也因此不用的槽要即時刪除,否則會導致日志積壓
***flink-cdc***的就是通過創建復制槽訂閱PG來實現實時監控數據庫變化的。
flink-cdc配置以及使用
1.maven依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.flink</groupId> <artifactId>database</artifactId> <version>1.0-SNAPSHOT</version> <properties> <flink.version>1.13.0</flink.version> <scala.binary.version>2.11</scala.binary.version> <kafka.version>2.2.0</kafka.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--maven properties --> <maven.test.skip>false</maven.test.skip> <maven.javadoc.skip>false</maven.javadoc.skip> <!-- compiler settings properties --> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> <spotless.version>2.4.2</spotless.version> <jaxb-api.version>2.3.1</jaxb-api.version> </properties><dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.13.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-postgres-cdc</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope></dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.11</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--log4j日志包 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.2.8.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>PgsqlToMysqlTest</mainClass> </transformer> </transformers> <createDependencyReducedPom>false</createDependencyReducedPom> </configuration> </execution> </executions> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> <compilerVersion>${maven.compiler.source}</compilerVersion> <showDeprecation>true</showDeprecation> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.12.4</version> <configuration> <skipTests>${maven.test.skip}</skipTests> </configuration> </plugin> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <version>0.12</version> <configuration> <excludes> <exclude>.gitignore</exclude> <exclude>.travis.yml</exclude> <exclude>.asf.yaml</exclude> <exclude>README.md</exclude> </excludes> </configuration> </plugin> <plugin> <groupId>org.jacoco</groupId> <artifactId>jacoco-maven-plugin</artifactId> <version>0.8.7</version> <executions> <execution> <id>prepare-agent</id> <goals> <goal>prepare-agent</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.eluder.coveralls</groupId> <artifactId>coveralls-maven-plugin</artifactId> <version>4.3.0</version> <dependencies> <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency> </dependencies> </plugin> <plugin> <artifactId>maven-checkstyle-plugin</artifactId> <version>2.17</version> <executions> <execution> <id>verify</id> <phase>verify</phase> <configuration> <configLocation>style/rmq_checkstyle.xml</configLocation> <encoding>UTF-8</encoding> <consoleOutput>true</consoleOutput> <failsOnError>true</failsOnError> <includeTestSourceDirectory>false</includeTestSourceDirectory> <includeTestResources>false</includeTestResources> </configuration> <goals> <goal>check</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>2.10.4</version> <configuration> <aggregate>true</aggregate> <reportOutputDirectory>javadocs</reportOutputDirectory> <locale>en</locale> </configuration> </plugin> <!-- Due to the Flink build setup, "mvn spotless:apply" and "mvn spotless:check" don't work. You have to use the fully qualified name, i.e. "mvn com.diffplug.spotless:spotless-maven-plugin:apply" --> </plugins> </build></project>再看看代碼怎么寫
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PgsqlToMysqlTest { public static void main(String[] args) throws Exception { // 設置flink表環境變量 EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // 獲取flink流環境變量 StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); exeEnv.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); exeEnv.getCheckpointConfig().setCheckpointTimeout(60000); // make sure 500 ms of progress happen between checkpoints exeEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // allow only one checkpoint to be in progress at the same time exeEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation exeEnv.getCheckpointConfig() .enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); FsStateBackend stateBackend = new FsStateBackend("file:\\\\D:\\fsdata"); // // stateBackend.resolveCheckpoint("D:\\fsdata\\dda9ae98c2b864ba8448d2c5eee2e5c3\\chk-6"); // 固定延遲重啟(最多嘗試3次,每次間隔10s) // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 10000L)); // 失敗率重啟(在10分鐘內最多嘗試3次,每次至少間隔1分鐘) // exeEnv.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), // Time.minutes(1))); // exeEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); exeEnv.setStateBackend(stateBackend); // exeEnv.setDefaultSavepointDirectory(); exeEnv.setParallelism(2); // 表執行環境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(exeEnv, fsSettings); // 拼接souceDLL String sourceDDL = "CREATE TABLE pgsql_source (\n" + " id int,\n" + " name STRING,\n" + " py_code STRING,\n" + " seq_no int,\n" + " description STRING\n" + ") WITH (\n" + " 'connector' = 'postgres-cdc',\n" + " 'hostname' = '192.168.159.100',\n" + " 'port' = '5431',\n" + " 'username' = 'postgres',\n" + " 'password' = '123',\n" + " 'database-name' = 'postgres',\n" + " 'schema-name' = 'public',\n" + " 'debezium.snapshot.mode' = 'initial',\n" + " 'decoding.plugin.name' = 'pgoutput',\n" + " 'debezium.slot.name' = 'pgsql_source_li2',\n" + " 'table-name' = 'test_copy2_copy1'\n" + ")"; // 執行source表ddl tableEnv.executeSql(sourceDDL); String transformSQL = "select * from pgsql_source"; Table tableResult = tableEnv.sqlQuery(transformSQL); DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(tableResult, Row.class); dataStream.print(); // StreamGraph graph = new StreamGraph() exeEnv.execute("jobname"); // 執行sink表ddl // 執行邏輯sql語句 // TableResult tableResult = tableEnv.executeSql(transformSQL); // tableEnv.execute(""); // 控制塔輸出 // tableResult.print(); } }說明兩個配置
debezium.snapshot.mode = 'initial'initial :默認設置,第一次啟動創建數據庫快照,后面根據記錄偏移量繼續讀
never:從不建立快照,如果本地無偏移量,從最后的log開始讀
always:每次啟動都建立快照
exporter: 功能和inintial相同,不同之處在于其不會對表上鎖,使用SET TRANSACTION ISOLATION LEVEL REPEATABLE READ,可重復讀的隔離級別
實現類io.debezium.connector.postgresql.snapshot.ExportedSnapshotter
custom :用戶自定義 快照,配合debezium.snapshot.custom.class使用
什么是快照?
之前說過wal日志實際上會刪除,因此單純讀wal日志,并不能讀到全數據庫的數據
,因此在第一次啟動flink程序時,需要對數據庫相應表做一個快照,將全表的數據拿到flink處理對應源碼位置io.debezium.connector.postgresql.spi.Snapshotter
可以看出快照需要鎖表(exporter除外),IN ACCESS SHARE MODE說明鎖表不影響插入讀寫,但是如果有全表更新操作, 會被阻塞。
開啟串行,只讀的事務。Snapshotter子類有這幾種配置的實現,有興趣的可以看看。
這就是flink-cdc創建的邏輯復制槽。
使用flink-cdc碰到的一些問題
1 能不能保證EXACTLY_ONCE一致性要求
假設在默認snapshot.mode=initial下在第一次啟動程序時,會對數據路進行快照讀,讀取當前全量數據,后面根據記錄的偏移量繼續讀取數據,這樣既不丟失數據,也不重復讀,是保證了EXACTLY_ONCE一致性的。即使flink程序重啟,在啟動的時候指定savePoint Path也可以繼續之前的偏移量,讀取到未接收的數據。
這里分享一個技巧,flink在本地Idea運行沒有提供設置savepPoint的方法。
***org.apache.flink.client.deployment.executors.LocalExecutor#execute***方法中斷點設置
2 在快照時數據一股腦讀進flink,事件時間語義下,數據會不會亂序
如果我們對數據開窗計算,那么亂序可能導致窗口提前關閉導致數據丟失,而在對表做快照時,會將全表數據全部拿到flink處理,就很可能導致亂序數據產生,那么flink-cdc有沒有解決這個問題呢,我們知道waterMark時周期成的(源碼位置org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator#onProcessingTime),
一種解決思路時,在waterMark還沒生成之前,將全部快照數據處理掉,那么就不會丟失數據。
對于單一slot是單線程處理任務的,如果突然來了一批數據,那么生成waterMark的任務必須等到這批數據全部處理完畢才能繼續。因此在批數據未處理完畢時,盡管批數據亂序,但是不存在窗口關閉,丟失數據問題
如果有多并行度多槽生成watermrk呢?
多并行度情況下,數據被分散到多個槽,并且不再是一次處理一批數據,處理數據和waterMark會一起生成,如果一次讀一批,就可能會有丟數據的風險
因此從讀數據源到設置waterMark設置單并行度1,那么就可以避免數據亂序導致的丟失數據問題
另一種思路也很簡單,在數據庫做快照時,對數據庫進行排序。我們來看看flink-cdc有沒有提供類似的接口。
看下io.debezium.connector.postgresql.spi.Snapshotter#buildSnapshotQuery快照查詢的sql
public Optional<String> buildSnapshotQuery(TableId tableId) { StringBuilder q = new StringBuilder(); q.append("SELECT * FROM "); q.append(tableId.toDoubleQuotedString()); return Optional.of(q.toString()); }很遺憾,并未提供排序的接口。
但是就沒辦法了嗎?
還記得之前的自定義快照custom嗎。
繼承InitialSnapshotter自定義快照做簡單排序
配置改一下
'debezium.snapshot.mode' = 'custom''debezium.snapshot.custom.class' = 'xxx.OrderSnapShoter'總結
以上是生活随笔為你收集整理的整合flink-cdc实现实时读postgrasql的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决VS报表.rdl 显示乱码“小方块”
- 下一篇: json怎么转为HTML并显示,将JSO