1.18.2.Table APISQL(概念与通用API、两种计划器(Planner)的主要区别、创建 TableEnvironment、临时表、永久表、创建表、虚拟表、Connector 等)
1.18.2.概念與通用API
1.18.2.1.兩種計劃器(Planner)的主要區別:
1.18.2.2.Table API和SQL程序的結構
1.18.2.3.創建 TableEnvironment
1.18.2.4.在Catalog中創建表
1.18.2.4.1.臨時表(Temporary Table)和永久表(Permanent Table)
1.18.2.4.1.1.屏蔽(Shadowing)
1.18.2.4.2.創建表
1.18.2.4.2.1.虛擬表
1.18.2.4.2.2.Connector Tables
1.18.2.4.3.擴展表標識符
1.18.2.概念與通用API
Table API 和 SQL 集成在同一套 API 中。這套 API 的核心概念是Table,用作查詢的輸入和輸出。本文介紹了 Table API 和 SQL 查詢程序的通用結構、如何注冊 Table 、如何查詢 Table 以及如何輸出 Table。
1.18.2.1.兩種計劃器(Planner)的主要區別:
1.Blink將批處理作業視作流處理的一種特例。嚴格來說,Table 和 DataSet 之間不支持相互轉換,并且批處理作業也不會轉換成 DataSet 程序而是轉換成 DataStream 程序,流處理作業也一樣。
2.Blink 計劃器不支持 BatchTableSource,而是使用有界的 StreamTableSource 來替代。
3.舊計劃器和 Blink 計劃器中 FilterableTableSource 的實現是不兼容的。舊計劃器會將 PlannerExpression 下推至 FilterableTableSource,而 Blink 計劃器則是將 Expression 下推。
4.基于字符串的鍵值配置選項僅在 Blink 計劃器中使用。(詳情參見 配置 )
5.PlannerConfig 在兩種計劃器中的實現(CalciteConfig)是不同的。
6.Blink 計劃器會將多sink(multiple-sinks)優化成一張有向無環圖(DAG),TableEnvironment 和 StreamTableEnvironment 都支持該特性。舊計劃器總是將每個sink都優化成一個新的有向無環圖,且所有圖相互獨立。
7.舊計劃器目前不支持 catalog 統計數據,而 Blink 支持。
1.18.2.2.Table API和SQL程序的結構
所有用于批處理和流處理的 Table API 和 SQL 程序都遵循相同的模式。下面的代碼示例展示了 Table API 和 SQL 程序的通用結構。
其中pom.xml添加的依賴如下:
<!-- 取決于你使用的編程語言,選擇Java或者Scala API來構建你的Table API和SQL程序 --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.12.0</version><!--<scope>provided</scope>--> </dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.12.0</version><!--<scope>provided</scope>--> </dependency><!-- 如果你想在 IDE 本地運行你的程序,你需要添加下面的模塊,具體用哪個取決于你使用哪個 Planner --> <!-- Either... (for the old planner that was available before Flink 1.9) --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.12.0</version><!--<scope>provided</scope>--> </dependency><!-- or.. (for the new Blink planner) --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.12.0</version><!--<scope>provided</scope>--> </dependency><!-- 內部實現上,部分 table 相關的代碼是用 Scala 實現的。所以,下面的依賴也需要添加到你的程序里,不管是批式還是流式的程序: --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.12.0</version><!--<scope>provided</scope>--> </dependency><!-- 如果你想實現自定義格式來解析Kafka數據,或者自定義函數,下面的依賴就足夠了,編譯出來的jar文件可以直接給SQL Client使用 --> <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.12.0</version><!--<scope>provided</scope>--> </dependency><build><plugins><!-- 編譯插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target><encoding>UTF-8</encoding><compilerVersion>${maven.compiler.source}</compilerVersion><showDeprecation>true</showDeprecation><showWarnings>true</showWarnings></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>${maven.test.skip}</skipTests></configuration></plugin><plugin><groupId>org.apache.rat</groupId><artifactId>apache-rat-plugin</artifactId><version>0.12</version><configuration><excludes><exclude>README.md</exclude></excludes></configuration></plugin><plugin><artifactId>maven-checkstyle-plugin</artifactId><version>2.17</version><executions><execution><id>verify</id><phase>verify</phase><configuration><configLocation>style/rmq_checkstyle.xml</configLocation><encoding>UTF-8</encoding><consoleOutput>true</consoleOutput><failsOnError>true</failsOnError><includeTestSourceDirectory>false</includeTestSourceDirectory><includeTestResources>false</includeTestResources></configuration><goals><goal>check</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-javadoc-plugin</artifactId><version>2.10.4</version><configuration><aggregate>true</aggregate><reportOutputDirectory>javadocs</reportOutputDirectory><locale>en</locale></configuration></plugin><!-- scala編譯插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.1.6</version><configuration><scalaCompatVersion>2.11</scalaCompatVersion><scalaVersion>2.11.12</scalaVersion><encoding>UTF-8</encoding></configuration><executions><execution><id>compile-scala</id><phase>compile</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>test-compile-scala</id><phase>test-compile</phase><goals><goal>add-source</goal><goal>testCompile</goal></goals></execution></executions></plugin><!-- 打jar包插件(會包含所有依賴) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><!-- 可以設置jar包的入口類(可選) --><mainClass></mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins> </build>Java代碼如下:
package com.toto.demo.sql;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult;/*** @author tuzuoquan** @description** TableAPIAndSQLStructure** @date 2021/1/28 10:27**/ public class TableAPIAndSQLStructure {public static void main(String[] args) {//Flink STREAMING QUERYEnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();// create a TableEnvironment for specific planner batch or streaming//StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);//或者使用下面的方式TableEnvironment tableEnv = TableEnvironment.create(fsSettings);// create an input TabletableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");// register an output TabletableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");// create a Table object from a Table API queryTable table2 = tableEnv.from("table1").select(...);// create a Table object from a SQL queryTable table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");// emit a Table API result Table to a TableSink, same for SQL resultTableResult tableResult = table2.executeInsert("outputTable");tableResult...}}scala代碼如下:
package com.toto.learn.sqlimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}object TableAPIAndSQLStructure {def main(args: Array[String]): Unit = {//FLINK STREAMING QUERYval fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment//val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)// 或使用下面的創建方式val tableEnv = TableEnvironment.create(fsSettings)// create a TableEnvironment for specific planner batch or streaming//val tableEnv = ... // see "Create a TableEnvironment" section// create an input TabletableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )")// register an output TabletableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )")// create a Table from a Table API queryval table2 = tableEnv.from("table1").select(...)// create a Table from a SQL queryval table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// emit a Table API result Table to a TableSink, same for SQL resultval tableResult = table2.executeInsert("outputTable")tableResult...}}1.18.2.3.創建 TableEnvironment
TableEnvironment是Table API和SQL的核心概念。它負責:
1.在內部的catalog中注冊Table
2.注冊外部的catalog
3.加載可插拔模塊
4.執行SQL查詢
5.注冊自定義函數 (scalar、table 或 aggregation)
6.將 DataStream 或 DataSet 轉換成 Table
7.持有對 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。
Table 總是與特定的 TableEnvironment 綁定。不能在同一條查詢中使用不同 TableEnvironment 中的表,例如,對它們進行 join 或 union 操作。
TableEnvironment 可以通過靜態方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中創建,TableConfig 是可選項。TableConfig可用于配置TableEnvironment或定制的查詢優化和轉換過程(參見 查詢優化(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#query-optimization))。
請確保選擇與你的編程語言匹配的特定的計劃器BatchTableEnvironment/StreamTableEnvironment。
如果兩種計劃器的 jar 包都在 classpath 中(默認行為),你應該明確地設置要在當前程序中使用的計劃器。
JAVA版本
// ********************** // FLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);// ****************** // FLINK BATCH QUERY // ****************** import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);// ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);SCALA版本
// ********************** // FLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentval fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings) // or val fsTableEnv = TableEnvironment.create(fsSettings)// ****************** // FLINK BATCH QUERY // ****************** import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.bridge.scala.BatchTableEnvironmentval fbEnv = ExecutionEnvironment.getExecutionEnvironment val fbTableEnv = BatchTableEnvironment.create(fbEnv)// ********************** // BLINK STREAMING QUERY // ********************** import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentval bsEnv = StreamExecutionEnvironment.getExecutionEnvironment val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) // or val bsTableEnv = TableEnvironment.create(bsSettings)// ****************** // BLINK BATCH QUERY // ****************** import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)注意:如果/lib目錄中只有一種計劃器的 jar 包,則可以使用useAnyPlanner(python 使用 use any_u_planner)創建 EnvironmentSettings。
1.18.2.4.在Catalog中創建表
TableEnvironment 維護著一個由標識符(identifier)創建的表 catalog 的映射。標識符由三個部分組成:catalog 名稱、數據庫名稱以及對象名稱。如果 catalog 或者數據庫沒有指明,就會使用當前默認值。(參見表標識符擴展章節中的例子)。
Table 可以是虛擬的(視圖 VIEWS)也可以是常規的(表 TABLES)。視圖VIEWS可以從已經存在的Table中創建,一般是Table API或者SQL的查詢結果。表TABLES描述的是外部數據,例如文件、數據庫表或者消息隊列。
1.18.2.4.1.臨時表(Temporary Table)和永久表(Permanent Table)
表可以是臨時的,并與單個 Flink 會話(session)的生命周期相關,也可以是永久的,并且在多個 Flink 會話和群集(cluster)中可見。
永久表需要 catalog(例如 Hive Metastore)以維護表的元數據。一旦永久表被創建,它將對任何連接到 catalog 的 Flink 會話可見且持續存在,直至被明確刪除。
1.18.2.4.1.1.屏蔽(Shadowing)
可以使用與已存在的永久表相同的標識符去注冊臨時表。臨時表會屏蔽永久表,并且只要臨時表存在,永久表就無法訪問。所有使用該標識符的查詢都將作用于臨時表。
這可能對實驗(experimentation)有用。它允許先對一個臨時表進行完全相同的查詢,例如只有一個子集的數據,或者數據是不確定的。一旦驗證了查詢的正確性,就可以對實際的生產表進行查詢。
1.18.2.4.2.創建表
1.18.2.4.2.1.虛擬表
在 SQL 的術語中,Table API 的對象對應于視圖(虛擬表)。它封裝了一個邏輯查詢計劃。它可以通過以下方法在 catalog 中創建:
package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment;/*** @author tuzuoquan* @version 1.0* @ClassName CreateTable* @description TODO* @date 2021/1/29 10:24**/ public class CreateTable {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// table is the result of a simple projection queryTable projTable = tableEnv.from("X").select(...);// register the Table projTable as table "projectedTable"tableEnv.createTemporaryView("projectedTable", projTable);}}Scala代碼
package com.toto.learn.sqlimport org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}object CreateTable {def main(args: Array[String]): Unit = {val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val tableEnv = TableEnvironment.create(bbSettings)// table is the result of a simple projection query val projTable: Table = tableEnv.from("X").select(...)tableEnv.createTemporaryView("projectedTable",projTable)}}注意:從傳統數據庫系統的角度來看,Table對象與VIEW視圖非常像。也就是,定義了Table的查詢是沒有被優化的,而且會被內嵌到另一個引用了這個注冊了的Table的查詢中。如果多個查詢都引用了同一個注冊了的Table,那么它會被內嵌每個查詢中被執行多次,也就是說注冊了的Table的結果不會被共享(注:Blink計劃器的TableEnvironment會被優化成只執行了一次)。
1.18.2.4.2.2.Connector Tables
另外一個方式去創建 TABLE 是通過 connector 聲明。Connector 描述了存儲表數據的外部系統。存儲系統例如 Apache Kafka 或者常規的文件系統都可以通過這種方式來聲明。
java代碼:
tableEnvironment.connect(...).withFormat(...).withSchema(...).inAppendMode().createTemporaryTable("MyTable")Scala代碼:
tableEnvironment.connect(...).withFormat(...).withSchema(...).inAppendMode().createTemporaryTable("MyTable")1.18.2.4.3.擴展表標識符
表總是通過三元標識符注冊,包括 catalog 名、數據庫名和表名。
用戶可以指定一個 catalog 和數據庫作為 “當前catalog” 和”當前數據庫”。有了這些,那么剛剛提到的三元標識符的前兩個部分就可以被省略了。如果前兩部分的標識符沒有指定, 那么會使用當前的 catalog 和當前數據庫。用戶也可以通過 Table API 或 SQL 切換當前的 catalog 和當前的數據庫。
標識符遵循 SQL 標準,因此使用時需要用反引號(`)進行轉義。
Java代碼
Scala代碼
package com.toto.learn.sqlimport org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment}object Demo {def main(args: Array[String]): Unit = {val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val tableEnv: TableEnvironment = TableEnvironment.create(bbSettings)tableEnv.useCatalog("custom_catalog")tableEnv.useDatabase("custom_database")val table: Table = tableEnv.sqlQuery("SELECT ... FROM table1 ...")// register the view named 'exampleView' in the catalog named 'custom_catalog'// in the database named 'custom_database'tableEnv.createTemporaryView("exampleView", table)// register the view named 'exampleView' in the catalog named 'custom_catalog'// in the database named 'other_database'tableEnv.createTemporaryView("other_database.exampleView", table)// register the view named 'example.View' in the catalog named 'custom_catalog'// in the database named 'custom_database'tableEnv.createTemporaryView("`example.View`", table)// register the view named 'exampleView' in the catalog named 'other_catalog'// in the database named 'other_database'tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)}}總結
以上是生活随笔為你收集整理的1.18.2.Table APISQL(概念与通用API、两种计划器(Planner)的主要区别、创建 TableEnvironment、临时表、永久表、创建表、虚拟表、Connector 等)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ok卡怎么提现
- 下一篇: 荷军如何应对郑成功收复台湾