Flink SQL in Practice
1. Fundamentals
Flink SQL consists of the Table API and the SQL API. It is a higher-level library built on top of Flink Core that lets you process structured data quickly and conveniently with SQL.
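- Workflow
When SQL and Table API programs enter Flink, they are converted into a unified representation: the logical plan. The catalog supplies the metadata used during later optimization. The logical plan is the starting point of optimization. After a series of rule-based rewrites, Flink turns the initial logical plan into a physical plan. The code generator translates the physical plan into Transformations, and these are finally converted into a JobGraph.
There is no separate pipeline for stream processing and batch processing, because both share the same optimization process and extensions.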
- Programming model
Create the Flink SQL execution environment.
Define the data sources as tables.
Run SQL queries.
Write the query results to a target table.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<flink.version>1.15.2</flink.version>
<scala.version>2.12.2</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Web UI for local runs -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink-Kafka integration -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
<!-- RocksDB state backend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.21</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- CSV format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CDC (MySQL) -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Flink filesystem connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink On Hive-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.41</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
</dependencies>
- Sample data: emp.txt (one JSON object per line)
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":351619200000,"sal":1250.0,"comm":500.0,"deptno":30}
{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":354988800000,"sal":2975.0,"comm":null,"deptno":20}
{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":370454400000,"sal":1250.0,"comm":1400.0,"deptno":30}
{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":357494400000,"sal":2850.0,"comm":null,"deptno":30}
{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":360864000000,"sal":2450.0,"comm":null,"deptno":10}
{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":553100400000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":374774400000,"sal":5000.0,"comm":null,"deptno":10}
{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":368726400000,"sal":1500.0,"comm":0.0,"deptno":30}
{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":553100400000,"sal":1100.0,"comm":null,"deptno":20}
{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":376156800000,"sal":950.0,"comm":null,"deptno":30}
{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":376156800000,"sal":3000.0,"comm":null,"deptno":20}
{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":380563200000,"sal":1300.0,"comm":null,"deptno":10}
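- Java code
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlQuickStart {
    public static void main(String[] args) throws Exception {
        // Quick start
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
        // Read the text file and parse each JSON line into an Emp object
        DataStream<Emp> source = environment.readTextFile("data/emp.txt")
                .map(line -> JSONObject.parseObject(line, Emp.class));
        // Convert the stream of Java objects into a Table
        // Note: hiredate in Emp is a Long (epoch milliseconds), e.g.
        // {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
        Table table = tableEnv.fromDataStream(source);
        table.select(Expressions.$("*")).execute().print();
    }
}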
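The examples map every line of emp.txt to an Emp class, but the text never shows that class. Below is a minimal sketch of what it could look like. It is hypothetical and based on two assumptions.
- It follows Flink's POJO rules: a public class in its own Emp.java file, a public no-argument constructor, and a getter and setter for every field.
- It uses boxed types, so null values such as KING's mgr or SMITH's comm can be represented.

```java
/**
 * Hypothetical POJO for one line of emp.txt (its real definition is not shown in the text).
 * Flink can treat it as a POJO type: the class is public, has a public no-arg constructor,
 * and every field is reachable through a getter and a setter.
 * Boxed types allow null values such as KING's mgr or SMITH's comm.
 */
public class Emp {
    private Integer empno;
    private String ename;
    private String job;
    private Integer mgr;
    private Long hiredate;   // epoch milliseconds, e.g. 345830400000
    private Double sal;
    private Double comm;
    private Integer deptno;

    public Emp() {}  // needed by Flink's POJO serializer and by JSON mapping

    public Emp(Integer empno, String ename, String job, Integer mgr,
               Long hiredate, Double sal, Double comm, Integer deptno) {
        this.empno = empno;
        this.ename = ename;
        this.job = job;
        this.mgr = mgr;
        this.hiredate = hiredate;
        this.sal = sal;
        this.comm = comm;
        this.deptno = deptno;
    }

    public Integer getEmpno() { return empno; }
    public void setEmpno(Integer empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public Integer getMgr() { return mgr; }
    public void setMgr(Integer mgr) { this.mgr = mgr; }
    public Long getHiredate() { return hiredate; }
    public void setHiredate(Long hiredate) { this.hiredate = hiredate; }
    public Double getSal() { return sal; }
    public void setSal(Double sal) { this.sal = sal; }
    public Double getComm() { return comm; }
    public void setComm(Double comm) { this.comm = comm; }
    public Integer getDeptno() { return deptno; }
    public void setDeptno(Integer deptno) { this.deptno = deptno; }

    @Override
    public String toString() {
        return "Emp{empno=" + empno + ", ename=" + ename + ", deptno=" + deptno + ", sal=" + sal + "}";
    }
}
```

Because every field has a getter and setter, fromDataStream(source) can derive the column names empno, ename, job, and so on directly from this class.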
2. Flink SQL programming overview
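- Execution environment
TableEnvironment is the central concept of the Table API and SQL. It is used for:
- registering Tables in the internal catalog
- registering external catalogs
- loading pluggable modules
- executing SQL queries
- registering user-defined functions (scalar, table, and aggregate functions)
- converting between DataStream and Table
A Table is always bound to one specific TableEnvironment. A single query cannot combine tables from different TableEnvironments.
Whether the input is streaming or batch, Table API and SQL queries are translated into DataStream programs.
A Table object is identified as catalog.database.table.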
- Option 1: create it from a StreamExecutionEnvironment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
- Option 2: create a standalone TableEnvironment from EnvironmentSettings
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
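- Creating tables
A table identifier has three parts: the catalog name, the database name, and the object name.
If the catalog or database is omitted, the current default is used.
A table can be virtual (a view) or regular (a table). A view is defined over an existing Table object or query; createTemporaryView, used below, creates one that lives only in the current session. A regular table describes external physical data, such as a file, a database table, or a message queue.
Table lifetimes:
- Temporary tables exist only within the current Flink session.
- Permanent tables have their metadata stored in a catalog.
- Shadowing: if a temporary table has the same identifier as a permanent table, the permanent table cannot be accessed while the temporary one exists. Dropping the temporary table makes the permanent table accessible again.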
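The identifier and shadowing rules can be made concrete with a toy model in plain Java. This is not Flink's catalog API.
- It assumes Flink's built-in defaults, default_catalog and default_database.
- It keeps temporary and permanent tables in two hypothetical maps.
- A lookup checks the temporary objects first.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of identifier resolution and temporary-table shadowing (not Flink's API). */
class CatalogModel {
    private final String currentCatalog = "default_catalog";
    private final String currentDatabase = "default_database";
    private final Map<String, String> permanent = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();

    /** Expands "t", "db.t" or "cat.db.t" into a fully qualified catalog.db.table name. */
    String qualify(String identifier) {
        String[] parts = identifier.split("\\.");
        switch (parts.length) {
            case 1: return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2: return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3: return identifier;
            default: throw new IllegalArgumentException("Invalid identifier: " + identifier);
        }
    }

    void createTable(String id, String description) { permanent.put(qualify(id), description); }
    void createTemporaryView(String id, String description) { temporary.put(qualify(id), description); }
    void dropTemporaryView(String id) { temporary.remove(qualify(id)); }

    /** Temporary objects shadow permanent objects with the same qualified name. */
    String lookup(String id) {
        String key = qualify(id);
        if (temporary.containsKey(key)) return temporary.get(key);
        if (permanent.containsKey(key)) return permanent.get(key);
        throw new IllegalStateException("Table not found: " + key);
    }

    public static void main(String[] args) {
        CatalogModel catalog = new CatalogModel();
        catalog.createTable("t_emp", "permanent t_emp");
        catalog.createTemporaryView("t_emp", "temporary t_emp");
        System.out.println(catalog.lookup("t_emp"));                  // temporary t_emp
        catalog.dropTemporaryView("t_emp");
        System.out.println(catalog.lookup("default_database.t_emp")); // permanent t_emp
    }
}
```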
- Example
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
Table table = tabEnv.fromDataStream(source);
// Table table = tabEnv.fromDataStream(source, $("deptno").as("dno")); // alternative: keep only deptno, renamed to dno
tabEnv.createTemporaryView("t_emp",table);
tabEnv.sqlQuery("select * from t_emp").execute().print();
- Converting a DataStream to a Table
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
// Keep only the deptno column and rename it to dno
Table table = tabEnv.fromDataStream(source,$("deptno").as("dno"));
- createTemporaryView
Creates a temporary view (temporary table). The first argument is the name to register ([catalog.db.]tableName). The second argument is either a Table or a DataStream. When a DataStream is passed, an optional third argument selects and names its columns.
Table table = tabEnv.fromDataStream(source);
// Table table = tabEnv.fromDataStream(source, $("deptno").as("dno")); // alternative: keep only deptno, renamed to dno
tabEnv.createTemporaryView("t_emp",table);
=========================================================================================
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
// Register the stream as a view that keeps only deptno, renamed to dd
tabEnv.createTemporaryView("t_emp",source,$("deptno").as("dd"));
tabEnv.sqlQuery("select * from t_emp").execute().print();
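- Data types
- Atomic types: the basic types supported by DataStream, such as Integer, Double, and String, are also supported by Table.
- Tuple types: fields are named f0, f1, f2, and so on. All fields can be reordered, and a subset of them can be extracted.
- POJO types: Flink also supports composite types built from several fields. The most typical is the plain old Java object (POJO).
  - When a DataStream of POJOs is converted into a Table without specifying field names, the POJO's own field names are used.
  - POJO fields can be reordered, extracted, and renamed.
- Row type: Flink also defines Row, a more general data type for relational tables. It is the basic unit in which a Table organizes its data. A Row has a fixed number of fields, but Flink generally cannot infer the type of each field, so the concrete type information must be declared when Rows are used.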
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.readTextFile("data/dept.txt");
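// Field reordering means the listed expressions decide which columns appear and in what order.
// Here the stream holds an atomic type (String), so there is a single column, which $("f1") simply names f1.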
StreamTableEnvironment.create(environment).fromDataStream(source,$("f1")).execute().print();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(environment);
DataStreamSource<Row> source = environment.fromElements(Row.ofKind(RowKind.INSERT, "張三", 20)
, Row.ofKind(RowKind.INSERT, "李四", 25)
// The RowKind marks each row's change type: UPDATE_BEFORE retracts an old value, UPDATE_AFTER carries the new one
, Row.ofKind(RowKind.UPDATE_BEFORE, "yy", 12)
, Row.ofKind(RowKind.UPDATE_AFTER, "aaa", 18));
Table table = tabEnv.fromChangelogStream(source);
table.execute().print();
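- Querying tables
The Table API is a language-integrated query API for Scala and Java. Unlike SQL, a Table API query is not written as a string. Instead, it is composed step by step in the host language.
For example, in table.groupBy(...).select(...), groupBy(...) specifies how the table is grouped, and select(...) is a projection over the grouped table.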
//{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":345830400000,"sal":800.0,"comm":null,"deptno":20}
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
Table table = StreamTableEnvironment.create(environment).fromDataStream(source);
table.where($("deptno").isEqual(10)).select($("ename"), $("job")).execute().print();
table.groupBy($("deptno")).select($("deptno"),$("sal").avg().as("sal_avg")).execute().print();
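As a sanity check on what the two Table API queries above compute, here is the same filter/projection and per-department average written with plain Java streams. The (deptno, ename, job, sal) values are copied from emp.txt. It is only a batch reference: in streaming mode, Flink emits the averages incrementally as updates rather than as one final result.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Batch reference for the two Table API queries, using values copied from emp.txt. */
class EmpQueries {
    static final class Row {
        final int deptno; final String ename; final String job; final double sal;
        Row(int deptno, String ename, String job, double sal) {
            this.deptno = deptno; this.ename = ename; this.job = job; this.sal = sal;
        }
    }

    static final List<Row> EMP = List.of(
        new Row(20, "SMITH", "CLERK", 800.0), new Row(30, "ALLEN", "SALESMAN", 1600.0),
        new Row(30, "WARD", "SALESMAN", 1250.0), new Row(20, "JONES", "MANAGER", 2975.0),
        new Row(30, "MARTIN", "SALESMAN", 1250.0), new Row(30, "BLAKE", "MANAGER", 2850.0),
        new Row(10, "CLARK", "MANAGER", 2450.0), new Row(20, "SCOTT", "ANALYST", 3000.0),
        new Row(10, "KING", "PRESIDENT", 5000.0), new Row(30, "TURNER", "SALESMAN", 1500.0),
        new Row(20, "ADAMS", "CLERK", 1100.0), new Row(30, "JAMES", "CLERK", 950.0),
        new Row(20, "FORD", "ANALYST", 3000.0), new Row(10, "MILLER", "CLERK", 1300.0));

    /** Equivalent of where(deptno == 10).select(ename, job). */
    static List<String> dept10NamesAndJobs() {
        return EMP.stream().filter(r -> r.deptno == 10)
                .map(r -> r.ename + "," + r.job)
                .collect(Collectors.toList());
    }

    /** Equivalent of groupBy(deptno).select(deptno, avg(sal)), sorted by deptno. */
    static Map<Integer, Double> avgSalByDept() {
        return EMP.stream().collect(Collectors.groupingBy(
                r -> r.deptno, TreeMap::new, Collectors.averagingDouble(r -> r.sal)));
    }

    public static void main(String[] args) {
        System.out.println(dept10NamesAndJobs()); // [CLARK,MANAGER, KING,PRESIDENT, MILLER,CLERK]
        System.out.println(avgSalByDept());
    }
}
```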
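- SQL syntax
StreamTableEnvironment offers two commonly used methods, sqlQuery() and executeSql():
- sqlQuery() runs a SELECT query and returns a Table. That Table can be combined freely with further SQL or Table API calls.
- executeSql() executes a single statement of any supported kind and returns a TableResult. Examples include DDL such as CREATE TABLE, DML such as INSERT INTO, and queries.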
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tbl = StreamTableEnvironment.create(environment);
tbl.createTemporaryView("t_emp_demo",source);
String sql="select deptno,avg(sal) " +
" from t_emp_demo " +
" group by deptno ";
tbl.executeSql(sql).print();
=========================================================================================
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
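Table empTable = tableEnvironment.fromDataStream(source);
// Concatenating a Table into a SQL string works because Table.toString() registers it under a generated name
tableEnvironment.sqlQuery("select * from " + empTable).execute().print();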
- Output tables
insertInto: a Table is emitted by writing it to a TableSink. TableSink is a generic interface that supports:
- file formats (e.g., CSV, Apache Parquet, Apache Avro)
- storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch)
- message queues (e.g., Apache Kafka, RabbitMQ)
- Writing to the console
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.createTemporaryView("t_emp_d",source);
Table tableSource = tblEnv.fromDataStream(source, $("empno"), $("ename"), $("job"));
String sql=
"create table t_emp_r(" +
"empno Integer," +
"ename String," +
"job String) " +
"with ( " +
"'connector'='print')";
tblEnv.executeSql(sql);
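// insertInto(...).execute() submits the INSERT job by itself; await() keeps a local run alive until it finishes
tableSource.insertInto("t_emp_r").execute().await();
// t_emp_r uses the print connector, which is sink-only, so it cannot be queried as a source:
// tblEnv.executeSql("select * from t_emp_r").print();
// environment.execute() is not needed: the job was already submitted above and no DataStream sink was defined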
3. Flink SQL connectors
- Writing to Kafka
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlSource="create table kafka_source( " +
"deptno int," +
"dname String," +
"loc String)" +
"with (" +
"'connector'='kafka'," +
"'topic'='flink_kafka_source'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='flink-zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='csv')";
tblEnv.executeSql(sqlSource);
String sqlSink="create table kafka_sink( " +
"deptno int," +
"dname String," +
"loc String)" +
"with (" +
"'connector'='kafka'," +
"'topic'='flink_kafka_sink'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='flink-zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
tblEnv.executeSql(sqlSink);
// Query one table and insert the result into another
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").execute();
- Inspecting the execution plan
tblEnv.sqlQuery("select * from kafka_source").insertInto("kafka_sink").printExplain();
- Converting a Table to a DataStream
To convert a Table into a DataStream, call toDataStream() on the table environment. This works only for insert-only tables; tables with updates need toChangelogStream().
tableEnv.toDataStream(table).print();
- toChangelogStream
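Do not try to convert a table that receives updates directly into a DataStream for printing. Instead, record its change log.
Every update to such a table becomes an entry in a change-log stream, which can be converted into a DataStream and printed.
Encoding rules: an INSERT is encoded as an add message, and a DELETE is encoded as a retract message. An UPDATE is encoded as a retract message for the old row, followed by an add message for the updated row.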
tableEnv.toChangelogStream(table).print();
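The encoding rules can be illustrated with a small plain-Java model of a streaming GROUP BY with SUM. It is an illustration, not Flink's implementation. The +I/-U/+U labels mirror the short strings Flink prints for RowKind.INSERT, UPDATE_BEFORE, and UPDATE_AFTER. The model emits +I for the first result of a key, and a -U/+U pair (retract the old row, add the new one) for every later change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how a running per-key sum is emitted as a changelog. */
class ChangelogModel {
    private final Map<Integer, Double> sums = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    /** Processes one input row and appends the resulting changelog entries. */
    void accept(int deptno, double sal) {
        Double old = sums.get(deptno);
        double updated = (old == null ? 0.0 : old) + sal;
        if (old == null) {
            changelog.add("+I(" + deptno + "," + updated + ")"); // first result for this key: add
        } else {
            changelog.add("-U(" + deptno + "," + old + ")");     // retract the previous result
            changelog.add("+U(" + deptno + "," + updated + ")"); // add the updated result
        }
        sums.put(deptno, updated);
    }

    List<String> changelog() { return changelog; }

    public static void main(String[] args) {
        ChangelogModel m = new ChangelogModel();
        m.accept(20, 800.0);   // SMITH
        m.accept(30, 1600.0);  // ALLEN
        m.accept(20, 2975.0);  // JONES
        System.out.println(m.changelog());
        // [+I(20,800.0), +I(30,1600.0), -U(20,800.0), +U(20,3775.0)]
    }
}
```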
- JDBC connector
Flink can connect to several databases through dialects, such as MySQL, Oracle, PostgreSQL, and Derby; Derby is usually used for testing. The table below maps relational database types to Flink SQL data types. This mapping makes defining JDBC tables in Flink simpler.
- Common data type mappings
- Dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
- Example
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String jdbcSQL=
"create table jdbc_scott_emp(" +
"empno int," +
"ename string," +
"job string," +
"mgr int," +
"hiredate date," +
"sal double," +
"comm double," +
"deptno int)" +
"with (" +
"'connector'='jdbc'," +
"'url'='jdbc:mysql://master:3306/scott?serverTimezone=Asia/Shanghai'," + // Connector/J property: serverTimezone
"'table-name'='emp'," +
"'driver'='com.mysql.cj.jdbc.Driver'," +
"'username'='root'," +
"'password'='Root@123456.')";
tblEnv.executeSql(jdbcSQL);
tblEnv.sqlQuery("select * from jdbc_scott_emp").execute().print();
- SQL statements (inserting into a JDBC table; using a JDBC table as a dimension table in a temporal join)
-- Write data from another table "T" into the JDBC table
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- Use the JDBC table as a dimension table in a temporal (lookup) join
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
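Conceptually, the LEFT JOIN ... FOR SYSTEM_TIME AS OF myTopic.proctime above looks up the dimension row as it exists when each stream row is processed. Later changes to the dimension table do not revise results that were already emitted.

The plain-Java model below illustrates only those semantics. A Map stands in for MyUserTable, with hypothetical id and name columns. The real connector instead queries the database, optionally through a lookup cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a processing-time lookup (temporal) LEFT JOIN. */
class LookupJoinModel {
    // Stands in for the JDBC dimension table MyUserTable (hypothetical id -> name).
    private final Map<Integer, String> dimension = new HashMap<>();

    void upsertDimension(int id, String name) { dimension.put(id, name); }

    /** Joins one stream row against the dimension table as it is at this moment. */
    String join(int key, String payload) {
        String name = dimension.get(key);                  // point lookup at processing time
        return payload + "," + key + "," + (name == null ? "null" : name); // LEFT JOIN keeps misses
    }

    public static void main(String[] args) {
        LookupJoinModel m = new LookupJoinModel();
        List<String> out = new ArrayList<>();
        m.upsertDimension(1, "alice");
        out.add(m.join(1, "click"));       // click,1,alice
        out.add(m.join(2, "view"));        // view,2,null (no matching id)
        m.upsertDimension(1, "alice-v2");  // a later change does not revise earlier output
        out.add(m.join(1, "buy"));         // buy,1,alice-v2
        System.out.println(out);
    }
}
```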
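- DataGen SQL connector
Generates mock data: the DataGen connector produces rows according to per-field generation rules.
Complex types (ARRAY, MAP, ROW) are not supported; build them with computed columns instead.
Connector options
- Example
// Generate random rows according to per-field rules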
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String SqlStr="CREATE TABLE datagen (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
" 'fields.f_random_str.length'='10'\n" +
")";
tblEnv.executeSql(SqlStr);
tblEnv.sqlQuery("select * from datagen").execute().print();
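To see what the options in the DDL above control, here is a plain-Java approximation of the per-field generators. It approximates the rules and is not the connector's code.
- A sequence field counts from start to end, and the source is bounded: it finishes once the sequence is exhausted.
- A random numeric field is drawn from [min, max].
- A random string has the configured length. The letters-only alphabet here is an assumption.

The seed is fixed only so that the sketch is repeatable. rows-per-second only throttles emission, so a 1..1000 sequence at 5 rows per second should end after about 200 seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Approximation of the datagen rules used in the DDL above (not the connector's code). */
class DataGenModel {
    private final Random random = new Random(42); // fixed seed only to make the sketch repeatable
    private long next;
    private final long end;
    private final int min, max, strLength;

    DataGenModel(long start, long end, int min, int max, int strLength) {
        this.next = start;
        this.end = end;
        this.min = min;
        this.max = max;
        this.strLength = strLength;
    }

    /** Bounded source: stops once the sequence field is exhausted. */
    boolean hasNext() { return next <= end; }

    /** Returns one row as {f_sequence, f_random, f_random_str}. */
    Object[] nextRow() {
        long seq = next++;
        int rnd = min + random.nextInt(max - min + 1);  // uniform in [min, max]
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < strLength; i++) {
            sb.append((char) ('a' + random.nextInt(26))); // assumed alphabet: lowercase letters
        }
        return new Object[] {seq, rnd, sb.toString()};
    }

    public static void main(String[] args) {
        DataGenModel gen = new DataGenModel(1, 1000, 1, 1000, 10);
        List<Object[]> rows = new ArrayList<>();
        while (gen.hasNext()) rows.add(gen.nextRow());
        System.out.println(rows.size() + " rows, last sequence = " + rows.get(rows.size() - 1)[0]);
        // 1000 rows, last sequence = 1000
    }
}
```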
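- Upsert Kafka SQL connector
Because Flink computes continuously, results for the same key are produced again and again. The upsert-kafka connector handles them as follows:
- Updates: in the Kafka topic, the value for a given key is updated repeatedly (changelog kinds -U/+U).
- Inserts: a key seen for the first time is inserted (+I).
- Deletes: when reading, a message whose value is null marks that key as deleted (-D).

As a sink, the upsert-kafka connector consumes a changelog stream. It writes INSERT/UPDATE_AFTER rows as normal Kafka messages. It writes DELETE rows as Kafka messages with a null value, meaning the message for that key has been deleted.

Flink partitions the data by the values of the primary key columns. This keeps the messages for each key in order, because all updates and deletes for the same key land in the same partition.
- Example
// Use datagen to simulate the input data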
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String dataGen=
"create table t_dataGen(" +
"deptno int," +
"salnum int," +
"ts AS localtimestamp," +
"WATERMARK FOR ts AS ts" +
") with ( " +
"'connector'='datagen'," +
"'rows-per-second'='2'," +
"'fields.deptno.min'='88'," +
"'fields.deptno.max'='99'," +
"'fields.salnum.min'='10'," +
"'fields.salnum.max'='20')";
tblEnv.executeSql(dataGen);
// tblEnv.sqlQuery("select deptno,sum(salnum) as salnum from t_dataGen group by deptno").execute().print();
// Kafka sink side
String kafkaSink="create table upsert_kafka_num(" +
"deptno int," +
"salnum int," +
"primary key(deptno) not enforced)" +
"with(" +
"'connector'='upsert-kafka'," +
"'topic'='upsert_kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'key.format'='csv'," +
"'value.format'='json')";
tblEnv.executeSql(kafkaSink);
// Insert the per-department sums (an updating result, hence the upsert-kafka sink)
tblEnv.executeSql("insert into upsert_kafka_num select deptno,sum(salnum) as salnum from t_dataGen group by deptno");
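Here is a plain-Java sketch of the upsert semantics described above. It is not the connector's code.
- Each changelog row becomes a keyed record.
- UPDATE_BEFORE rows are not written.
- INSERT/UPDATE_AFTER rows become normal values.
- DELETE rows become a null value (a tombstone).

Reading the topic back keeps only the latest value per key and drops keys whose latest value is null. The row kinds and the deptno/salnum values are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of upsert-kafka writes and of materializing the topic by key. */
class UpsertKafkaModel {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** A Kafka record: key plus value, where a null value is a tombstone. */
    static final class KafkaRecord {
        final Integer key;
        final Integer value;
        KafkaRecord(Integer key, Integer value) { this.key = key; this.value = value; }
    }

    /** Sink side: translates one changelog row into at most one Kafka record. */
    static KafkaRecord toRecord(Kind kind, int deptno, int salnum) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return new KafkaRecord(deptno, salnum); // normal message
            case DELETE:
                return new KafkaRecord(deptno, null);   // tombstone
            default:
                return null;                            // UPDATE_BEFORE is not written
        }
    }

    /** Source side: the latest value per key wins; a tombstone removes the key. */
    static Map<Integer, Integer> materialize(List<KafkaRecord> topic) {
        Map<Integer, Integer> table = new LinkedHashMap<>();
        for (KafkaRecord r : topic) {
            if (r.value == null) table.remove(r.key); else table.put(r.key, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        Object[][] changelog = {
            {Kind.INSERT, 88, 10}, {Kind.INSERT, 99, 20},
            {Kind.UPDATE_BEFORE, 88, 10}, {Kind.UPDATE_AFTER, 88, 25},
            {Kind.DELETE, 99, 20}};
        List<KafkaRecord> topic = new ArrayList<>();
        for (Object[] c : changelog) {
            KafkaRecord r = toRecord((Kind) c[0], (Integer) c[1], (Integer) c[2]);
            if (r != null) topic.add(r);
        }
        System.out.println(topic.size() + " records, table = " + materialize(topic));
        // 4 records, table = {88=25}
    }
}
```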
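- FileSystem connector
File systems fall into two groups: local and external.
Local file system: Flink natively supports the local machine's file system, including any NFS or SAN drive mounted into it. It works out of the box with no extra configuration. Local files are referenced with the file:// URI scheme.
External file systems: common examples are HDFS and object stores such as Amazon S3. ClickHouse and HBase are databases with their own connectors, not file systems. Pluggable file systems such as S3 can, and should, be used as plugins. HDFS support comes from the Hadoop dependencies on the classpath, as in the HDFS test below.
To use a plugin file system, copy its JAR from the opt directory into its own subfolder of the plugins directory of the Flink distribution before starting Flink.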
- Local file test
public static void main(String[] args) {
// Set up the environments
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlDemo="create table t_dept_d(" +
"deptno int," +
"dname string," +
"loc string)" +
"with(" +
"'connector'='filesystem'," +
"'path'='data/dept.txt'," +
"'format'='csv'" +
")";
tblEnv.executeSql(sqlDemo);
tblEnv.sqlQuery("select * from t_dept_d").execute().print();
}
- HDFS test
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<!-- Also copy the cluster's Hadoop configuration files (core-site.xml, hdfs-site.xml, yarn-site.xml, etc.) into the resources directory -->
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String hdfsSql="create table dfs_dept(" +
"deptno int," +
"dname string," +
"loc string)" +
"with (" +
"'connector'='filesystem'," +
"'path'='hdfs://hdfs-zwf/dept.txt'," +
"'format'='csv')";
tblEnv.executeSql(hdfsSql);
tblEnv.sqlQuery("select * from dfs_dept").execute().print();
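4. Schema
- Physical columns
Physical columns come from the schema of the external storage system itself, for example:
- fields in the key or value of a Kafka message
- columns of a MySQL table
- columns of a Hive table
- fields of a Parquet file
- Computed columns
A computed column applies a SQL expression to physical columns and exposes the result as a column. It is virtual: it is evaluated when rows are read and is not stored.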
// Option 1: SQL DDL
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
String sqlStr="create table upsert_info(" +
"deptno int," +
"salnum2 as salnum*100,"+ // computed column
"salnum int)" +
"with (" +
"'connector'='kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'topic'='upsert_kafka'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
// Option 2: Table API (TableDescriptor)
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
.column("salnum",DataTypes.INT())
.columnByExpression("salpluns","salnum*100")
.build()).option("connector","kafka")
.option("topic","upsert_kafka")
.option("scan.startup.mode","earliest-offset")
.option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
.format("json").build());
tblEnv.sqlQuery("select * from kafka_dept").execute().print();
- Metadata columns
Metadata columns expose metadata that the connector obtains from the external storage system.
For a Kafka message, the data itself is carried in the record's key and value. Kafka also attaches metadata such as the partition, offset, and timestamp. Flink's Kafka connector can read and expose this metadata, letting users declare it as columns of a Flink SQL table.
// Option 1: SQL DDL
String sqlStr="create table upsert_info(" +
"deptno int," +
"salnum2 as salnum*100," + // computed column
"event_time timestamp_ltz(3) metadata from 'timestamp',"+ // metadata column
"salnum int)" +
"with (" +
"'connector'='kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'topic'='upsert_kafka'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
// Option 2: Table API (TableDescriptor)
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
.column("salnum",DataTypes.INT())
//metadata column
// Kafka declares timestamp as TIMESTAMP_LTZ(3) and offset as BIGINT
.columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
.columnByMetadata("k_offset",DataTypes.BIGINT(),"offset",true)
.build()).option("connector","kafka")
.option("topic","upsert_kafka")
.option("scan.startup.mode","earliest-offset")
.option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
.format("json").build());
tblEnv.sqlQuery("select * from kafka_dept").execute().print();
- Primary key constraints
Single-column primary key:
-- SQL API
id INT PRIMARY KEY NOT ENFORCED,
name STRING
// Table Api
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
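// Declare the primary key (the plain kafka connector rejects it; see the note below)
.primaryKey("deptno")
.column("salnum",DataTypes.INT())
// Metadata columns (Kafka declares timestamp as TIMESTAMP_LTZ(3) and offset as BIGINT)
.columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
.columnByMetadata("k_offset",DataTypes.BIGINT(),"offset",true)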
.build()).option("connector","kafka")
Composite (multi-column) primary key:
-- SQL API
id INT,
name STRING,
PRIMARY KEY(id,name) NOT ENFORCED
//Table API
tblEnv.createTable("kafka_dept", TableDescriptor.forConnector("kafka")
.schema(Schema.newBuilder()
.column("deptno", DataTypes.INT())
// Declare a composite primary key. Primary key columns must be physical columns,
// so the metadata column event_time cannot be part of it.
.primaryKey("deptno","salnum")
.column("salnum",DataTypes.INT())
// Metadata columns
.columnByMetadata("event_time",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
.columnByMetadata("k_offset",DataTypes.BIGINT(),"offset",true)
.build()).option("connector","kafka")
// SQL DDL version: upsert-kafka with a composite primary key
String sqlStr="create table upsert_info(" +
"deptno int," +
"event_time timestamp_ltz(3) metadata from 'timestamp',"+ // metadata column
"dname string," +
"loc string," +
"primary key(deptno,loc) not enforced)" +
"with (" +
"'connector'='upsert-kafka'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'topic'='flink_kafka_source'," +
"'key.format'='csv'," +
"'value.format'='json')";
tblEnv.executeSql(sqlStr);
tblEnv.sqlQuery("select * from upsert_info").execute().print();
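Note: the kafka connector does not allow a primary key, whereas the upsert-kafka connector requires one. Primary key columns must not contain null values.
With upsert-kafka, the key and value must not be empty; otherwise parsing fails when the csv format is used.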
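5. Flink SQL formats
Connectors: when a connector reads from or writes to external storage, it needs a different format component depending on how the data in that storage is encoded.
Format components: tell the connector how to parse the data in the external storage and map it to the table schema.
Basic steps:
- add the format's JAR dependency
- specify the format name (the 'format' option)
- set the options the format requires
- Formats supported by Flink SQL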
- Example: dependencies
<!--json格式依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!--csv格式依賴-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
- Example: CSV and JSON tables
-- CSV format
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
);
-- JSON format: an alternative definition of the same table
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
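6. Flink watermarks
Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. Unlike static tables, which represent batch data, dynamic tables change over time, yet they can be queried just like static batch tables.

Querying a dynamic table yields a continuous query. A continuous query never terminates and produces a dynamic table as its result. It keeps updating its (dynamic) result table to reflect changes on its (dynamic) input table. In essence, a continuous query on a dynamic table is very similar to a query that defines a materialized view.

Note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.

The biggest difference from tables in Spark or Hive is that tables in Flink SQL are dynamic. At its core, Flink processes bounded or unbounded data streams in a continuous, streaming fashion.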
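- Continuous queries
A continuous query evaluated on a dynamic table produces a new dynamic table. Unlike a batch query, a continuous query never terminates, and it updates its result table according to the updates on its input table. At any point in time, its result is semantically identical to that of the same query run in batch mode on a snapshot of the input table.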
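The equivalence claim can be checked with a small plain-Java experiment. This illustrates the semantics and is not how Flink executes queries.
- Maintain SELECT deptno, COUNT(*) ... GROUP BY deptno incrementally as rows arrive.
- After every row, compare that result with the same query recomputed from scratch over the snapshot seen so far.
- The input is the deptno column of emp.txt, in file order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Incremental (continuous) GROUP BY COUNT compared against batch recomputation on each snapshot. */
class ContinuousQueryModel {
    /** Batch query: recompute the counts from scratch over a snapshot of the input. */
    static Map<Integer, Long> batchCount(List<Integer> snapshot) {
        Map<Integer, Long> result = new HashMap<>();
        for (int d : snapshot) result.merge(d, 1L, Long::sum);
        return result;
    }

    /** True if the continuously maintained result equals the batch result after every row. */
    static boolean equivalentOnEverySnapshot(int[] stream) {
        Map<Integer, Long> continuous = new HashMap<>(); // the dynamic result table
        List<Integer> snapshot = new ArrayList<>();       // the input table seen so far
        for (int deptno : stream) {
            continuous.merge(deptno, 1L, Long::sum);      // update only the affected key
            snapshot.add(deptno);
            if (!continuous.equals(batchCount(snapshot))) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        int[] deptnos = {20, 30, 30, 20, 30, 30, 10, 20, 10, 30, 20, 30, 20, 10}; // from emp.txt
        System.out.println(equivalentOnEverySnapshot(deptnos)); // true
    }
}
```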
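- Event time
In the CREATE TABLE DDL, declare the event-time attribute by adding a WATERMARK clause on a timestamp column.
The WATERMARK clause defines the expression that generates watermarks. It marks the column that carries the event timestamp as the event-time attribute. Based on that column, it also sets how far the watermark lags behind.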
// Watermark that lags 5 seconds behind the event time
String eventTime="create table proc_dept_tbl(" +
"deptno int," +
"dname string," +
"loc string," +
"ts timestamp_ltz(3) metadata from 'timestamp'," +
"watermark for ts as ts-interval '5' second" + // ts is the event-time attribute
")with( " +
"'connector'='kafka'," +
"'topic'='flink_kafka_sink'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(eventTime);
tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();
//Table API
tblEnv.createTable("t_water_mark", TableDescriptor.forConnector("kafka")
.option("topic","flink_kafka_sink")
.option("properties.bootstrap.servers","node1:9092,master:9092,node2:9092")
.option("properties.group.id","zwf")
.option("scan.startup.mode","earliest-offset")
.format("json")
.schema(Schema.newBuilder()
.column("deptno",DataTypes.INT())
.column("dname",DataTypes.STRING())
.column("loc",DataTypes.STRING())
.columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
.watermark("ts","ts-interval '5' second").build()).build());
tblEnv.sqlQuery("select deptno,dname,ts from t_water_mark").execute().print();
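To see what WATERMARK FOR ts AS ts - INTERVAL '5' SECOND means for out-of-order and late rows, here is a simplified plain-Java model. It is not Flink's watermark generator, and it rests on these assumptions:
- The watermark is the largest ts - 5 s seen so far and never moves backwards. It is updated per row here, whereas Flink emits watermarks periodically.
- The 10-second tumbling window is a hypothetical choice.
- A row counts as late if the watermark has already reached its window's last millisecond (window_end - 1).

```java
/** Simplified model of a "ts - 5 s" watermark and of late-row detection. */
class WatermarkModel {
    static final long DELAY_MS = 5_000;    // ts - INTERVAL '5' SECOND
    static final long WINDOW_MS = 10_000;  // hypothetical 10 s tumbling window

    private long watermark = Long.MIN_VALUE;

    /** Returns true if the row is late, i.e. its window was already complete on arrival. */
    boolean onEvent(long ts) {
        long windowEnd = ts - Math.floorMod(ts, WINDOW_MS) + WINDOW_MS;
        boolean late = watermark >= windowEnd - 1;       // window's last millisecond already passed
        watermark = Math.max(watermark, ts - DELAY_MS);  // watermarks never move backwards
        return late;
    }

    long currentWatermark() { return watermark; }

    public static void main(String[] args) {
        WatermarkModel m = new WatermarkModel();
        long[] arrivals = {1_000, 8_000, 4_000, 16_000, 9_000};
        for (long ts : arrivals) {
            boolean late = m.onEvent(ts);
            System.out.println("ts=" + ts + " late=" + late + " watermark=" + m.currentWatermark());
        }
        // Only ts=9000 is late: by then the watermark (11000) has passed the end of [0, 10000).
    }
}
```

The row at 4000 arrives out of order but is still on time, because the 5-second delay holds the watermark at 3000.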
- Processing time
To define a processing-time attribute, you must declare an extra column dedicated to holding the current processing time.
In the CREATE TABLE DDL, add this extra column as a computed column that calls the built-in PROCTIME() function. Its type is TIMESTAMP_LTZ.
- Example
// Flink SQL: processing-time attribute
String procTime="create table proc_dept_tbl(" +
"deptno int," +
"dname string," +
"loc string," +
"pt as proctime()" + // pt is the processing-time attribute
")with( " +
"'connector'='kafka'," +
"'topic'='flink_kafka_sink'," +
"'properties.bootstrap.servers'='node1:9092,master:9092,node2:9092'," +
"'properties.group.id'='zwf'," +
"'scan.startup.mode'='earliest-offset'," +
"'format'='json')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(procTime);
tblEnv.sqlQuery("select * from proc_dept_tbl").execute().print();
// The same with the Table API
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
DataStream<Emp> source = environment.readTextFile("data/emp.txt").map(x -> JSONObject.parseObject(x, Emp.class));
Table table = tblEnv.fromDataStream(source, Schema.newBuilder()
.column("empno",DataTypes.INT())
.column("ename", DataTypes.STRING())
.column("job",DataTypes.STRING())
.column("mgr",DataTypes.INT())
.column("hiredate",DataTypes.BIGINT())
.column("sal",DataTypes.DOUBLE())
.column("comm",DataTypes.DOUBLE())
.column("deptno",DataTypes.INT())
.columnByExpression("ts","proctime()")
.build());
tblEnv.sqlQuery("select empno,ename,ts from " + table.toString()).execute().print();
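- Defining time attributes on a DataStream
The processing-time attribute can also be defined while converting a DataStream into a table. When calling fromDataStream() to create the table, append .proctime() to a field expression to mark it as the processing-time attribute.
Processing time is system time and does not exist in the original data. For this reason the attribute must not be defined on an existing field. It can only be appended after all fields of the schema, as an extra logical field.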
//Quick start
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// environment.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment);
//Read the text file into a DataStream of Emp objects
DataStream<Emp> source = environment.readTextFile("data/emp.txt")
.map(lines ->JSONObject.parseObject(lines, Emp.class));
//Convert the Java objects into a Table
//Note: hiredate in Emp is an epoch-millisecond timestamp of type Long
// {"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":351446400000,"sal":1600.0,"comm":300.0,"deptno":30}
Table table = tableEnv.fromDataStream(source,$("empno"),$("ename"),$("ts").proctime());
table.select($("*")).execute().print();
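7、FlinkSQL window TVFs
- TVF: windowing table-valued functions
- Flink currently provides the following windows:
- Hopping (sliding) windows
- Tumbling windows
- Cumulative windows
- Session windows (not yet available as a window TVF in Flink 1.15)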
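- The result of a window TVF contains every column of the original table plus three extra columns that describe the window:
  - window_start: the start time of the window
  - window_end: the end time of the window
  - window_time: the time attribute of the window, equal to window_end - 1 ms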
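- Tumbling window
A tumbling window has the same definition as in the DataStream API: fixed length, aligned to time, and no overlap. It is typically used for periodic statistics.
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset]) takes three required parameters:
- data: the table parameter. The table must contain a time attribute column.
- timecol: a descriptor that indicates which time attribute column of the data should be mapped to tumbling windows.
- size: the size (duration) of the tumbling windows.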
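The window a row falls into can be computed directly from its timestamp. The plain-Java sketch below assumes the alignment formula used by Flink's `TimeWindow.getWindowStartWithOffset` (start = ts - (ts - offset + size) % size) and epoch-millisecond timestamps. `TumbleSketch` is a hypothetical helper. It also derives the three extra TVF columns.

```java
import java.util.Arrays;

// Sketch only (plain Java, no Flink dependency): which tumbling window a row with
// event time ts falls into, plus the window_start / window_end / window_time columns.
final class TumbleSketch {
    /** Start of the size-long window (shifted by offset) that contains ts. */
    static long windowStart(long ts, long size, long offset) {
        return ts - (ts - offset + size) % size;
    }

    /** Returns {window_start, window_end, window_time} for one row, with offset 0. */
    static long[] assign(long ts, long size) {
        long start = windowStart(ts, size, 0L);
        long end = start + size;                 // windows are half-open: [start, end)
        return new long[] {start, end, end - 1}; // window_time = window_end - 1 ms
    }

    public static void main(String[] args) {
        long size = 5_000L; // INTERVAL '5' SECOND
        System.out.println(Arrays.toString(assign(12_345L, size))); // [10000, 15000, 14999]
    }
}
```

A row stamped exactly on a boundary (for example 10 000 ms) belongs to the window that starts there, because windows include their start and exclude their end.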
- Example
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
//Run SQL: datagen produces random gid and sales values, with gid in [10, 20] and sales in [1, 9]
//ts uses the local time; the watermark lags ts by 5 s
tblEnv.executeSql("CREATE TABLE t_goods (\n" +
" gid INT,\n" +
" sales INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.gid.min'='10',\n" +
" 'fields.gid.max'='20',\n" +
" 'fields.sales.min'='1',\n" +
" 'fields.sales.max'='9'\n" +
")");
// tblEnv.sqlQuery("select * from t_goods").execute().print();
//Tumbling window: compute once every 5 s
// String tumbleWin="select * from table(tumble(table t_goods,descriptor(ts),interval '5' second))";
// tblEnv.sqlQuery(tumbleWin).execute().print();
//Total sales per gid in each window
tblEnv.sqlQuery(
"select window_start,window_end,gid,sum(sales) as sum_sales " +
"from table(tumble(table t_goods,descriptor(ts),interval '5' second))" +
"group by window_start,window_end,gid"
).execute().print();
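- Hopping window
Hopping windows are also known as "sliding windows".
The HOP function assigns windows that cover the rows within an interval of size, and shifts each window by slide based on the time attribute column.
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset]) takes four required parameters:
- data: the table parameter. The table must contain a time attribute column.
- timecol: a descriptor that indicates which time attribute column of the data should be mapped to hopping windows.
- slide: the duration between the starts of consecutive hopping windows.
- size: the width (duration) of each hopping window. size must be an integral multiple of slide.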
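A plain-Java sketch of which hopping windows contain a given row. It follows the loop shape of Flink's DataStream `SlidingEventTimeWindows` assigner, with offset fixed at 0 and epoch-millisecond timestamps. `HopSketch` is hypothetical. Each row belongs to size / slide windows, so with the 3 s slide and 15 s size used in the example below, every row is counted in 5 windows.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hopping-window assignment in plain Java (no Flink dependency).
final class HopSketch {
    /** Aligns ts down to a multiple of step (same formula as a tumbling window start). */
    static long alignDown(long ts, long step) {
        return ts - (ts + step) % step;
    }

    /** Returns every [start, end) hopping window of the given slide and size that contains ts. */
    static List<long[]> assign(long ts, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        // Begin at the latest window that can contain ts, then step back one slide at a time.
        for (long start = alignDown(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // slide = 3 s, size = 15 s, as in HOP(..., INTERVAL '3' SECOND, INTERVAL '15' SECOND)
        for (long[] w : assign(20_000L, 3_000L, 15_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```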
- Example
//Hopping window TVF
//datagen: gid in [10, 20], sales in [1, 10]
String datagen="create table t_datagen(" +
"gid int," +
"sales int," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='10'," +
"'fields.gid.min'='10'," +
"'fields.gid.max'='20'," +
"'fields.sales.min'='1'," +
"'fields.sales.max'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(datagen);
// tblEnv.sqlQuery("select * from t_datagen").execute().print();
//Window size 15 s, slide 3 s
tblEnv.sqlQuery("select gid,sum(sales),window_start,window_end from table(hop(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
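- Cumulative window
CUMULATE assigns windows that first cover the rows within one step interval. Each following window grows by one more step while keeping the same window start, until the maximum window size is reached.
One way to think of CUMULATE is as a TUMBLE window of the maximum size, where each tumbling window is split into several windows. These share the same window start, and their window ends are one step apart.
Cumulative windows therefore overlap and have no fixed size.
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size) takes four required parameters:
- data: the table parameter. The table must contain a time attribute column.
- timecol: a descriptor that indicates which time attribute column of the data should be mapped to cumulative windows.
- step: the amount by which the window size grows between the ends of consecutive cumulative windows.
- size: the maximum width of the cumulative windows. size must be an integral multiple of step.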
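The membership rule described above can be sketched in plain Java. This is based on my reading of the CUMULATE semantics, not on Flink's source. The window start is fixed at the tumbling boundary of size, and the window ends are the step boundaries after ts, up to start + size. It assumes offset 0 and non-negative epoch-millisecond timestamps. `CumulateSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: cumulative-window assignment in plain Java. A row belongs to every
// cumulative window [start, end) whose end lies after the row's timestamp.
final class CumulateSketch {
    /** Aligns ts down to a multiple of unit (offset 0, ts assumed non-negative). */
    static long floorTo(long ts, long unit) {
        return ts - (ts + unit) % unit;
    }

    static List<long[]> assign(long ts, long step, long size) {
        long start = floorTo(ts, size);           // fixed window_start
        long firstEnd = floorTo(ts, step) + step; // first step boundary after ts
        List<long[]> windows = new ArrayList<>();
        for (long end = firstEnd; end <= start + size; end += step) {
            windows.add(new long[] {start, end});
        }
        return windows;
    }

    public static void main(String[] args) {
        // step = 3 s, size = 15 s, as in CUMULATE(..., INTERVAL '3' SECOND, INTERVAL '15' SECOND)
        for (long[] w : assign(20_000L, 3_000L, 15_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

A row at 20 000 ms is counted in [15000, 21000), [15000, 24000), [15000, 27000) and [15000, 30000). This is why each emitted result of a cumulative window includes everything since the fixed window start.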
- Example
//Cumulative window TVF
//datagen: gid in [10, 20], sales in [1, 10]
String datagen="create table t_datagen(" +
"gid int," +
"sales int," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='10'," +
"'fields.gid.min'='10'," +
"'fields.gid.max'='20'," +
"'fields.sales.min'='1'," +
"'fields.sales.max'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(datagen);
//Emits every 3 s and accumulates, e.g. if [19:45, 19:48) sums to 10 and [19:48, 19:51) adds 20, then [19:45, 19:51) reports 30
tblEnv.sqlQuery("select window_start,window_end,gid,sum(sales) as sales_sum from table(cumulate(table t_datagen,descriptor(ts),interval '3' second,interval '15' second)) group by window_start,window_end,gid").execute().print();
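- Distinct aggregation
COUNT(DISTINCT ...), combined with grouping, removes duplicate values before counting. UV (unique visitor) statistics need exactly this.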
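To show what the two counters in the example below measure, here is a plain-Java sketch over a fixed batch of page views. The streaming query keeps updating these numbers as rows arrive. The sketch computes the value for one fixed set of rows. `UvPvSketch` and its `View` class are hypothetical.

```java
import java.util.List;

// Sketch only: what COUNT(DISTINCT gid) (uv) and COUNT(url) (pv) compute over a batch.
final class UvPvSketch {
    static final class View {
        final int gid;    // visitor id
        final String url; // visited page
        View(int gid, String url) { this.gid = gid; this.url = url; }
    }

    /** pv: like COUNT(url), counts rows whose url is not NULL. */
    static long pv(List<View> views) {
        return views.stream().filter(v -> v.url != null).count();
    }

    /** uv: like COUNT(DISTINCT gid), counts distinct visitor ids. */
    static long uv(List<View> views) {
        return views.stream().mapToInt(v -> v.gid).distinct().count();
    }

    public static void main(String[] args) {
        List<View> views = List.of(new View(1001, "/a"), new View(1001, "/b"), new View(1002, "/a"));
        System.out.println("uv=" + uv(views) + ", pv=" + pv(views)); // uv=2, pv=3
    }
}
```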
- Example
//Website statistics: uv = unique visitors, pv = page views
String websiteSQL="create table wbSiteNum(" +
"gid int," +
"url string," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '5' second" +
")with(" +
"'connector'='datagen'," +
"'fields.gid.min'='1000'," +
"'fields.gid.max'='2000'," +
"'fields.url.length'='10')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(websiteSQL);
// tblEnv.sqlQuery("select * from wbSiteNum").execute().print();
tblEnv.sqlQuery(
"select count(distinct gid) as uv,count(url) as pv\n" +
"from wbSiteNum"
).execute().print();
8、FlinkSQL aggregate functions
- Group aggregation
In SQL, aggregation usually means built-in functions such as SUM, MAX, MIN, AVG and COUNT.
An aggregation computes over many input rows and yields a single value, which makes it a many-to-one transformation. For example, SELECT COUNT(*) FROM t counts the input rows. More often, a GROUP BY clause sets the grouping key, and the data is aggregated separately for each value of that field.
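The many-to-one behavior of GROUP BY can be illustrated in plain Java on the sample rows of the example below. This sketch sums num per pid. The province labels are shortened to p1..p4, which are hypothetical stand-ins for '省1'..'省4'. cid and xid are not needed here. In streaming mode Flink emits this result incrementally and updates each total as rows arrive. `GroupSumSketch` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: the final result of SELECT pid, SUM(num) ... GROUP BY pid, in plain Java.
final class GroupSumSketch {
    static Map<String, Integer> sumByKey(String[] keys, int[] nums) {
        Map<String, Integer> totals = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            totals.merge(keys[i], nums[i], Integer::sum); // add num to the running total of its key
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] pid = {"p1", "p1", "p1", "p2", "p2", "p2", "p3", "p3", "p3", "p4", "p4"};
        int[] num = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110};
        System.out.println(sumByKey(pid, num)); // {p1=303, p2=312, p3=321, p4=219}
    }
}
```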
- Example
//Group and sum
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid,num)\n" +
"GROUP BY pid;").execute().print();
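- ROLLUP
ROLLUP rolls dimensions up from fine-grained to coarse-grained. ROLLUP(pid, cid, xid) is equivalent to GROUPING SETS ((pid, cid, xid), (pid, cid), (pid), ()). The result contains the detail rows, a subtotal at each coarser level, and a grand total. Columns that have been rolled up appear as NULL in the subtotal rows.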
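The expansion that ROLLUP performs can be sketched in plain Java. ROLLUP over n columns produces the n + 1 column prefixes, ordered from the finest level down to the grand total. `RollupSketch` is a hypothetical helper, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: ROLLUP(c1, ..., cn) is shorthand for the n + 1 grouping sets made of the
// column prefixes, from (c1, ..., cn) down to the grand total ().
final class RollupSketch {
    static List<List<String>> rollup(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = cols.size(); n >= 0; n--) {
            sets.add(new ArrayList<>(cols.subList(0, n))); // keep the first n columns
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(rollup(List.of("pid", "cid", "xid")));
        // [[pid, cid, xid], [pid, cid], [pid], []]
    }
}
```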
//Group and sum with rollup(pid,cid,xid); the hierarchy runs pid -> cid -> xid (coarse to fine)
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid,num)\n" +
"GROUP BY rollup(pid,cid,xid)").execute().print();
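- CUBE
CUBE groups by every combination of the dimensions (the "cube" principle). For (col1, col2, col3) that means 2^3 = 8 grouping sets, from (col1, col2, col3) down to the grand total ().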
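The 2^n grouping sets behind CUBE can be enumerated in plain Java with a bitmask, where each bit decides whether one column is part of a grouping set. `CubeSketch` is a hypothetical helper, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: CUBE(c1, ..., cn) is shorthand for all 2^n subsets of the columns.
final class CubeSketch {
    static List<List<String>> cube(List<String> cols) {
        int n = cols.size();
        List<List<String>> sets = new ArrayList<>();
        for (int mask = (1 << n) - 1; mask >= 0; mask--) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << (n - 1 - i))) != 0) { // highest bit = first column
                    set.add(cols.get(i));
                }
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        List<List<String>> sets = cube(List.of("pid", "cid", "xid"));
        System.out.println(sets.size() + " grouping sets: " + sets);
        // 8 grouping sets: [[pid, cid, xid], [pid, cid], [pid, xid], [pid], [cid, xid], [cid], [xid], []]
    }
}
```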
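StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +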
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid, num)\n" +
"GROUP BY CUBE(pid, cid, xid)").execute().print();
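- GROUPING SETS
GROUPING SETS lets you choose the groupings yourself. The example below defines four grouping sets: (pid, cid, xid), (pid, cid), (pid) and (). Here that gives the same result as ROLLUP(pid, cid, xid).
//Custom grouping sets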
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.sqlQuery("SELECT pid, cid, xid, sum(num) AS total\n" +
"FROM (VALUES\n" +
" ('省1','市1','縣1',100),\n" +
" ('省1','市2','縣2',101),\n" +
" ('省1','市2','縣1',102),\n" +
" ('省2','市1','縣4',103),\n" +
" ('省2','市2','縣1',104),\n" +
" ('省2','市2','縣1',105),\n" +
" ('省3','市1','縣1',106),\n" +
" ('省3','市2','縣1',107),\n" +
" ('省3','市2','縣2',108),\n" +
" ('省4','市1','縣1',109),\n" +
" ('省4','市2','縣1',110))\n" +
"AS t_person_num(pid, cid, xid,num)\n" +
"GROUP BY GROUPING SETS ((pid, cid, xid),(pid, cid),(pid), ())").execute().print();
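9、OVER window functions
For each row, we can compute the average of all data in the hour before it, or the average of the 10 values before it. It is as if a window were opened on every row to collect data for the calculation, hence the name "window function" (OVER aggregation).
Group aggregation and window TVF aggregation are many-to-one: after grouping, each group yields exactly one aggregate result.
An OVER aggregation instead computes an aggregate for every row, so the table keeps all of its rows. The relationship is many-to-many, with one output row per input row.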
- Basic syntax
SELECT
<aggregate function> OVER ( [PARTITION BY <field1>[, <field2>, ...]] ORDER BY <time attribute field> <window range>)
, ...
FROM ...
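- OVER(): the aggregate function placed before the OVER keyword is applied to the window defined inside OVER. The window has the following parts:
  - 1、PARTITION BY (optional)
    Specifies the partition key, similar to grouping with GROUP BY. This part is optional.
  - 2、ORDER BY (required)
    An OVER window is a range of data that extends from the current row. The range can be chosen by time or by row count.
    Flink stream processing currently supports only ascending order on a time attribute, so the field after ORDER BY must be a declared time attribute.
  - Window range:
    1、An OVER window must also specify its range, i.e. how far to extend for the aggregation.
    2、The range is defined by BETWEEN <lower bound> AND <upper bound>, meaning "from the lower bound to the upper bound".
    3、Currently the upper bound can only be CURRENT ROW, so the range always runs "from some preceding row up to the current row".
    4、The range can be based on time or on row count, so you must choose one of two modes:
    - Row intervals (ROWS)
      - Row intervals start with ROWS and state directly how many rows to take, counting back from the current row.
      - For example, the current row plus the 5 rows before it: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    - Range intervals (RANGE, a range defined by time)
      - Range intervals start with RANGE and select a range based on the time field given in ORDER BY, usually a span of time before the current row's timestamp.
      - For example, the rows within 1 hour before the current row: RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
- Example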
//Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
//Run SQL
tableEnvironment.executeSql("CREATE TABLE t_goods (\n" +
" gid STRING,\n" +
" type INT,\n" +
" price INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.gid.length'='10',\n" +
" 'fields.type.min'='1',\n" +
" 'fields.type.max'='5',\n" +
" 'fields.price.min'='1',\n" +
" 'fields.price.max'='9'\n" +
")");
//Average price per type over the 10 s up to and including the current row
tableEnvironment.sqlQuery(
"select tg.*,avg(price) over(partition by type order by ts range between interval '10' second preceding and current row) as price_avg\n" +
"from t_goods tg"
).execute().print();
//Average price per type over the current row and the 10 rows before it
tableEnvironment.sqlQuery(
"select tg.*,avg(price) over(partition by type order by ts rows between 10 preceding and current row) as price_avg\n" +
"from t_goods tg"
).execute().print();
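The difference between the two frames used above can be shown in plain Java for one partition (one `type`) whose rows are already sorted by ts. The sketch computes exact double averages. Flink's built-in AVG keeps the input type, so on the INT price column it returns truncated INT values. `OverWindowSketch` is hypothetical, and it assumes epoch-millisecond timestamps.

```java
import java.util.Arrays;

// Sketch only: ROWS vs RANGE OVER frames for ONE partition whose rows are sorted by ts.
final class OverWindowSketch {
    /** ROWS BETWEEN n PRECEDING AND CURRENT ROW: current row plus up to n earlier rows. */
    static double[] rowsAvg(int[] prices, int n) {
        double[] out = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            int from = Math.max(0, i - n);
            long sum = 0;
            for (int j = from; j <= i; j++) sum += prices[j];
            out[i] = (double) sum / (i - from + 1);
        }
        return out;
    }

    /** RANGE BETWEEN rangeMs PRECEDING AND CURRENT ROW: rows with ts in [ts_i - rangeMs, ts_i]. */
    static double[] rangeAvg(long[] ts, int[] prices, long rangeMs) {
        double[] out = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            long sum = 0;
            int count = 0;
            for (int j = 0; j < prices.length; j++) {
                // RANGE frames also include peers, i.e. other rows with the same ts as row i
                if (ts[j] >= ts[i] - rangeMs && ts[j] <= ts[i]) {
                    sum += prices[j];
                    count++;
                }
            }
            out[i] = (double) sum / count;
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 5_000L, 12_000L, 13_000L};
        int[] prices = {2, 4, 6, 8};
        System.out.println(Arrays.toString(rowsAvg(prices, 1)));            // [2.0, 3.0, 5.0, 7.0]
        System.out.println(Arrays.toString(rangeAvg(ts, prices, 10_000L))); // [2.0, 3.0, 5.0, 6.0]
    }
}
```

The last row shows the difference. ROWS with 1 preceding row averages 6 and 8, while a 10 s RANGE reaches back to the row at 5 000 ms and averages 4, 6 and 8.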
- TopN
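In Flink SQL, Top-N is implemented with an OVER aggregation plus a filter condition.
ROW_NUMBER() gives each row a sequence number, row_num, based on the sort order. Filtering with row_num <= N in the outer query then returns the Top-N rows by the sort field.
Flink SQL has a dedicated, optimized implementation of this pattern. Only in Top-N queries may the ORDER BY of an OVER window use fields other than the time attribute. The query must follow the format below exactly, otherwise the Flink SQL optimizer cannot recognize it. The Table API also does not support ROW_NUMBER() at the moment, so Top-N is available only through the SQL API.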
SELECT ... FROM ( SELECT ...,
ROW_NUMBER() OVER ( [PARTITION BY <field1>[, <field2>...]] ORDER BY <sort field1> [asc|desc][, <sort field2> [asc|desc]...] ) AS row_num FROM ...)
WHERE row_num <= N [AND <other conditions>]
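To show what the template above returns, here is a plain-Java sketch of `ROW_NUMBER() OVER (PARTITION BY type ORDER BY price DESC) ... WHERE row_num <= n` on a fixed batch of items. Flink computes this incrementally and may retract previously emitted rows as the ranking changes. The sketch only shows the final ranking. `TopNSketch` is hypothetical, and each item is modeled as {type, price}.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: batch Top-N per partition. The list index + 1 plays the role of row_num.
final class TopNSketch {
    static Map<Integer, List<Integer>> topN(int[][] items, int n) {
        Map<Integer, List<Integer>> byType = new TreeMap<>();
        for (int[] item : items) {
            byType.computeIfAbsent(item[0], k -> new ArrayList<>()).add(item[1]); // PARTITION BY type
        }
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> e : byType.entrySet()) {
            List<Integer> prices = e.getValue();
            prices.sort(Comparator.reverseOrder()); // ORDER BY price DESC
            result.put(e.getKey(), new ArrayList<>(prices.subList(0, Math.min(n, prices.size())))); // row_num <= n
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] items = {{1, 100}, {1, 300}, {1, 200}, {1, 50}, {2, 10}};
        System.out.println(topN(items, 3)); // {1=[300, 200, 100], 2=[10]}
    }
}
```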
- Example
//Top-N with ROW_NUMBER() per partition
String dataGenDemo="create table t_datagen(" +
"gid string," +
"price int," +
"type int," +
"ts as localtimestamp," +
"watermark for ts as ts-interval '10' second" +
")with(" +
"'connector'='datagen'," +
"'fields.gid.length'='10'," +
"'rows-per-second'='10'," +
"'fields.price.min'='100'," +
"'fields.price.max'='999'," +
"'fields.type.min'='1'," +
"'fields.type.max'='1')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(dataGenDemo);
// tblEnv.sqlQuery("select * from t_datagen").execute().print();
String topNStr="select * from\n" +
"(select d.*,row_number() over(partition by type order by price desc) as row_num from\n"+
"t_datagen d) where row_num<=3";
tblEnv.sqlQuery(topNStr).execute().print();
=========================================================================================
//Tumbling window of 5 s: the top 3 items by price for each type in each window
String topNWin=" select *\n" +
" from(\n" +
" select *,row_number() over(partition by window_start,window_end,type order by price desc) as row_num\n" +
" from table(tumble(table t_datagen,descriptor(ts),interval '5' second))\n" +
" ) where row_num<=3";
tblEnv.sqlQuery(topNWin).execute().print();
=========================================================================================
- Window Top-N
//Within each 10 s window, the top 3 types by total sales
String topNWinSql="select * " +
" from(select type,t_price,window_start,window_end,row_number() over(partition by window_start,window_end order by t_price desc) as row_num\n" +
" from (\n" +
" select type,window_start,window_end,sum(price) as t_price\n" +
" from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
" group by type,window_start,window_end\n" +
" ))where row_num<=3";
tblEnv.sqlQuery(topNWinSql).execute().print();
//Within each 10 s window, the top 3 items by price for each type
String topNItemSql="select * " +
" from (select gid,type,price,window_start,window_end,row_number() over(partition by window_start,window_end,type order by price desc) as row_num\n" +
" from (\n" +
" select *\n" +
" from table(tumble(table t_datagen,descriptor(ts),interval '10' second))\n" +
" ) ) " +
"where row_num<=3";
tblEnv.sqlQuery(topNItemSql).execute().print();
10、Joins
As in standard SQL, Flink SQL regular joins are either inner joins or outer joins. The difference is whether rows that do not satisfy the join condition are included in the result. Currently only equi-joins are supported: the expression after ON must include an equality predicate between fields of the two tables.
- Equi inner join: returns every combination of rows from the two tables that satisfies the join condition (a join between dynamic tables).
//Generate two data streams
String dataStr="create table dataGen_demo(" +
"gid string," +
"type int," +
"price int," +
"ts1 as localtimestamp," +
"watermark for ts1 as ts1-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='1'," +
"'fields.gid.length'='10'," +
"'fields.type.min'='1'," +
"'fields.type.max'='30'," +
"'fields.price.min'='100'," +
"'fields.price.max'='999')";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
tblEnv.executeSql(dataStr);
// tblEnv.sqlQuery("select * from dataGen_demo").execute().print();
String dataStr1="create table dataGen_demo1(" +
"type int," +
"tname string," +
"price int," +
"ts2 as localtimestamp," +
"watermark for ts2 as ts2-interval '5' second" +
") with (" +
"'connector'='datagen'," +
"'rows-per-second'='1'," +
"'fields.tname.length'='10'," +
"'fields.type.kind'='sequence'," +
"'fields.type.start'='1'," +
"'fields.type.end'='50'," +
"'fields.price.min'='300'," +
"'fields.price.max'='400')";
tblEnv.executeSql(dataStr1);
tblEnv.sqlQuery("select * from dataGen_demo inner join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
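- Equi outer joins
LEFT JOIN: every row of the left table is emitted. A left row without a match is first emitted padded with NULLs and kept in state. When a matching row arrives, the NULL-padded row is retracted and the joined row is emitted instead.
RIGHT JOIN: every row of the right table is emitted. Unmatched right rows are handled the same way: they are emitted padded with NULLs, then retracted and re-emitted once a match arrives.
FULL JOIN: rows from both tables are emitted whether or not they match. When a row on either side finds a match, the previously emitted unmatched row is retracted and the joined row is emitted.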
tblEnv.sqlQuery("select * from dataGen_demo left join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
tblEnv.sqlQuery("select * from dataGen_demo full join dataGen_demo1 on dataGen_demo.type=dataGen_demo1.type").execute().print();
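- Interval join
A join of two streams corresponds to a join of two tables in SQL. The interval join is a join style specific to stream processing.
Flink SQL also supports window joins on window TVFs (since Flink 1.14). This section covers interval joins. Their result is the Cartesian product of the rows from the two streams that satisfy the join condition, further limited by a time interval.
Syntax: an interval join does not need the JOIN keyword. List the two tables after FROM, separated by a comma, and define the join condition in the WHERE clause with an equality expression. Filtering the cross join with WHERE works much like INNER JOIN ... ON .... After the join condition, add a time-interval constraint with AND.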
//Reuses dataGen_demo and dataGen_demo1 from the equi-join example above; running CREATE TABLE again in the same session would fail because the tables already exist
tblEnv.sqlQuery("select * from dataGen_demo d,dataGen_demo1 g where d.type=g.type and d.ts1 between g.ts2-interval '5' second and g.ts2+interval '5' second").execute().print();
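The predicate of the query above can be checked by hand with a nested-loop sketch in plain Java over two finite lists. The real Flink operator instead buffers rows in state and uses watermarks to drop rows that can no longer match. `IntervalJoinSketch` and its `Event` class are hypothetical, and timestamps are assumed to be epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: evaluates
//   d.type = g.type AND d.ts1 BETWEEN g.ts2 - INTERVAL '5' SECOND AND g.ts2 + INTERVAL '5' SECOND
// with a nested loop over two finite lists.
final class IntervalJoinSketch {
    static final class Event {
        final int type;
        final long ts; // epoch millis
        Event(int type, long ts) { this.type = type; this.ts = ts; }
    }

    /** Returns {type, d.ts1, g.ts2} for every matching pair. */
    static List<long[]> join(List<Event> left, List<Event> right, long boundMs) {
        List<long[]> pairs = new ArrayList<>();
        for (Event d : left) {
            for (Event g : right) {
                boolean sameKey = d.type == g.type;
                boolean inInterval = d.ts >= g.ts - boundMs && d.ts <= g.ts + boundMs; // BETWEEN is inclusive
                if (sameKey && inInterval) {
                    pairs.add(new long[] {d.type, d.ts, g.ts});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Event> d = List.of(new Event(1, 10_000L), new Event(1, 20_000L), new Event(2, 10_000L));
        List<Event> g = List.of(new Event(1, 14_000L), new Event(2, 16_000L));
        for (long[] p : join(d, g, 5_000L)) {
            System.out.println("type=" + p[0] + " ts1=" + p[1] + " ts2=" + p[2]);
        }
    }
}
```

Only (type 1, 10 000 ms) pairs with (type 1, 14 000 ms). The other type-1 row is 6 s away, and the type-2 rows are also 6 s apart, so both fall outside the 5 s bound.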
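11、FlinkSQL Client
Flink provides the SQL Client, which lets you write SQL directly in a console and submit jobs, much like Hive's beeline.
The Flink SQL Client can run against a standalone cluster or a YARN cluster. The commands used to submit jobs differ between the two.
- Standalone cluster (normal startup)
##Start the cluster (assumes the Flink environment variables are configured)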
start-cluster.sh
##Start the client
sql-client.sh embedded
- YARN cluster
Hadoop YARN must be running first.
Each time a yarn-session is started, Flink creates the file /tmp/.yarn-properties-root (for user root). This file records the Application ID of the most recently submitted YARN session. Note: the YARN session and the SQL Client must be started by the same user.
##Start a YARN session (assumes the Flink environment variables are configured)
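##(-n, the number of TaskManagers, no longer exists in newer Flink versions; TaskManagers are allocated on demand)
yarn-session.sh -jm 1024 -tm 1024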
##Start the client; it must run on the same node as the command above
sql-client.sh embedded -s yarn-session
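## Test in the client console
-- check that the connection works
select 'hello world';
-- test query on inline data
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
-- Result display modes: run one of the following in the client
-- table mode: results are shown in a separate paginated view; use the shortcuts listed at the bottom of the screen to page through results and return to the SQL prompt
SET 'sql-client.execution.result-mode' = 'table';
-- changelog mode: shows the changelog, including inserts (+I), deletes (-D) and updates (-U/+U)
SET 'sql-client.execution.result-mode' = 'changelog';
-- tableau mode: prints results directly in the terminal, like a traditional database, without a separate view
SET 'sql-client.execution.result-mode' = 'tableau';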
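- Installing dependencies
If the SQL Client needs third-party dependencies, copy the JARs your project uses into the lib directory of the Flink installation.
For example, flink-connector-kafka_2.11-1.13.2.jar adds Kafka read/write support for Flink 1.13.2. Pick the JAR that matches your Flink version. For the SQL Client, Flink's documentation recommends the bundled flink-sql-connector-kafka JAR.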
12、FlinkSQL official documentation
- Table API
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/tableapi/
The Table API is a unified, relational API for batch and stream processing. Table API queries can run on batch or streaming input without code changes. The Table API is a superset of the SQL language and is designed specifically for Apache Flink. It is a language-integrated API for Scala, Java and Python. Plain SQL specifies queries as string values. Table API queries are instead defined in an embedded style in Java, Scala or Python, with IDE support such as autocompletion and syntax validation.
- SQL API
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/overview/
The SQL supported by Flink includes the Data Definition Language (DDL), the Data Manipulation Language (DML) and the query language. Flink's SQL support is based on Apache Calcite, which implements the SQL standard.
13、FlinkSQL functions
In SQL, data transformations can be packaged as functions, embedded in queries and called in a uniform way.
Flink's Table API and SQL also provide functions, but they are invoked slightly differently:
- In the Table API, functions are invoked as method calls on data (expression) objects.
- In SQL, functions are referenced directly by name, with the data passed in as arguments.
- The Table API is embedded in Java, and many of its methods have to be added to classes separately, so it currently supports fewer functions.
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/functions/overview/
- Function types
Functions in Flink are classified along two dimensions:
- System (built-in) functions versus catalog functions.
- Temporary functions versus persistent functions.
- Combining the two gives four kinds of functions: temporary system functions, system functions, temporary catalog functions and catalog functions.
Flink can reference a function in two ways, precisely or ambiguously. A precise reference names the catalog and database (e.g. mycatalog.mydb.myfunc(x)), which lets you use functions across catalogs and databases. An ambiguous reference omits them and uses the current catalog and database.
- System functions
System functions, also called built-in functions, are function modules that the system implements in advance. You call them directly by their fixed names to perform the transformation you need. They fall into two main groups: scalar functions and aggregate functions.
The documentation groups them into scalar functions, aggregate functions, time interval and point unit specifiers, and column functions.
- Scalar functions:
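- User-defined functions
Flink's Table API and SQL provide several interfaces for user-defined functions, each defined as an abstract class.
The main UDF kinds currently are:
- Scalar functions: map scalar input values to a new scalar value.
- Table functions: map scalar values to one or more new rows, i.e. expand them into a table.
- Aggregate functions: map scalar values from multiple rows to a new scalar value.
- Table aggregate functions: map scalar values from multiple rows to one or more new rows.
- UDF scalar function
How to define one: write a class that extends the abstract class ScalarFunction and implements an evaluation method called eval().
The behavior of a scalar function depends entirely on this evaluation method, which must be public and must be named eval.
eval can be overloaded, and any data type can be used for its parameters and return type. Once the class is registered with the table environment, the function can be called directly in SQL.
- Code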
import org.apache.flink.table.functions.ScalarFunction;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 21:34
*/
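//Custom scalar function
public class ScalarUDFDemo extends ScalarFunction {
// Takes a STRING and returns a STRING; the evaluation method must be a public method named eval
public String eval(String input) {
if (input == null) {
return null; // propagate NULL input
}
//Append the string's length to the string, e.g. "abc" -> "abc3"
return input.concat(String.valueOf(input.length()));
}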
}
//Create mock data
//Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
//Run SQL
tableEnvironment.executeSql("CREATE TABLE t_datagen (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
// tableEnvironment.sqlQuery("select * from t_datagen").execute().print();
//Option 1: call the function inline in the Table API
// tableEnvironment.from("t_datagen").select(call(ScalarUDFDemo.class, $("f_random_str"))).execute().print();
//Option 2: register it as a temporary system function and call it in SQL
tableEnvironment.createTemporarySystemFunction("sfsl",ScalarUDFDemo.class);
tableEnvironment.sqlQuery("select sfsl(f_random_str) from t_datagen").execute().print();
- UDF table function
How to define one:
To implement a custom table function, write a class that extends the abstract class TableFunction. It must also implement an evaluation method named eval.
Unlike a scalar function, TableFunction has a type parameter T, which is the type of the rows the table function returns.
eval() returns nothing (void) and has no return statement. It emits output rows by calling collect().
- Data
1,Coco,Comedy:8_Animation:7_Adventure:3_Music:9_Family:6
2,Loving Vincent,Drama:8_Biography:7_Animation:3
3,It,Drama:6_Children:7_Horror:9
- Example code
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 21:53
*/
/**
* Row<type STRING,score INT>: output fields type and score, of types STRING and INT
*/
@FunctionHint(output = @DataTypeHint("Row<type STRING,score INT>"))
public class UDFTableFunction extends TableFunction<Row> {
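//The input is a string of genre:score pairs separated by "_"
/**
* e.g. Comedy:8_Animation:7_Adventure:3_Music:9_Family:6
* @param line
*/
public void eval(String line){
if (line == null) {
return; // emit nothing for a NULL input
}
String[] split = line.split("_");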
for (String s : split) {
String[] v = s.split(":");
collect(Row.of(v[0],Integer.parseInt(v[1])));
}
}
}
=========================================================================================
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(environment);
//Read the file with the filesystem connector
String fs="create table t_movie(" +
"id int," +
"name string," +
"types string" +
") with (" +
"'connector'='filesystem'," +
"'path'='data/movie.txt'," +
"'format'='csv')";
//Register the table with SQL
tblEnv.executeSql(fs);
// tblEnv.sqlQuery("select * from t_movie").execute().print();
//Table API
tblEnv
.from("t_movie")
.joinLateral(call(UDFTableFunction.class, $("types")).as("type", "score"))
.select($("id"),$("name"),$("type"),$("score"))
.execute()
.print();
//SQL API
tblEnv.createTemporarySystemFunction("tbl_f",UDFTableFunction.class);
tblEnv.sqlQuery("select id,name,type,score from t_movie ,lateral table(tbl_f(types))").execute().print();
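- UDF aggregate function
How to define one:
- A custom aggregate function extends the abstract class AggregateFunction.
- AggregateFunction has two type parameters: T is the type of the aggregation result, and ACC is the type of the intermediate aggregation state (the accumulator).
- Every AggregateFunction must implement the following methods:
  - createAccumulator(): creates the accumulator. It takes no parameters and returns the accumulator type ACC.
  - accumulate(): the core aggregation method, called once for every incoming row. Its first parameter is always the current accumulator of type ACC, which holds the intermediate state. The remaining parameters are the function's inputs.
  - getValue(): returns the final result. Its parameter is the accumulator of type ACC, and its output type is T. With complex types, Flink's type inference may fail to derive the correct types. Type hints such as @DataTypeHint and @FunctionHint can then declare them explicitly.
- Code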
package com.zwf.udf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 22:31
*/
/**
* AggregateFunction<Double, Tuple2<Integer,Integer>>: the output type is Double and the accumulator type is Tuple2<Integer,Integer>
* (f0 = sum of weight*price, f1 = sum of weight). getValue(), createAccumulator() and accumulate() must all be implemented
*/
public class UDFAggregationDemo extends AggregateFunction<Double, Tuple2<Integer,Integer>> {
/**
* Produces the output: the weighted average price sum(weight*price) / sum(weight)
* @param integerIntegerTuple2
* @return
*/
@Override
public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
if (integerIntegerTuple2.f1==0){
return 0.0; // no weight accumulated yet; avoid dividing by zero
}
return integerIntegerTuple2.f0*1.0/integerIntegerTuple2.f1;
}
/**
*
* @return the initial accumulator (0, 0)
*/
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0,0);
}
//Two INT inputs: weight and price
/**
* Without the @FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")}) annotation,
* type extraction may require the input fields to be declared NOT NULL
* @param acc
* @param weight
* @param price
*/
@FunctionHint(input = {@DataTypeHint("INT"), @DataTypeHint("INT")})
public void accumulate(Tuple2<Integer,Integer> acc ,Integer weight,Integer price){
if (weight == null || price == null) {
return; // skip rows with NULL inputs
}
acc.f0+=weight*price;
acc.f1+=weight;
}
}
========================================================================================
package com.zwf.flinkSQL;
import com.zwf.udf.UDFAggregationDemo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 22:43
*/
public class UDFDemo3 {
public static void main(String[] args) {
//Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
//Run SQL
tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
" id INT,\n" +
" type INT,\n" +
" weight INT,\n" +
" price INT\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.id.kind'='sequence',\n" +
" 'fields.id.start'='1',\n" +
" 'fields.id.end'='1000',\n" +
" 'fields.type.min'='1',\n" +
" 'fields.type.max'='3',\n" +
" 'fields.weight.min'='10',\n" +
" 'fields.weight.max'='20',\n" +
" 'fields.price.min'='100',\n" +
" 'fields.price.max'='200'\n" +
")");
tableEnvironment.createTemporarySystemFunction("aggre", UDFAggregationDemo.class);
tableEnvironment.sqlQuery("select type,aggre(weight,price) from t_order group by type").execute().print();
}
}
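- UDF table aggregate function
A user-defined table aggregate function (UDTAGG) aggregates one or more rows (i.e. a table) into another table. The result table can have multiple rows and columns. Table aggregate functions are called from the Table API with flatAggregate(); SQL does not support them.
How to define one: extend the abstract class TableAggregateFunction and implement the following methods:
- createAccumulator(): creates the accumulator, used the same way as in AggregateFunction.
- accumulate(): the core aggregation method, used the same way as in AggregateFunction.
- emitValue(): outputs the final result after the input rows have been processed. It corresponds to getValue() in AggregateFunction. The difference is that emitValue has no return type and takes two parameters: the accumulator of type ACC, and the "collector" out used to emit output, of type Collector<T>.
- Code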
package com.zwf.udf;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 22:56
*/
/**
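* TableAggregateFunction<T, ACC>: T is the output type, ACC the accumulator type (f0 = highest price, f1 = second-highest price, f2 = whether the top 2 changed)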
*/
public class TableAggregateUDF extends TableAggregateFunction<String, Tuple3<Integer,Integer,Boolean>> {
/**
* Initializes the accumulator
* @return
*/
@Override
public Tuple3<Integer, Integer, Boolean> createAccumulator() {
return Tuple3.of(0,0,false);
}
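/**
*
* @param acc the accumulator
* @param price the input value
*/
public void accumulate(Tuple3<Integer,Integer,Boolean> acc,Integer price){
if (price == null) {
acc.f2=false; // ignore NULL prices
return;
}
if(price>acc.f0){
acc.f1=acc.f0; // the old highest price becomes the second highest
acc.f0=price;
acc.f2=true;
}else if (price>acc.f1){
acc.f1=price;
acc.f2=true;
}else {
acc.f2=false;
}
}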
/**
*
* @param acc the accumulator
* @param out the collector for output rows
*/
public void emitValue(Tuple3<Integer, Integer, Boolean> acc, Collector<String> out){
if(acc.f2){
acc.f2=false;
out.collect("First[" + acc.f0 + "]Second[" + acc.f1 + "]");
}
}
}
=========================================================================================
package com.zwf.flinkSQL;
import com.zwf.udf.TableAggregateUDF;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/**
* @author MrZeng
* @version 1.0
* @date 2024-01-13 23:06
*/
public class UDFDemo4 {
public static void main(String[] args) {
//Execution environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
//Run SQL
tableEnvironment.executeSql("CREATE TABLE t_order (\n" +
" id INT,\n" +
" type INT,\n" +
" price INT\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.id.kind'='sequence',\n" +
" 'fields.id.start'='1',\n" +
" 'fields.id.end'='1000',\n" +
" 'fields.type.min'='1',\n" +
" 'fields.type.max'='3',\n" +
" 'fields.price.min'='100',\n" +
" 'fields.price.max'='200'\n" +
")");
//Plain query
// tableEnvironment.sqlQuery("select * from t_order").execute().print();
// Register the function
tableEnvironment.createTemporarySystemFunction("tafop", TableAggregateUDF.class);
// Table aggregate functions are invoked with flatAggregate in the Table API (not in SQL)
tableEnvironment.from("t_order")
.groupBy($("type"))
.flatAggregate(call("tafop", $("price")).as("top2"))
.select($("type"), $("top2"))
.execute()
.print();
}
}
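To make the accumulator behavior concrete without a Flink cluster, here is a plain-Java model of TableAggregateUDF's top-2 logic. It is a sketch, not Flink code: the class name Top2Simulation and the driver method run are hypothetical, and emission is modeled by returning a row (or nothing) after each input. Note the order of the two assignments: the old maximum is demoted to second place before the new maximum is stored.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the top-2 accumulate/emitValue logic (no Flink dependency).
class Top2Simulation {
    private int first = 0;          // f0: highest price
    private int second = 0;         // f1: second-highest price
    private boolean changed = false; // f2: whether the top 2 changed

    void accumulate(int price) {
        if (price > first) {
            second = first; // demote the old maximum before overwriting it
            first = price;
            changed = true;
        } else if (price > second) {
            second = price;
            changed = true;
        } else {
            changed = false;
        }
    }

    // Returns the emitted row, or null when the top 2 did not change.
    String emitValue() {
        if (!changed) {
            return null;
        }
        changed = false;
        return "First[" + first + "]Second[" + second + "]";
    }

    // Feeds the prices one by one and collects every emitted row.
    static List<String> run(int... prices) {
        Top2Simulation acc = new Top2Simulation();
        List<String> out = new ArrayList<>();
        for (int p : prices) {
            acc.accumulate(p);
            String row = acc.emitValue();
            if (row != null) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(150, 120, 180, 110, 160));
    }
}
```

For the prices 150, 120, 180, 110, 160, run returns four rows; 110 changes neither of the top two prices, so nothing is emitted for it.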
14. Flink SQL CDC
CDC, short for Change Data Capture, captures the committed changes in a database and sends them downstream for consumption.
- Flink CDC
In the traditional data-synchronization architecture, capturing database changes in real time required a third-party tool such as Canal or Debezium to collect the database change log and send it to a Kafka message queue; other components such as Flink or Spark then consumed the Kafka data, computed results, and sent them to downstream systems.
In the new architecture, Flink consumes the database's incremental log directly, replacing the former collection layer, computes on the data itself, and sends the results downstream.
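How it works: when the MySQL CDC source starts, it acquires a global read lock (FLUSH TABLES WITH READ LOCK), which blocks writes from other sessions. It then reads the current binlog position along with the database and table schemas, and releases the global read lock. Next, it scans the database tables and reads the binlog from the previously recorded position. Flink periodically takes checkpoints that record the binlog position. If a failure occurs, the job restarts and resumes from the binlog position of the last completed checkpoint, which guarantees exactly-once semantics. (This is the original 1.x snapshot procedure; since Flink CDC 2.0, the MySQL connector uses a lock-free incremental snapshot algorithm by default.)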
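The checkpoint-and-resume idea can be modeled in a few lines of plain Java. This is an illustrative sketch, not the connector's implementation: the binlog is assumed to be a list of integers, the downstream state is a running sum, and BinlogResumeSketch and its Checkpoint class are hypothetical names. Because the binlog position and the state are stored in the same checkpoint, replaying from the checkpointed position after a crash counts every event exactly once.

```java
import java.util.List;

// Plain-Java model of "record the binlog position in a checkpoint and resume from it".
class BinlogResumeSketch {
    // What a checkpoint stores: the next binlog position to read and the state at that point.
    static final class Checkpoint {
        final int position;
        final long sum;

        Checkpoint(int position, long sum) {
            this.position = position;
            this.sum = sum;
        }
    }

    /**
     * Applies binlog events (integers added to a running sum), checkpointing every
     * interval events. If failAt >= 0, the job "crashes" once, just before applying
     * the event at that position, and restores the last completed checkpoint.
     */
    static long run(List<Integer> binlog, int interval, int failAt) {
        Checkpoint last = new Checkpoint(0, 0L); // initial checkpoint at the recorded start position
        int pos = last.position;
        long sum = last.sum;
        boolean failed = false;
        while (pos < binlog.size()) {
            if (!failed && pos == failAt) {
                failed = true;
                // crash: progress after the last checkpoint is lost; resume from its position
                pos = last.position;
                sum = last.sum;
                continue;
            }
            sum += binlog.get(pos);
            pos++;
            if (pos % interval == 0) {
                last = new Checkpoint(pos, sum); // position and state are saved together
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> binlog = List.of(1, 2, 3, 4, 5, 6, 7);
        System.out.println(run(binlog, 3, -1) + " " + run(binlog, 3, 5));
    }
}
```

Both run(binlog, 3, -1) and run(binlog, 3, 5) return 28: the crash discards the progress made after the checkpoint at position 3, and those events are read again from that position without being double-counted.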
Advantages: works out of the box and is easy to get started with; fewer components to maintain, a simpler real-time pipeline, and lower deployment cost; lower end-to-end latency.
- Changelog
Flink SQL supports a complete changelog mechanism internally, so to integrate CDC data, Flink only needs to convert it into data that Flink understands.
The refactored TableSource emits RowData, a data structure that represents one row. Each RowData carries a piece of metadata called RowKind.
RowKind covers insert (+I), update-before (-U), update-after (+U), and delete (-D), which closely mirrors the binlog concept in databases.
The JSON that Debezium produces contains the old row, the new row, and metadata. Consuming Debezium JSON essentially means converting this raw JSON into RowData that Flink understands.
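The mapping from a Debezium change event to RowKind rows can be sketched in plain Java. Assumptions: the event has already been parsed into its op code and its before and after images (represented here as maps); the Kind enum only mirrors the short strings of Flink's RowKind, and ChangeRow is a hypothetical holder type, not Flink's RowData. Debezium's op codes are c (create), r (snapshot read), u (update), and d (delete).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of turning one Debezium change event into changelog rows.
class DebeziumToChangelog {
    // Mirrors the short strings of Flink's RowKind; not the Flink class itself.
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    // Hypothetical row holder: a kind plus column values.
    static final class ChangeRow {
        final Kind kind;
        final Map<String, Object> fields;

        ChangeRow(Kind kind, Map<String, Object> fields) {
            this.kind = kind;
            this.fields = fields;
        }

        @Override
        public String toString() {
            return kind.shortString + fields;
        }
    }

    /** op: c = create, r = snapshot read, u = update, d = delete. */
    static List<ChangeRow> convert(String op, Map<String, Object> before, Map<String, Object> after) {
        List<ChangeRow> rows = new ArrayList<>();
        switch (op) {
            case "c":
            case "r":
                rows.add(new ChangeRow(Kind.INSERT, after));
                break;
            case "u":
                // one update becomes two rows: retract the old image, then emit the new one
                rows.add(new ChangeRow(Kind.UPDATE_BEFORE, before));
                rows.add(new ChangeRow(Kind.UPDATE_AFTER, after));
                break;
            case "d":
                rows.add(new ChangeRow(Kind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return rows;
    }
}
```

An update therefore becomes two rows, -U with the old image followed by +U with the new one, which is what lets downstream operators retract a previously emitted result.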
- MySQL CDC
Official documentation:
https://github.com/ververica/flink-cdc-connectors
When rows in the MySQL database are inserted or modified, the changes are captured by Flink in real time, processed, and sent downstream.
The currently supported databases are as follows:
- Modify the MySQL configuration file (vim /etc/my.cnf)
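[mysqld]
# server ID
server_id=12345
log_bin=/var/lib/mysql/mysql-bin
# must be ROW
binlog_format=ROW
binlog_cache_size=16M
max_binlog_size=100M
max_binlog_cache_size=256M
relay_log_recovery=1
# must be FULL; this parameter is available from MySQL 5.6 on
binlog_row_image=FULL
# binlog retention in days (deprecated in MySQL 8.0 in favor of binlog_expire_logs_seconds)
expire_logs_days=30
# write the binlog only for changes to the scott database
binlog_do_db=scott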
- Create the database table
DROP TABLE IF EXISTS `dept`;
CREATE TABLE `dept` (
`deptno` int(11) NOT NULL,
`dname` varchar(255) DEFAULT NULL,
`loc` varchar(255) DEFAULT NULL,
PRIMARY KEY (`deptno`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Insert the rows only after the Flink job is running, so they arrive as live change events
INSERT INTO `dept` VALUES ('10', 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES ('20', 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES ('30', 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES ('40', 'OPERATIONS', 'BOSTON');
- pom.xml
<!-- Flink CDC dependency -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- The driver version must be 8.0.27 or later -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
- Implementation
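import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCdcDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        // periodic checkpoints record the binlog position for recovery (3 s is an example interval)
        environment.enableCheckpointing(3000);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        // register a table backed by the mysql-cdc connector
        tableEnvironment.executeSql("CREATE TABLE flink_cdc_dept (\n" +
                " deptno INT,\n" +
                " dname STRING,\n" +
                " loc STRING,\n" +
                " PRIMARY KEY(deptno) NOT ENFORCED\n" +
                " ) WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = '192.168.147.120',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'Root@123456.',\n" +
                " 'database-name' = 'scott',\n" +
                " 'table-name' = 'dept')");
        // simple query: prints the changelog (+I, -U, +U, -D rows) of the MySQL dept table
        tableEnvironment.sqlQuery("select * from flink_cdc_dept").execute().print();
    }
}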
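The printed output is a changelog rather than a plain list of rows. The plain-Java sketch below shows how such a stream is folded into the current contents of a table keyed by its primary key (deptno); ChangelogMaterializer is a hypothetical class, and the update of department 20 to AUSTIN in the usage note is made-up sample data.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch: materialize a +I/-U/+U/-D changelog keyed by deptno.
class ChangelogMaterializer {
    private final Map<Integer, String> byKey = new LinkedHashMap<>(); // deptno -> loc

    /** kind is one of "+I", "-U", "+U", "-D"; loc is the row's loc column. */
    void apply(String kind, int deptno, String loc) {
        switch (kind) {
            case "+I":
            case "+U":
                byKey.put(deptno, loc); // insert, or the new image of an update
                break;
            case "-U":
            case "-D":
                byKey.remove(deptno);   // retract the old image, or delete the row
                break;
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    // Current table contents.
    Map<Integer, String> snapshot() {
        return new LinkedHashMap<>(byKey);
    }
}
```

Applying +I(10, NEW YORK), +I(20, DALLAS), -U(20, DALLAS), +U(20, AUSTIN), -D(10, NEW YORK) leaves only {20=AUSTIN}.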
15. Flink SQL on Hive
A catalog provides metadata, such as databases, tables, partitions, views, and functions, together with other information stored in a database or other external systems.
Metadata can be temporary, such as temporary tables or UDFs registered through the TableEnvironment, or persistent, such as the metadata in a Hive Metastore. Catalogs provide a unified API for managing metadata and make it accessible from both the Table API and SQL queries.
GenericInMemoryCatalog: an in-memory implementation; all metadata is available only for the lifetime of the session.
JdbcCatalog: connects Flink to relational databases over the JDBC protocol. The Postgres Catalog and the MySQL Catalog are currently the only two JDBC Catalog implementations.
HiveCatalog: serves both as persistent storage for native Flink metadata and as an interface for reading and writing existing Hive metadata.
User-defined catalog: develop a custom catalog by writing a class that implements the corresponding CatalogFactory interface.
- Connecting to a Hive cluster
Flink's catalog metadata is persisted in the metastore database behind the Hive Metastore. Once Flink is integrated with Hive, it can read and write Hive tables directly, just as Spark SQL or Impala do.
- pom.xml
<!-- Flink On Hive-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.15.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
- Code for connecting to Hive
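import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        String name = "myhive";
        String defaultDatabase = "mydatabase";
        // directory that contains hive-site.xml
        String hiveConfDir = "/opt/hive-conf";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        // register the Hive metastore as a catalog in Flink
        tableEnv.registerCatalog("myhive", hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
    }
}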
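After useCatalog("myhive"), unqualified table names are looked up in the Hive catalog. The plain-Java sketch below models how a name is expanded to catalog.database.table from the session's current catalog and database. It assumes Flink's defaults (default_catalog and default_database), assumes that switching catalogs also switches to that catalog's default database (here mydatabase, as passed to HiveCatalog), and ignores backtick-quoted names; IdentifierResolver and the table name orders are hypothetical.

```java
// Plain-Java model of qualifying a table name against the current catalog and database.
class IdentifierResolver {
    private String currentCatalog = "default_catalog";
    private String currentDatabase = "default_database";

    /** Switching catalogs also switches to that catalog's default database. */
    void useCatalog(String catalog, String catalogDefaultDatabase) {
        currentCatalog = catalog;
        currentDatabase = catalogDefaultDatabase;
    }

    void useDatabase(String database) {
        currentDatabase = database;
    }

    /** Expands "t", "db.t", or "cat.db.t" into a full catalog.database.table path. */
    String qualify(String name) {
        String[] parts = name.split("\\.");
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return name;
            default:
                throw new IllegalArgumentException("invalid identifier: " + name);
        }
    }
}
```

With the defaults, orders resolves to default_catalog.default_database.orders; after useCatalog("myhive", "mydatabase") it resolves to myhive.mydatabase.orders, and db2.orders to myhive.db2.orders.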
- Related configuration parameters:
16. Flink SQL Query Optimization
Flink provides two optimizers:
- RBO (rule-based optimizer)
- CBO (cost-based optimizer)
Optimizations:
- Subquery decorrelation based on Apache Calcite
- Projection pushdown
- Partition pruning
- Predicate pushdown
- Constant folding
- Sub-plan deduplication, so that a shared sub-plan is computed only once
- Special subquery rewriting into left semi-joins and left anti-joins
- Optional join reordering, enabled via table.optimizer.join-reorder-enabled
The optimizer bases its decisions not only on the plan but also on the rich statistics available from the data sources and on the fine-grained cost of each operator (such as I/O, CPU, network, and memory).
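- Constant folding (constant substitution)
Constant folding precomputes arithmetic on constants in the SQL (addition, subtraction, multiplication, division, and so on), so that the same constant expression is not recomputed over and over during execution. Before folding: 1+2+t1.value; after folding: 3+t1.value.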
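Here is a minimal plain-Java sketch of constant folding on a toy expression tree (not Flink's planner, which relies on Calcite rules for this). The Expr, Lit, Col, and Add types are hypothetical. The fold works bottom-up and replaces an addition of two literals with a single literal, so 1+2+t1.value, parsed as (1+2)+t1.value, becomes 3+t1.value.

```java
// Plain-Java sketch of constant folding on a tiny expression tree.
class ConstantFolding {
    interface Expr {}

    // An integer literal.
    static final class Lit implements Expr {
        final int value;
        Lit(int value) { this.value = value; }
        @Override public String toString() { return String.valueOf(value); }
    }

    // A column reference such as t1.value.
    static final class Col implements Expr {
        final String name;
        Col(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Binary addition.
    static final class Add implements Expr {
        final Expr left;
        final Expr right;
        Add(Expr left, Expr right) { this.left = left; this.right = right; }
        @Override public String toString() { return left + "+" + right; }
    }

    /** Folds bottom-up: an Add whose two (folded) children are literals becomes one literal. */
    static Expr fold(Expr e) {
        if (e instanceof Add) {
            Add add = (Add) e;
            Expr l = fold(add.left);
            Expr r = fold(add.right);
            if (l instanceof Lit && r instanceof Lit) {
                return new Lit(((Lit) l).value + ((Lit) r).value);
            }
            return new Add(l, r);
        }
        return e; // literals and columns are already folded
    }

    public static void main(String[] args) {
        // 1+2+t1.value parses left-associatively as (1+2)+t1.value
        Expr before = new Add(new Add(new Lit(1), new Lit(2)), new Col("t1.value"));
        System.out.println(before + "  ->  " + fold(before));
    }
}
```

The sketch folds only additions whose operands are both literals; an expression such as 1+(t1.value+2) would additionally need reassociation, which it does not attempt.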
- Predicate pushdown
Filters are applied already at the data source in the FROM clause, so only the needed rows are read; this narrows the scan range and makes the query more efficient.
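The benefit of pushing a predicate into the source can be shown with a toy storage layout in plain Java. Assumptions: the data is split into blocks that each record their maximum value (in the spirit of the per-row-group statistics that columnar formats keep), the query is WHERE value > threshold, and PredicatePushdown, Block, and ScanResult are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch comparing "filter after scan" with "predicate pushed into the source".
class PredicatePushdown {
    // A block of rows plus its max statistic.
    static final class Block {
        final int[] values;
        final int max;

        Block(int... values) {
            this.values = values;
            this.max = Arrays.stream(values).max().getAsInt();
        }
    }

    // Matching rows plus how many rows had to be read to find them.
    static final class ScanResult {
        final List<Integer> rows = new ArrayList<>();
        int rowsRead = 0;
    }

    /** Evaluates WHERE value > threshold, with or without pushing the predicate into the source. */
    static ScanResult scan(List<Block> blocks, int threshold, boolean pushdown) {
        ScanResult result = new ScanResult();
        for (Block block : blocks) {
            // with pushdown, the source sees the predicate and skips blocks that cannot match
            if (pushdown && block.max <= threshold) {
                continue;
            }
            for (int v : block.values) {
                result.rowsRead++;
                if (v > threshold) {
                    result.rows.add(v);
                }
            }
        }
        return result;
    }
}
```

With blocks {100, 120, 130}, {140, 150, 160}, {170, 190, 200} and threshold 165, both variants return 170, 190, 200, but the scan without pushdown reads 9 rows while the pushed-down scan reads 3.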
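- Projection pushdown (column pruning)
Projection pushdown avoids loading fields that are not needed, so only the columns the query actually uses are read. Loading fields the SQL never references is pure waste, so the project operation is pushed down into the scan and unused fields are never loaded. With columnar storage, only the requested columns have to be read at all, so the gain is even larger.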
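A toy column store in plain Java shows why projection pushdown pays off most with columnar storage: each column is stored separately, so a scan that is told which columns the query needs never touches the others. ProjectionPushdown and its methods are hypothetical; valuesLoaded simply counts the stored values read.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of projection pushdown on a toy column store.
class ProjectionPushdown {
    private final Map<String, Object[]> columns = new LinkedHashMap<>(); // column name -> stored values
    int valuesLoaded = 0; // how many stored values the scans have read

    void addColumn(String name, Object... values) {
        columns.put(name, values);
    }

    /** Loads only the projected columns and reassembles them into rows. */
    List<Object[]> scan(List<String> projected) {
        Object[][] loaded = new Object[projected.size()][];
        for (int c = 0; c < projected.size(); c++) {
            Object[] column = columns.get(projected.get(c));
            if (column == null) {
                throw new IllegalArgumentException("unknown column: " + projected.get(c));
            }
            loaded[c] = column;
            valuesLoaded += column.length;
        }
        int rowCount = loaded.length == 0 ? 0 : loaded[0].length;
        List<Object[]> rows = new ArrayList<>();
        for (int r = 0; r < rowCount; r++) {
            Object[] row = new Object[loaded.length];
            for (int c = 0; c < loaded.length; c++) {
                row[c] = loaded[c][r];
            }
            rows.add(row);
        }
        return rows;
    }
}
```

For the four dept rows, scanning only deptno and loc loads 8 values instead of the 12 needed for all three columns.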
- Hash Join
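When two tables are joined, first filter the relevant rows out of the large table so that it becomes smaller, and then join using a sort-merge join, hash join, or broadcast hash join. Filtering before the join reduces the number of row combinations the join has to produce.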
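Of the strategies listed, a hash join is the easiest to sketch. The plain-Java version below builds a hash table on the smaller input and probes it with each row of the larger one, so every probe is a lookup instead of a pass over the other table. The row layout (Object[] rows, dept as {deptno, dname}, and a hypothetical emp table as {ename, deptno}) is an assumption for illustration; filtering the inputs first, as described above, shrinks both phases.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of an equi hash join on deptno.
class HashJoinSketch {
    /** dept rows: {deptno, dname} (smaller, build side); emp rows: {ename, deptno} (probe side). */
    static List<String> join(List<Object[]> dept, List<Object[]> emp) {
        // build phase: hash the smaller input on the join key
        Map<Object, List<Object[]>> table = new HashMap<>();
        for (Object[] d : dept) {
            table.computeIfAbsent(d[0], k -> new ArrayList<>()).add(d);
        }
        // probe phase: one hash lookup per row of the larger input
        List<String> out = new ArrayList<>();
        for (Object[] e : emp) {
            for (Object[] d : table.getOrDefault(e[1], List.of())) {
                out.add(e[0] + "@" + d[1]);
            }
        }
        return out;
    }
}
```

Rows whose key has no match on the build side (such as an employee in department 50) simply produce no output, as in an inner join.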
- Transformation Tree
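- Performance tuning
MiniBatch aggregation: the core idea is to buffer a group of input records in a buffer inside the aggregation operator. When the buffered records are triggered for processing, each key needs only one state access for the whole batch. This greatly reduces state overhead and yields better throughput. However, it can add some latency, because records are buffered instead of being processed immediately. This is a trade-off between throughput and latency.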
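In Flink, mini-batch aggregation is switched on through configuration options such as table.exec.mini-batch.enabled, table.exec.mini-batch.allow-latency, and table.exec.mini-batch.size. The plain-Java sketch below only illustrates the idea for a COUNT(*) per key: inputs are pre-combined in a buffer, and the simulated state is updated once per distinct key when the buffer is flushed. It flushes by size only (Flink also flushes on the latency bound), and MiniBatchSketch is a hypothetical class.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of mini-batch COUNT(*) aggregation per key.
class MiniBatchSketch {
    final Map<String, Long> state = new HashMap<>(); // stands in for the operator's keyed state
    int stateAccesses = 0;                           // read-modify-write operations on state
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    private final int batchSize;
    private int buffered = 0;

    MiniBatchSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Buffers one input for the key; flushes when the batch is full. */
    void processElement(String key) {
        buffer.merge(key, 1L, Long::sum); // pre-combine in the buffer, no state access yet
        buffered++;
        if (buffered >= batchSize) {
            flush();
        }
    }

    /** Applies the buffered partial counts: one state access per distinct key in the batch. */
    void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            stateAccesses++;
            state.merge(e.getKey(), e.getValue(), Long::sum);
        }
        buffer.clear();
        buffered = 0;
    }
}
```

Feeding a, a, a, b, a, b with a batch size of 1 costs 6 state accesses; with a batch size of 6 it costs 2, and the final counts (a=4, b=2) are the same.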
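- Local-Global aggregation
Local-Global aggregation was proposed to address data skew. It splits an aggregation into two phases: a local aggregation upstream, followed by a global aggregation downstream, similar to the Combine + Reduce pattern in MapReduce.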
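Flink can apply local-global aggregation when table.optimizer.agg-phase-strategy is set to TWO_PHASE (or left at AUTO), provided mini-batch is enabled. The plain-Java sketch below shows only the two phases for a count per key; the partitions and the hot key are made-up data, and LocalGlobalSketch is a hypothetical class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of local-global (two-phase) counting per key.
class LocalGlobalSketch {
    /** Local phase: count keys inside one upstream partition (like a MapReduce combiner). */
    static Map<String, Long> localAggregate(List<String> partition) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : partition) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Global phase: merge the partial counts from all partitions. */
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    /** Records shipped to the global phase: one per key per partition. */
    static int shuffledRecords(List<Map<String, Long>> partials) {
        int n = 0;
        for (Map<String, Long> p : partials) {
            n += p.size();
        }
        return n;
    }
}
```

For the partitions [hot, hot, hot, cold] and [hot, hot, hot], only 3 partial counts reach the global phase instead of 7 raw records, and the result is hot=6, cold=1.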
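- Splitting distinct aggregation
Hash the distinct field into buckets so that the records are scattered across different partitions, compute the distinct aggregate per bucket, and then combine the per-bucket results: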
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
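Flink can perform this rewrite automatically when table.optimizer.distinct-agg.split.enabled is true (the bucket count is set by table.optimizer.distinct-agg.split.bucket-num, 1024 by default). The plain-Java sketch below checks why the rewrite is correct: each user_id falls into exactly one bucket, so the per-bucket distinct counts never overlap and their sum equals the global distinct count. String.hashCode stands in for HASH_CODE, and SplitDistinctSketch is a hypothetical class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch of the two-level COUNT(DISTINCT) rewrite for one day.
class SplitDistinctSketch {
    // Stand-in for MOD(HASH_CODE(user_id), buckets); floorMod keeps the bucket non-negative.
    static int bucketOf(String userId, int buckets) {
        return Math.floorMod(userId.hashCode(), buckets);
    }

    /** Inner query: COUNT(DISTINCT user_id) per bucket; outer query: SUM over the buckets. */
    static long splitDistinctCount(List<String> userIds, int buckets) {
        Map<Integer, Set<String>> perBucket = new HashMap<>();
        for (String id : userIds) {
            perBucket.computeIfAbsent(bucketOf(id, buckets), b -> new HashSet<>()).add(id);
        }
        long total = 0;
        for (Set<String> distinctInBucket : perBucket.values()) {
            total += distinctInBucket.size();
        }
        return total;
    }
}
```

In Flink, the per-bucket counts can run on different parallel instances, so a single hot day no longer funnels all of its distinct keys into one task.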
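- Filtering in distinct aggregation
Use the FILTER clause to restrict the rows that each distinct aggregate counts, then group and aggregate. Because all three COUNT(DISTINCT user_id) aggregates use the same distinct key, the optimizer can let them share a single state instance instead of keeping three.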
SELECT
  day,
  COUNT(DISTINCT user_id) AS total_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
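The plain-Java sketch below illustrates how several filtered COUNT(DISTINCT user_id) aggregates can share one state: each user_id maps to a bit mask recording which of the three aggregates it has been counted for. This is a model of the idea, not Flink's actual state layout; FilteredDistinctSketch and the sample rows in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch: three filtered distinct counts sharing one user_id -> bit mask map.
class FilteredDistinctSketch {
    static final int TOTAL = 1; // counted for total_uv
    static final int APP = 2;   // counted for app_uv
    static final int WEB = 4;   // counted for web_uv

    /** Returns {total_uv, app_uv, web_uv} for one day's (user_id, flag) rows. */
    static long[] count(List<String[]> rows) {
        Map<String, Integer> seen = new HashMap<>(); // the single shared state
        for (String[] row : rows) {
            String userId = row[0];
            String flag = row[1];
            int bits = TOTAL;
            if (flag.equals("android") || flag.equals("iphone")) {
                bits |= APP;
            }
            if (flag.equals("wap") || flag.equals("other")) {
                bits |= WEB;
            }
            seen.merge(userId, bits, (a, b) -> a | b);
        }
        long[] result = new long[3];
        for (int mask : seen.values()) {
            if ((mask & TOTAL) != 0) result[0]++;
            if ((mask & APP) != 0) result[1]++;
            if ((mask & WEB) != 0) result[2]++;
        }
        return result;
    }
}
```

For (u1, android), (u1, wap), (u2, iphone), (u3, pc), (u2, android), count returns total_uv = 3, app_uv = 2, web_uv = 1.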
17. SQL Date and Time Conversion
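-- The most common task in Flink SQL is time-format conversion, for example turning various time formats into TIMESTAMP(3).
-- Note: in Apache Flink 1.13 and later, NOW() returns TIMESTAMP_LTZ(3), and TO_TIMESTAMP accepts only STRING arguments and returns TIMESTAMP(3).
-- The BIGINT NOW(), TO_TIMESTAMP(BIGINT) and TIMESTAMP_TO_MS signatures below are not Apache Flink built-ins (they may come from a vendor distribution or custom UDFs);
-- on open-source Flink, TO_TIMESTAMP_LTZ(epoch_millis, 3) converts an epoch-millisecond BIGINT.
NOW()            BIGINT        -- epoch-millisecond value, e.g. 1403006911000 or 1528257600000; convert with CAST(TO_TIMESTAMP(log_time) AS TIMESTAMP(3)) where log_time = NOW()
LOCALTIMESTAMP   TIMESTAMP(3)  -- millisecond timestamp, e.g. 1636272032500
TIMESTAMP                      -- without a precision in parentheses, means TIMESTAMP(6)
TIMESTAMP(3)                   -- millisecond precision, e.g. 1636272032500
TIMESTAMP(6)                   -- microsecond precision
TIMESTAMP(9)                   -- nanosecond precision
TIMESTAMP(9) TO_TIMESTAMP(BIGINT time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time)
TIMESTAMP(9) TO_TIMESTAMP(STRING time, STRING format)
BIGINT       TIMESTAMP_TO_MS(TIMESTAMP time)
BIGINT       TIMESTAMP_TO_MS(STRING time, STRING format)
TO_DATE(CAST(LOCALTIMESTAMP AS VARCHAR))
FROM_UNIXTIME(TIMESTAMP_TO_MS(LOCALTIMESTAMP) / 1000, 'yyyy-MM-dd HH:mm:ss') AS event_time
time_pt AS CAST(TO_TIMESTAMP(eventTime - 6 * 3600 * 1000) AS TIMESTAMP(3))  -- shift by 6 hours, so that a day runs from 06:00 to 06:00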
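The epoch-millisecond arithmetic above can be checked in plain Java with java.time. The sketch assumes epoch-millisecond inputs and an explicit time zone (Apache Flink's FROM_UNIXTIME formats in the session time zone, table.local-time-zone); EventTimeSketch and its methods are hypothetical. businessDay applies the same 6-hour shift as the time_pt expression, so a day runs from 06:00 to 06:00.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Plain-Java counterpart of the epoch-millisecond conversions above.
class EventTimeSketch {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Like FROM_UNIXTIME(ms / 1000, 'yyyy-MM-dd HH:mm:ss') evaluated in the given time zone. */
    static String format(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).format(FMT);
    }

    /** Business day running from 06:00 to 06:00: shift back 6 hours, then take the date. */
    static LocalDate businessDay(long epochMillis, ZoneId zone) {
        long shifted = epochMillis - 6 * 3600 * 1000L; // same offset as eventTime - 6 * 3600 * 1000
        return Instant.ofEpochMilli(shifted).atZone(zone).toLocalDate();
    }

    public static void main(String[] args) {
        long ms = 1528257600000L;
        System.out.println(format(ms, ZoneId.of("UTC")) + " / " + businessDay(ms, ZoneId.of("UTC")));
    }
}
```

For 1528257600000 (2018-06-06 04:00:00 UTC), format returns "2018-06-06 04:00:00" in UTC and "2018-06-06 12:00:00" in Asia/Shanghai, and businessDay returns 2018-06-05 in UTC because 04:00 is before the 06:00 boundary.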