Flink Example: Flink-CDC
Version information
| Product | Version |
|---|---|
| Flink | 1.11.1 |
| flink-cdc-connectors | 1.1.0 |
| Java | 1.8.0_231 |
| MySQL | 5.7.16 |
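Note: officially, MySQL 5.7 and 8.0 are supported, but I have also briefly tested MariaDB 10.0.38 (which corresponds to MySQL 5.6). Inserts, deletes, updates, and aggregations all work so far, though unknown issues cannot be ruled out.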
Maven dependencies
Dependency section of pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <type>test-jar</type>
    </dependency>
    <!-- Flink-CDC -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
Replication (binlog) configuration and data preparation
- Stop the MySQL service
- On the MySQL node to be synchronized, add the following configuration (see the reference documentation)
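[mysqld]
# ID of this server; must be unique among the nodes that share the binlog
server-id = 12345
# Enable the binary log, using "mysql-bin" as the file name prefix
log-bin = mysql-bin
# The CDC connector reads row-based binlog events that carry full row images
binlog_format = ROW
binlog_row_image = FULL
# Automatically purge binlog files older than 10 days
expire_logs_days = 10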
- Start the MySQL service
- Use the following command to check the binlog-related variables
SHOW VARIABLES LIKE '%binlog%';
- Create the database, table, and test data
CREATE DATABASE db_inventory_cdc;
USE db_inventory_cdc;
CREATE TABLE tb_products_cdc (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(64),
    description VARCHAR(128)
);
INSERT INTO tb_products_cdc
VALUES (DEFAULT, 'zhangsan', 'aaa'), (DEFAULT, 'lisi', 'bbb'), (DEFAULT, 'wangwu', 'ccc');
- Create a user for synchronization and grant it the required privileges (see the reference documentation)
CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
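The binlog settings above can also be verified from code before starting a job. This is a minimal sketch, assuming the server variables have already been loaded into a Map of variable name to value, for example from SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'server_id', 'binlog_format', 'binlog_row_image'). Note that log_bin and server_id do not match '%binlog%', so the LIKE query above does not return them. The class BinlogPreflight and its check method are hypothetical helpers; the expected values simply mirror the [mysqld] snippet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check of the binlog settings used in the [mysqld] snippet. */
public class BinlogPreflight {

    /**
     * Returns the problems found in the given server variables
     * (variable name -> value, as returned by SHOW VARIABLES). An empty list means OK.
     */
    public static List<String> check(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        expect(vars, "log_bin", "ON", problems);
        expect(vars, "binlog_format", "ROW", problems);
        expect(vars, "binlog_row_image", "FULL", problems);
        // server-id = 12345 in the config; 0 or a missing value means it was not set
        String serverId = vars.get("server_id");
        if (serverId == null || "0".equals(serverId.trim())) {
            problems.add("server_id must be set to a non-zero value");
        }
        return problems;
    }

    // Records a problem if the variable is missing or differs (case-insensitively) from the expected value
    private static void expect(Map<String, String> vars, String name, String expected,
                               List<String> problems) {
        String actual = vars.get(name);
        if (actual == null) {
            problems.add(name + " is missing");
        } else if (!expected.equalsIgnoreCase(actual.trim())) {
            problems.add(name + " is " + actual + ", expected " + expected);
        }
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "STATEMENT");
        vars.put("binlog_row_image", "FULL");
        vars.put("server_id", "12345");
        // Prints: binlog_format is STATEMENT, expected ROW
        check(vars).forEach(System.out::println);
    }
}
```

Returning a list of problems rather than failing on the first one lets all misconfigurations be reported in a single pass.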
Using Flink-CDC
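- For the SQL Client, an official example is already available in the project's documentation
- Programmatically, which makes it easy to submit as a jar, as in the following example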
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCDCSQLTest {

    public static void main(String[] args) throws Exception {
        // Blink planner in streaming mode
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        // Source: snapshot of db_inventory_cdc.tb_products_cdc, followed by its binlog changes
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkuser',\n" +
                " 'password' = 'flinkpassword',\n" +
                " 'database-name' = 'db_inventory_cdc',\n" +
                " 'table-name' = 'tb_products_cdc'\n" +
                ")";

        // Sink: the print connector writes every change it receives to stdout
        String sinkDDL =
                "CREATE TABLE tb_sink (\n" +
                " name STRING,\n" +
                " countSum BIGINT,\n" +
                " PRIMARY KEY (name) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // Continuous aggregation: number of rows per name
        String transformSQL =
                "INSERT INTO tb_sink " +
                "SELECT name, COUNT(1) " +
                "FROM mysql_binlog " +
                "GROUP BY name";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // executeSql() submits the INSERT job asynchronously. Block the main thread until the
        // (unbounded) job terminates, so the print sink keeps writing changes to the console.
        // Stop it by cancelling the job or killing the process.
        result.getJobClient().get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();
    }
}
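If the connection settings should come from configuration instead of being hard-coded, the source DDL can be assembled from a map of WITH options. This is a minimal sketch; CdcDdlBuilder and mysqlCdcSource are hypothetical names, and the option keys are the ones used in the example above. Values are escaped by doubling single quotes, which is how a quote is written inside a Flink SQL string literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles a mysql-cdc source DDL from connection options. */
public class CdcDdlBuilder {

    /** Quotes a value as a SQL string literal, doubling any embedded single quotes. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Builds "CREATE TABLE name (columns) WITH ('key' = 'value', ...)". */
    public static String mysqlCdcSource(String tableName, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + tableName + " (\n" + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // Same options as the hard-coded sourceDDL in FlinkCDCSQLTest
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mysql-cdc");
        options.put("hostname", "localhost");
        options.put("port", "3306");
        options.put("username", "flinkuser");
        options.put("password", "flinkpassword");
        options.put("database-name", "db_inventory_cdc");
        options.put("table-name", "tb_products_cdc");
        String ddl = mysqlCdcSource("mysql_binlog",
                "  id INT NOT NULL,\n  name STRING,\n  description STRING", options);
        System.out.println(ddl);
    }
}
```

A LinkedHashMap keeps the options in insertion order, so the generated DDL stays stable and easy to compare with the hand-written version; the resulting string would be passed to tableEnv.executeSql() in place of sourceDDL.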
A simple test
- For a simple test, modify the data in the MySQL table
INSERT INTO tb_products_cdc VALUE (DEFAULT, 'lisi', 'ddd');
DELETE FROM tb_products_cdc WHERE id = 4;
UPDATE tb_products_cdc SET name = 'wangwu' WHERE id = 2;
- Execute the statements one at a time and observe how the Flink output changes
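To help interpret the output, here is a simplified, illustrative model of how a retracting SELECT name, COUNT(1) ... GROUP BY name reacts to the changes above. CountChangelogModel is a hypothetical class, not Flink's implementation. It assumes the table starts with rows 1 to 3 from the data-preparation step, that the inserted row gets id 4, and that the UPDATE arrives as a retraction of the old row followed by the new row. The +I/-U/+U/-D markers stand for insert, update-before, update-after, and delete; the exact format of Flink's print output, and whether the -U messages appear at all, depend on the Flink version and planner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of COUNT(1) ... GROUP BY name over a changelog (not Flink's code). */
public class CountChangelogModel {

    private final Map<String, Long> counts = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Applies one input change: +I/+U add a row for name, -U/-D remove one. */
    public void apply(String kind, String name) {
        long before = counts.getOrDefault(name, 0L);
        long after;
        switch (kind) {
            case "+I":
            case "+U":
                after = before + 1;
                break;
            case "-U":
            case "-D":
                after = before - 1;
                break;
            default:
                throw new IllegalArgumentException("Unknown row kind: " + kind);
        }
        if (after < 0) {
            throw new IllegalStateException("Retraction for unknown key: " + name);
        }
        if (after == 0) {
            // Last row for this name is gone: the aggregate row is deleted
            counts.remove(name);
            output.add("-D(" + name + "," + before + ")");
        } else if (before == 0) {
            // First row for this name: a new aggregate row is inserted
            counts.put(name, after);
            output.add("+I(" + name + "," + after + ")");
        } else {
            // Existing aggregate row changes: retract the old value, emit the new one
            counts.put(name, after);
            output.add("-U(" + name + "," + before + ")");
            output.add("+U(" + name + "," + after + ")");
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        CountChangelogModel model = new CountChangelogModel();
        // Initial snapshot: ids 1 to 3
        model.apply("+I", "zhangsan");
        model.apply("+I", "lisi");
        model.apply("+I", "wangwu");
        // INSERT ... (DEFAULT, 'lisi', 'ddd'), assumed to get id 4
        model.apply("+I", "lisi");
        // DELETE ... WHERE id = 4
        model.apply("-D", "lisi");
        // UPDATE ... SET name = 'wangwu' WHERE id = 2: old row retracted, new row added
        model.apply("-U", "lisi");
        model.apply("+U", "wangwu");
        model.output().forEach(System.out::println);
    }
}
```

The last statement moves row 2 from lisi to wangwu, so lisi's count drops to zero and its aggregate row is deleted rather than updated, while wangwu's count goes from 1 to 2.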