1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例
1.18.3.Flink Catalog介紹
1.18.3.1.引言
1.18.3.2.Catalog 定義
1.18.3.3.Catalog 的實現
1.18.3.4.Catalog 使用舉例
1.18.3.Flink Catalog介紹
1.18.3.1.引言
以下轉自:http://legendtkl.com/2020/07/26/flink-catalog/
這篇文章我們介紹了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大數據引擎的同學應該都知道這兩個計算引擎都有一個共同的組件叫 Catalog。下面是 Flink 的 Catalog 的官方定義。
Catalog 提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函數和信息。數據處理最關鍵的方面之一是管理元數據。 元數據可以是臨時的,例如臨時表、或者通過TableEnvironment注冊的 UDF。 元數據也可以是持久化的,例如 Hive Metastore 中的元數據。 Catalog提供了一個統一的API,用于管理元數據,并使其可以從Table API和SQL查詢語句中來訪問。簡單來說,Catalog 就是元數據管理中心,其中元數據包括數據庫、表、表結構等信息。
1.18.3.2.Catalog 定義
Flink 的 Catalog 相關代碼定義在 catalog.java 文件中,是一個 interface,如下。
/*** This interface is responsible for reading and writing metadata such as database/table/views/UDFs* from a registered catalog. It connects a registered catalog and Flink's Table API.*/ @PublicEvolving public interface Catalog {... }既然是interface,我們來看一下支持的操作。
我們可以將這些接口做一個簡單的分類。
Database 相關操作
?getDefaultDataBase:獲取默認的 database
?getDatabase:獲取特定的 database
?listDatabases:列出所有的 database
?databaseExists:判斷 database 是否存在
?createDatabases:創建 database
?dropDatabases:刪除 database
?alterDatabases:修改 database
Table 相關操作,一般都會有個參數是database
?listTables:列出所有的 table 和 view
?getTable:獲取指定的 table 或者 view
?tableExist:判斷 table 或者 view 是否存在
?dropTable:刪除 table 或者 view
?createTable:創建 table 或者 view
?renameTable:重命名 table 或者 view
?alterTable:修改 table 或者 view
View 相關操作,除了和 table 共用方法外,還有一個獨有的方法。
?listViews:列出所有的 view
Partition 相關操作,partition 是 table 的一個屬性,所以參數一般都會帶有 table 信息。
?listPartition:列出 table 的所有 partition
?getPartition:獲取指定的 partition
?partitionExist:判斷 parition 是否存在
?createPartition:創建 partition
?dropPartition:刪除 partition
?alterPartition:修改 parition
Function 相關操作,這里的 function 知道的是用戶自定義的 function,也就是 Udf。
?listFunctions:列出所有的 function
?getFunction:獲取指定的 func
?functionExist:判斷 function 是否存在
?dropFunction:刪除 function
?alterFunction:修改 function
1.18.3.3.Catalog 的實現
從上圖我們可以看到 Catalog 的最終實現有三個類:
?HiveCatalog:使用 Hive 的元數據來作為 Flink 的 HiveCatalog
?GenericInMemoryCatalog:使用內存實現 Catalog
?JdbcCatalog:使用其他支持 jdbc 協議的關系型數據庫來存儲元數據
?PostgresCatalog:使用 Postgres 數據庫來作為 Catalog 存儲元數據
1.18.3.4.Catalog 使用舉例
下面的示例是 Flink SQL 使用 Catalog 的示例。
TableEnvironment tableEnv = ...// Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");// Register the catalog tableEnv.registerCatalog("myhive", catalog);// Create a catalog database tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");// Create a catalog table tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");tableEnv.listTables(); // should return the tables in current catalog and database.下面是 api 的方式來使用 Catalog
import org.apache.flink.table.api.*; import org.apache.flink.table.catalog.*; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.descriptors.Kafka;TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());// Create a HiveCatalog Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");// Register the catalog tableEnv.registerCatalog("myhive", catalog);// Create a catalog database catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));// Create a catalog table TableSchema schema = TableSchema.builder().field("name", DataTypes.STRING()).field("age", DataTypes.INT()).build();catalog.createTable(new ObjectPath("mydb", "mytable"),new CatalogTableImpl(schema,new Kafka().version("0.11").....startFromEarlist().toProperties(),"my comment"),false);List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"1.18.3.5.自定義Catalog
Catalog 是可擴展的,用戶可以通過實現 Catalog 接口來開發自定義 Catalog。 想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現自定義的 Catalog 之外,還需要為這個 Catalog 實現對應的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。 這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 并初始化相應的 Catalog 實例。
1.18.3.6.總結
這篇文章寫的比較簡單,相當于自己的學習筆記,下一篇文章我們比較一下Spark 的 Catalog實現。
總結
以上是生活随笔為你收集整理的1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 1.18.2.10 解释表:Table.
- 下一篇: 深圳软银支付有牌照吗