1.18.2.5.Table APISQL(查询表、Table API、SQL、混用Table API和SQL、输出表、翻译与执行查询、Blink planner、Old planner)等
1.18.2.5.查詢表
1.18.2.5.1.Table API
1.18.2.5.2.SQL
1.18.2.5.3.混用Table API和SQL
1.18.2.6.輸出表
1.18.2.7.翻譯與執行查詢
1.18.2.7.1.Blink planner
1.18.2.7.2.Old planner
1.18.2.5.查詢表
1.18.2.5.1.Table API
Table API 是關于 Scala 和 Java 的集成語言式查詢 API。與 SQL 相反,Table API 的查詢不是由字符串指定,而是在宿主語言中逐步構建。
Table API 是基于 Table 類的,該類表示一個表(流或批處理),并提供使用關系操作的方法。這些方法返回一個新的 Table 對象,該對象表示對輸入 Table 進行關系操作的結果。 一些關系操作由多個方法調用組成,例如 table.groupBy(…).select(),其中 groupBy(…) 指定 table 的分組,而 select(…) 在 table 分組上的投影。
package com.toto.demo.sql;
1.18.2.5.2.SQL
Flink SQL 是基于實現了SQL標準的 Apache Calcite 的。SQL 查詢由常規字符串指定。
文檔 SQL 描述了Flink對流處理和批處理表的SQL支持。
下面的示例演示了如何指定查詢并將結果作為 Table 對象返回。
Java代碼版本:
package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register Orders table// compute revenue for all customers from FranceTable revenue = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");// emit or convert Table// execute query}}Scala版本
package com.toto.learn.sqlimport org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}object Demo {def main(args: Array[String]): Unit = {val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val tableEnv: TableEnvironment = TableEnvironment.create(bbSettings)val revenue = tableEnv.sqlQuery("""|SELECT cID, cName, SUM(revenue) AS revSum|FROM Orders|WHERE cCountry = 'FRANCE'|GROUP BY cID,cName""".stripMargin)// emit or convert Table// execute query}}如下的示例展示了如何指定一個更新查詢,將查詢的結果插入到已注冊的表中。
package com.toto.demo.sql;import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// register "Orders" table// register "RevenueFrance" output table// compute revenue for all customers from France and emit to "RevenueFrance"tableEnv.executeSql("INSERT INTO RevenueFrance " +"SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders " +"WHERE cCountry = 'FRANCE' " +"GROUP BY cID, cName");}}1.18.2.5.3.混用Table API和SQL
Table API和SQL查詢的混用非常簡單因為它們都返回 Table 對象:
?可以在SQL查詢返回的Table對象上定義Table API查詢。
?在TableEnvironment中注冊的結果表可以在 SQL查詢的FROM子句中引用,通過這種方法就可以在 Table API 查詢的結果上定義SQL查詢。
1.18.2.6.輸出表
Table通過寫入 TableSink 輸出。TableSink是一個通用接口,用于支持多種文件格式(如 CSV、Apache Parquet、Apache Avro)、存儲系統(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息隊列系統(如Apache Kafka、RabbitMQ)。
批處理Table只能寫入 BatchTableSink,而流處理 Table 需要指定寫入 AppendStreamTableSink,RetractStreamTableSink 或者 UpsertStreamTableSink。
請參考文檔 Table Sources & Sinks (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html)以獲取更多關于可用 Sink 的信息以及如何自定義 TableSink。
方法Table.executeInsert(String tableName) 將Table發送至已注冊的TableSink。該方法通過名稱在catalog中查找TableSink并確認Table schema和TableSink schema一致。
下面的示例演示如何輸出 Table:
package com.toto.demo.sql;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.OldCsv; import org.apache.flink.table.descriptors.Schema;public class Demo {public static void main(String[] args) {EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();// get a TableEnvironmentTableEnvironment tableEnv = TableEnvironment.create(bsSettings);// create an output Tablefinal Schema schema = new Schema().field("a", DataTypes.INT()).field("b", DataTypes.STRING()).field("c", DataTypes.BIGINT());tableEnv.connect(new FileSystem().path("/path/to/file")).withFormat(new OldCsv().fieldDelimiter("|").deriveSchema()).withSchema(schema).createTemporaryTable("CsvSinkTable");// compute a result Table using Table API operators and/or SQL queriesTable result = ...// emit the result Table to the registered TableSinkresult.executeInsert("CsvSinkTable");}}1.18.2.7.翻譯與執行查詢
兩種計劃器翻譯和執行查詢的方式是不同的。
1.18.2.7.1.Blink planner
不論輸入數據源是流式的還是批式的,Table API 和 SQL 查詢都會被轉換成 DataStream 程序。查詢在內部表示為邏輯查詢計劃,并被翻譯成兩個階段:
1.優化邏輯執行計劃
2.翻譯成DataStream程序
?當 TableEnvironment.executeSql() 被調用時。該方法是用來執行一個 SQL 語句,一旦該方法被調用, SQL 語句立即被翻譯。
?當 Table.executeInsert() 被調用時。該方法是用來將一個表的內容插入到目標表中,一旦該方法被調用, TABLE API 程序立即被翻譯。
?當 Table.execute() 被調用時。該方法是用來將一個表的內容收集到本地,一旦該方法被調用, TABLE API 程序立即被翻譯。
?當 StatementSet.execute() 被調用時。Table (通過 StatementSet.addInsert() 輸出給某個 Sink)和 INSERT 語句 (通過調用 StatementSet.addInsertSql())會先被緩存到 StatementSet 中,StatementSet.execute() 方法被調用時,所有的 sink 會被優化成一張有向無環圖。
?當 Table 被轉換成 DataStream 時(參閱與 DataStream 和 DataSet API 結合)。轉換完成后,它就成為一個普通的 DataStream 程序,并會在調用 StreamExecutionEnvironment.execute() 時被執行。
注意:從1.11版本開始,sqlUpdate方法和insertInfo方法被廢棄,從這兩個方法構建的Table程序必須通過StreamTableEnvironment.execute()方法執行,而不能通過StreamExecutionEnvironment.execute()方法來執行。
1.18.2.7.2.Old planner
Table API 和 SQL 查詢會被翻譯成 DataStream 或者 DataSet 程序, 這取決于它們的輸入數據源是流式的還是批式的。查詢在內部表示為邏輯查詢計劃,并被翻譯成兩個階段:
1.優化邏輯執行計劃
2.翻譯成DataStream或DataSet程序。
Table API 或者 SQL 查詢在下列情況下會被翻譯:
?當 TableEnvironment.executeSql() 被調用時。該方法是用來執行一個 SQL 語句,一旦該方法被調用, SQL 語句立即被翻譯。
?當 Table.executeInsert() 被調用時。該方法是用來將一個表的內容插入到目標表中,一旦該方法被調用, TABLE API 程序立即被翻譯。
?當 Table.execute() 被調用時。該方法是用來將一個表的內容收集到本地,一旦該方法被調用, TABLE API 程序立即被翻譯。
?當 StatementSet.execute() 被調用時。Table (通過 StatementSet.addInsert() 輸出給某個 Sink)和 INSERT 語句 (通過調用 StatementSet.addInsertSql())會先被緩存到 StatementSet 中,StatementSet.execute() 方法被調用時,所有的 sink 會被優化成一張有向無環圖。
?對于 Streaming 而言,當Table 被轉換成 DataStream 時(參閱與 DataStream 和 DataSet API 結合)觸發翻譯。轉換完成后,它就成為一個普通的 DataStream 程序,并會在調用 StreamExecutionEnvironment.execute() 時被執行。對于 Batch 而言,Table 被轉換成 DataSet 時(參閱與 DataStream 和 DataSet API 結合)觸發翻譯。轉換完成后,它就成為一個普通的 DataSet 程序,并會在調用 ExecutionEnvironment.execute() 時被執行。
注意 :從 1.11 版本開始,sqlUpdate 方法 和 insertInto 方法被廢棄。對于 Streaming 而言,如果一個 Table 程序是從這兩個方法構建出來的,必須通過 StreamTableEnvironment.execute() 方法執行,而不能通過 StreamExecutionEnvironment.execute() 方法執行;對于 Batch 而言,如果一個 Table 程序是從這兩個方法構建出來的,必須通過 BatchTableEnvironment.execute() 方法執行,而不能通過 ExecutionEnvironment.execute() 方法執行。
總結
以上是生活随笔為你收集整理的1.18.2.5.Table APISQL(查询表、Table API、SQL、混用Table API和SQL、输出表、翻译与执行查询、Blink planner、Old planner)等的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 荷军如何应对郑成功收复台湾
- 下一篇: 重庆参观军舰怎么预约?