flink mysql connector_Flink JDBC Connector:Flink 与数据库集成最佳实践
整理:陳政羽(Flink 社區志愿者)
摘要:Flink 1.11 引入了 CDC,在此基礎上, JDBC Connector 也發生比較大的變化,本文由?Apache Flink Contributor,阿里巴巴高級開發工程師徐榜江(雪盡)分享,主要介紹 Flink 1.11 JDBC Connector 的最佳實踐。大綱如下:
JDBC connector
JDBC Catalog
JDBC Dialect
Demo
Tips:點擊下方鏈接可查看作者原版 PPT 及分享視頻:
https://flink-learning.org.cn/developers/flink-training-course3/
JDBC-Connector 的重構
JDBC Connector 在 Flink 1.11 版本發生了比較大的變化,我們先從以下幾個 Feature 來具體了解一下 Flink 社區在這個版本上對 JDBC 所做的改進。
FLINK-15782 :Rework JDBC Sinks[1] (重寫 JDBC Sink)
這個 issue 主要為 DataStream API 新增了 JdbcSink,對于使用 DataStream 編程的用戶會更加方便地把數據寫入到 JDBC;并且規范了一些命名規則,以前命名使用的是 JDBC 加上連接器名稱,目前命名規范為 Jdbc+ 連接器名稱
FLINK-17537:Refactor flink-jdbc connector structure[2] (重構 flink-jdbc 連接器的結構)
這個 issue 將 flink-jdbc 包名重命名為 flink-connector-jdbc,與 Flink 的其他 connector 統一,將所有接口和類從 org.apache.flink.java.io.jdbc(舊包)規范為新包路徑 org.apache.flink.connector.jdbc(新包),通過這種重命名用戶在對底層源代碼的閱讀上面會更加容易的理解和統一。
FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)
由于早期數據類型系統并不是很完善,導致了比較多的 Connector 在使用上會經常報數據類型相關的異常,例如 DECIMAL 精度類型,在以往的 Flink 1.10 版本中有可能出現下圖問題:
基于 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面做了重構,目前數據精度方面的支持已經很完善了。
FLIP-122:New Connector Property Keys for New Factory[4](新的連接器參數)
在 Flink 1.11 版本中,我們對 DDL 的 WITH 參數相對于 1.10 版本做了簡化,從用戶視角看上就是簡化和規范了參數,如表格所示:
Old Key (Flink 1.10)
New Key (Flink 1.11)
connector.type
connector.type
connector.url
url
connector.table
table-name
connector.driver
driver
connector.username
username
connector.password
password
connector.read.partition.column
scan.partition.column
connector.read.partition.num
scan.partition.num
connector.read.partition.lower-bound
scan.partition.lower-bound
connector.read.partition.upper-bound
scan.partition.upper-bound
connector.read.fetch-size
scan.fetch-size
connector.lookup.cache.max-rows
lookup.cache.max-rows
connector.lookup.cache.ttl
lookup.cache.ttl
connector.lookup.max-retries
lookup.max-retries
connector.write.flush.max-rows
sink.buffer-flush.max-rows
connector.write.flush.interval
sink.buffer-flush.interval
connector.write.max-retries
sink.max-retries
大家可以看到表格中有 3 個標紅的地方,這個是相對于 1.10 有發生變化比較多的地方。這次 FLIP 希望進一步簡化連接器屬性,以便使屬性更加簡潔和可讀,并更好地與 FLIP-107 協作。如果需要了解更多的 Connector 參數可以進一步參考官方文檔和 FLIP-122 中提到的改變,這樣有助于從舊版本遷移到新版本并了解參數的變化。
FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主鍵約束問題)
Flink 1.10 存在某些 Query 無法推斷出主鍵導致無法進行 Upsert 更新操作(如下圖所示錯誤)。所以在 FLIP-87 中為 Flink SQL 引入的 Primary Key 約束。Flink 的主鍵約束遵循 SQL 標準,主鍵約束分為 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否對數據進行校驗。我們常見數據庫的主鍵約束屬于 PRIMARY KEY ENFORCED,會對數據進行校驗。因為 Flink 并不持有數據,因此 Flink 支持的主鍵模式是 PRIMARY KEY NOT ENFORCED, ?這意味著 Flink 不會校驗數據,而是由用戶確保主鍵的完整性。例如 HBase 里面對應的主鍵應該是 RowKey,在 MySQL 中對應的主鍵是在用戶數據庫的表中對應的主鍵。
JDBC Catalog
目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在關系數據庫中的表,如果要在 Flink 中使用,用戶需要手動寫表的 DDL,一旦表的 Schema 發生改變,用戶需要手動修改, 這是比較繁瑣的事情。JDBC Catalog 提供了接口用于連接到各種關系型數據庫,使得 Flink 能夠自動檢索表,不用用戶手動輸入和修改。目前 JDBC Catalog 內置目前實現了 Postgres Catalog。Postgres catalog 是一個 read-only (只讀)的 Catalog,只支持讀取 Postgres 表,支持的功能比較有限。下面代碼展示了目前 Postgres catalog 支持的 6 個功能:數據庫是否存在、數據庫列表、獲取數據庫、根據數據庫名獲取表列表、獲得表、表是否存在。
// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)
如果需要支持其他 DB (如 MySQL),需要用戶根據 FLIP-93 的 JdbcCatalog 接口實現對應不同的 JDBC Catalog。
JDBC Dialect
什么是 Dialect?
Dialect (方言)對各個數據庫來說,Dialect 體現各個數據庫的特性,比如語法、數據類型等。如果需要查看詳細的差異,可以點擊這里[6]查看詳細差異。下面通過對比 MySQL 和 Postgres 的一些常見場景舉例:
Dialect
MySQL
Postgres
場景描述
Grammar(語法)
LIMIT 0,30
WITH LIMIT 30 OFFSET 0
分頁
Data Type (數據類型)
BINARY
BYTEA,ARRAY
字段類型
Command (命令)
show tables
dt
查看所有表
在數據類型上面,Flink SQL 的數據類型目前映射規則如下:
MySQL type
PostgreSQL type
Flink SQL type
TINYINT
TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT
UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT
UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT
評論留言
還沒有評論留言,趕緊來搶樓吧~~
吐槽小黑屋()
* 這里是“吐槽小黑屋”,所有人可看,只保留當天信息。
Erlo.vip2021-02-06 19:01:47Hello、歡迎使用吐槽小黑屋,這就是個吐槽的地方。
回車鍵發送
當前在線 17 人 源碼約 75909篇
返回頂部
給這篇文章打個標簽吧~
棒極了
糟糕透頂
好文章
PHP
JAVA
JS
小程序
Python
SEO
MySql
確認
總結
以上是生活随笔為你收集整理的flink mysql connector_Flink JDBC Connector:Flink 与数据库集成最佳实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql二进制文件复制_MySQL 主
- 下一篇: hbase shell 查看列名_hba