Flink的Table API 与SQL的流处理
1 流處理與SQL的區別
??Table API和SQL,本質上還是基于關系型表的操作方式;而關系型表、SQL本身,一般是有界的,更適合批處理的場景。所以在流處理的過程中,有一些特殊概念。
| 處理對象 | 字段元組的有界集合 | 字段元組的無限序列 |
| 查詢對數據的訪問 | 可以訪問完整的數據輸入 | 無法訪問所有數據,必須持續等待流式輸入 |
| 查詢終止條件 | 生成固定大小的結果集后終止 | 永不停止,根據持續收到的數據不斷更新查詢結果 |
??盡管存在這些差異,使用關系查詢和SQL處理流并不是不可能的。高級關系數據庫系統提供了一種稱為物化視圖的特性。物化視圖被定義為SQL查詢,就像常規的虛擬視圖一樣。與虛擬視圖相反,物化視圖緩存查詢的結果,以便在訪問視圖時不需要計算查詢。緩存的一個常見挑戰是防止緩存提供過時的結果。當物化視圖的定義查詢的基表被修改時,物化視圖就會過時。即時視圖維護是一種技術,用于在基本表更新后立即更新物化視圖。
2 動態表
??在Flink Table API概念里有個動態表(Dynamic Tables),動態表隨著新數據的到來不停的在之前的基礎上更新結果。這與傳統的關系型數據庫中的表示是截然不同的,因為流處理的數據是源源不斷的,將流數據轉換成Table,然后進行操作結果也就不是一成不變,而是隨著新數據不斷更新。
??動態表是Flink對流數據的Table API和SQL支持的核心概念。動態表可以像傳統關系型數據庫中的表一樣查詢,查詢一個動態表會產生持續查詢,持續查詢永遠不會終止,還會生成另一個動態表,查詢操作會不斷更新動態結果表,反應動態數據表的更改。
??本質上,動態表上的連續查詢與定義物化視圖的查詢非常相似,注意:連續查詢的結果在語義上總是等價于在輸入表的快照上以批處理模式執行相同查詢的結果。
??下圖可視化了流、動態表和連續查詢之間的關系:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-bcLFveIX-1596812198844)(C:\資料\flink\筆記\8 Table API與SQL\assets\1596357675358.png)]
??流式持續查詢的過程為:①流被轉換為動態表②對動態表計算連續查詢,生成新的動態表③ 生成的動態表被轉換回流。
??注意:動態表首先是一個邏輯概念。動態表不一定在查詢執行期間物化
3 將流轉換成表
??為了處理帶有關系查詢的流,必須先將其轉換為表。從概念上講,流的每個數據記錄,都被解釋為對結果表的插入(Insert)修改。因為流式持續不斷的,而且之前的輸出結果無法改變。本質上,我們其實是從一個、只有插入操作的changelog(更新日志)流,來構建一個表。
??使用具有以下模式的單擊事件流解釋動態表和連續查詢的概念:
[user: VARCHAR, // the name of the usercTime: TIMESTAMP, // the time when the URL was accessedurl: VARCHAR // the URL that was accessed by the user ]??下圖可視化了單擊事件流(左側)如何轉換為表(右側)。隨著插入更多單擊流的記錄,結果表不斷增長。
??注意:定義在流上的表在內部不是物化的
4 持續查詢
??對動態表進行持續查詢,并生成新的動態表作為結果。與批處理查詢不同,持續查詢從不根據輸入表上的更新終止而更新其結果表。在任何時間點,持續查詢的結果在語義上等同于輸入表快照上以批處理模式執行的同一查詢的結果。
??在下面的示例中,我們將在clicks表,該表是在單擊事件流上定義的。第一個查詢是一個簡單的GROUP BY COUNT聚合查詢,它將clicks表上user字段,并計算訪問的URL數。下面的圖顯示了查詢是如何在一段時間內作為clicks表使用其他行更新。
??當查詢開始時,clicks表(左側)示空的,這個查詢開始計算結果表,當第一行[Mary, ./home]插入到clicks表,結果表(右側)由一行組成[Mary, 1];當第二行[Bob, ./cart]插入到clicks表,結果表(右側)插入一個新的行[Bob, 1];當第三行[Mary, ./prod?id=1]`插入到clicks表,生成已計算結果行的更新[Mary, 1]更新成[Mary, 2];最后[Liz, 1]插入到結果表,當第四行追加到clicks表中。
??第二個查詢類似于第一個查詢,但將clicks表之外的user屬性的每小時滾動在計算URL數量之前(基于時間的計算(如windows)是基于特殊的)。同樣,圖顯示了不同時間點的輸入和輸出,以可視化動態表的變化性質。
??和前面一樣,輸入表clicks顯示在左邊。查詢每小時連續計算結果并更新結果表。在12:00:00和12:59:59clicks表包含帶有時間戳的四行(cTime)。該查詢從此輸入計算兩個結果行(每個輸入一行),并將它們附加到結果表中。之間的下一個窗口13:00:00和13:59:59,clicks表包含三行,這將導致將另兩行附加到結果表。隨著時間的推移,結果表將被更新,因為會追加更多的行。
??雖然上面兩個示例查詢看起來非常相似(兩者都計算分組計數聚合),但它們在一個重要方面有所不同:
??①第一個查詢更新先前發出的結果,即定義結果表的Changelog流包含INSERT和UPDATE改變;
??②第二個查詢僅附加到結果表,即結果表的Changelog流僅由INSERT改變
??查詢是生成僅附加的表還是更新的表有一些含義:
??①產生更新更改的查詢通常需要維護更多的狀態
??②將僅追加的表轉換為流與對更新的表的轉換不同
5 將動態表轉換成流
??與常規的數據庫表一樣,動態表可以通過插入(Insert)、更新(Update)和刪除(Delete)更改,進行持續的修改。將動態表轉換為流或將其寫入外部系統時,需要對這些更改進行編碼。Flink的Table API和SQL支持三種方式對動態表的更改進行編碼:
5.1 僅追加(Append-only)
??僅通過插入(Insert)更改,來修改的動態表,可以直接轉換為“僅追加”流。這個流中發出的數據,就是動態表中新增的每一行。
5.2 撤回(Retract)
??Retract流是包含兩類消息的流,添加(Add)消息和撤回(Retract)消息。
??動態表通過將INSERT 編碼為add消息、DELETE 編碼為retract消息、UPDATE編碼為被更改行(前一行)的retract消息和更新后行(新行)的add消息,轉換為retract流。
下圖顯示了將動態表轉換為Retract流的過程
5.3 Upsert(更新插入)
??Upsert流包含兩種類型的消息:Upsert消息和delete消息。轉換為upsert流的動態表,**需要有唯一的鍵(key)。**通過將INSERT和UPDATE更改編碼為upsert消息,將DELETE更改編碼為DELETE消息,就可以將具有唯一鍵(Unique Key)的動態表轉換為流。
??下圖顯示了將動態表轉換為upsert流的過程:
??注意:在代碼里將動態表轉換為DataStream時,僅支持Append和Retract流。而向外部系統輸出動態表的TableSink接口,則可以有不同的實現。
6 時間特性
??基于時間的操作(如Table API和SQL中窗口操作),需要定義相關的時間語義和時間數據來源的信息。所以,Table可以提供一個邏輯上的時間字段,用于在表處理程序中,指示時間和訪問相應的時間戳。
??時間屬性,可以是每個表schema的一部分。一旦定義了時間屬性,它就可以作為一個字段引用,并且可以在基于時間的操作中使用。**只要時間屬性沒有被修改,只是從查詢的一個部分轉發到另一個部分,它仍然是一個有效的時間屬性。**時間屬性的行為類似于常規時間戳,可以訪問,并且進行計算。如果在計算中使用時間屬性,則它將被物化并成為常規時間戳。常規時間戳不配合Flink的時間和水印系統,因此不能再用于基于時間的操作。
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default// alternatively: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)6.1 處理時間(Processing Time)
??處理時間語義下,允許表處理程序根據機器的本地時間生成結果。它是時間的最簡單概念。它既不需要提取時間戳,也不需要生成watermark。
??定義處理時間屬性有三種方法:在DataStream轉化時直接指定;在定義Table Schema時指定;在創建表的DDL中指定。
6.1.1 創建表的DDL中指定
??在創建表的DDL中,增加一個字段并指定成proctime,也可以指定當前的時間字段。代碼如下:
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute ) WITH ('connector.type' = 'filesystem','connector.path' = 'file:///D:\\..\\sensor.txt','format.type' = 'csv' );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);??注意:運行這段DDL,必須使用Blink Planner
6.1.2 DataStream轉化成Table時指定
??由DataStream轉換成表時,可以在后面指定字段名來定義Schema。在定義Schema期間,可以使用.proctime,定義處理時間字段。
注意,這個proctime屬性只能通過附加邏輯字段,來擴展物理schema。因此,只能在schema定義的末尾定義它。
val stream: DataStream[(String, String)] = ...// declare an additional logical field as a processing time attribute val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'user_name, 'data, 'user_action_time.proctime)val windowedTable = table.window(Tumble over 10.minutes on 'user_action_time as 'userActionWindow)6.1.3 定義Table Schema時指定
??只要在定義Schema的時候,加上一個新的字段,并指定成proctime就可以了。
tableEnv.connect(new FileSystem().path("file_path")).withFormat(new Csv()).withSchema(new Schema().field("user_name", DataTypes.STRING()).field("UserActionTimestamp", DataTypes.BIGINT()).field("data", DataTypes.STRING()).field("pt", DataTypes.TIMESTAMP(3)).proctime() // 指定 pt字段為處理時間) // 定義表結構.createTemporaryTable("inputTable") // 創建臨時表6.2 事件時間(Event Time)
??事件時間語義,允許表處理程序根據每個記錄中包含的時間生成結果。這樣即使在有亂序事件或者延遲事件時,也可以獲得正確的結果。
??為了處理無序事件,并區分流中的準時和遲到事件;Flink需要從事件數據中,提取時間戳,并用來推進事件時間的進展(watermark)。事件時間屬性可以在CREATE TABLE DDL中定義,也可以在Datastream到表轉換期間或者通過使用TableSource來定義。
6.2.1 創建表的DDL中指定
??事件時間屬性,是使用CREATE TABLE DDL中的WARDMARK語句定義的。watermark語句,定義現有事件時間字段上的watermark生成表達式,該表達式將事件時間字段標記為事件時間屬性。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategyWATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);6.2.2 DataStream轉化成Table時指定
??在DataStream轉換成Table,schema的定義期間,使用.rowtime可以定義事件時間屬性。必須在轉換的數據流中分配時間戳和watermark。
??在將數據流轉換為表時,有兩種定義時間屬性的方法。根據指定的.rowtime字段名是否存在于數據流的架構中,timestamp字段可以作為新字段追加到schema替換現有字段
??在這兩種情況下,定義的事件時間戳字段,都將保存DataStream中事件時間戳的值。
// Option 1:// extract timestamp and assign watermarks based on knowledge of the stream val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)// declare an additional logical field as an event time attribute val table = tEnv.fromDataStream(stream, 'user_name, 'data, 'user_action_time.rowtime)// Option 2:// extract timestamp from first field, and assign watermarks based on knowledge of the stream val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)// the first field has been used for timestamp extraction, and is no longer necessary // replace first field with a logical event time attribute val table = tEnv.fromDataStream(stream, 'user_action_time.rowtime, 'user_name, 'data)// Usage:val windowedTable = table.window(Tumble over 10.minutes on 'user_action_time as 'userActionWindow)6.2.3 定義Table Schema時指定
?這種方法只要在定義Schema的時候,將事件時間字段,并指定成rowtime就可以了。
tableEnv.connect(new FileSystem().path("flie_path")).withFormat(new Csv()).withSchema(new Schema().field("user_name", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).rowtime(new Rowtime().timestampsFromField("timestamp") // 從字段中提取時間戳.watermarksPeriodicBounded(1000) // watermark延遲1秒).field("data", DataTypes.STRING())) // 定義表結構.createTemporaryTable("inputTable") // 創建臨時表7 窗口
??時間語義要配合窗口操作,根據時間段做計算了才能發揮作用。在Table API和SQL中,主要有兩種窗口:Group Windows(分組窗口)和Over Windows。
7.1 Table中的Group Windows
??Group Windows會根據時間或行計數間隔,將行聚合到有限的組(Group)中,并對每個組的數據執行一次聚合函數。
??Table API中的Group Windows都是使用.window(w:GroupWindow)子句定義的,必須由as子句指定一個別名。為了按窗口對表進行分組,窗口的別名必須在group by子句中,像常規的分組字段一樣引用。
val table = input.window([w: GroupWindow] as 'w) // 定義窗口,別名 w.groupBy('w, 'a) // 以屬性a和窗口w作為分組的key .select('a, 'b.sum) // 聚合字段b的值,求和??也可以把窗口的相關信息,作為字段添加到結果表中:
val table = input.window([w: GroupWindow] as 'w) .groupBy('w, 'a) .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)??Table API提供了一組具有特定語義的預定義Window類,這些類會被轉換為底層DataStream或DataSet的窗口操作。Table API支持的窗口定義,主要也是三種:滾動(Tumbling)、滑動(Sliding)和會話(Session)。
7.1.1 滾動窗口
滾動窗口(Tumbling windows)要用Tumble類來定義,另外還有三個方法:①over:定義窗口長度;②on:用來分組(按時間間隔)或者排序(按行數)的時間字段;③as:別名,必須出現在后面的groupBy中
// Tumbling Event-time Window(事件時間字段rowtime) .window(Tumble over 10.minutes on 'rowtime as 'w)// Tumbling Processing-time Window(處理時間字段proctime) .window(Tumble over 10.minutes on 'proctime as 'w)// Tumbling Row-count Window (類似于計數窗口,按處理時間排序,10行一組) .window(Tumble over 10.rows on 'proctime as 'w)7.1.2 滑動窗口
??滑動窗口(Sliding windows)要用Slide類來定義,另外還有四個方法:①over:定義窗口長度②every:定義滑動步長③on:用來分組(按時間間隔)或者排序(按行數)的時間字段④as:別名,必須出現在后面的groupBy中
// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)// Sliding Processing-time window .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)// Sliding Row-count window .window(Slide over 10.rows every 5.rows on 'proctime as 'w)7.1.3 會話窗口
??會話窗口(Session windows)要用Session類來定義,另外還有三個方法:①withGap:會話時間間隔②on:用來分組(按時間間隔)或者排序(按行數)的時間字段③as:別名,必須出現在后面的groupBy中
// Session Event-time Window .window(Session withGap 10.minutes on 'rowtime as 'w)// Session Processing-time Window .window(Session withGap 10.minutes on 'proctime as 'w)7.2 Table中的Over Windows
??Over window聚合是標準SQL中已有的(Over子句),可以在查詢的SELECT子句中定義。Over window 聚合,會針對每個輸入行,計算相鄰行范圍內的聚合。Over windows使用.window(w:overwindows*)子句定義,并在select()方法中通過別名來引用。
val table = input.window([w: OverWindow] as 'w).select('a, 'b.sum over 'w, 'c.min over 'w)??Table API提供了Over類,來配置Over窗口的屬性。可以在事件時間或處理時間,以及指定為時間間隔、或行計數的范圍內,定義Over windows。無界的over window是使用常量指定的。也就是說,時間間隔要指定UNBOUNDED_RANGE,或者行計數間隔要指定UNBOUNDED_ROW。而有界的over window是用間隔的大小指定的。
??(1) 無界的 over window
// 無界的事件時間over window (時間字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)//無界的處理時間over window (時間字段"proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)// 無界的事件時間Row-count over window (時間字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)//無界的處理時間Row-count over window (時間字段 "rowtime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)??(2)有界的 over window
// 有界的事件時間over window (時間字段 "rowtime",之前1分鐘) .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)// 有界的處理時間over window (時間字段 "rowtime",之前1分鐘) .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)// 有界的事件時間Row-count over window (時間字段 "rowtime",之前10行) .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)// 有界的處理時間Row-count over window (時間字段 "rowtime",之前10行) .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)7.3 SQL中的Group Windows
??Group Windows在SQL查詢的Group BY子句中定義。與使用常規GROUP BY子句的查詢一樣,使用GROUP BY子句的查詢會計算每個組的單個結果行。
??SQL支持以下Group窗口函數:
TUMBLE(time_attr, interval)??定義一個滾動窗口,第一個參數是時間字段,第二個參數是窗口長度。
HOP(time_attr, interval, interval)??定義一個滑動窗口,第一個參數是時間字段,第二個參數是窗口滑動步長,第三個是窗口長度。
SESSION(time_attr, interval)??另外還有一些輔助函數,可以用來選擇Group Window的開始和結束時間戳,以及時間屬性。
TUMBLE_START(time_attr, interval)TUMBLE_END(time_attr, interval)TUMBLE_ROWTIME(time_attr, interval)TUMBLE_PROCTIME(time_attr, interval)7.4 SQL中的Over Windows
??由于Over本來就是SQL內置支持的語法,所以這在SQL中屬于基本的聚合操作。所有聚合必須在同一窗口上定義,也就是說,必須是相同的分區、排序和范圍。目前僅支持在當前行范圍之前的窗口(無邊界和有邊界)。
??注意,ORDER BY必須在單一的時間屬性上指定。
SELECT COUNT(amount) OVER (PARTITION BY userORDER BY proctimeROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders// 也可以做多個聚合 SELECT COUNT(amount) OVER w, SUM(amount) OVER w FROM Orders WINDOW w AS (PARTITION BY userORDER BY proctimeROWS BETWEEN 2 PRECEDING AND CURRENT ROW)總結
以上是生活随笔為你收集整理的Flink的Table API 与SQL的流处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Educational Codeforc
- 下一篇: 计算机简单故障时的排除方法,电脑简单故障