1.18.5.流式概念、动态表(Dynamic Table)、DataStream上的关系查询、动态表 连续查询(Continuous Query)、在流上定义表、处理时间
1.18.5.流式概念
1.18.5.1.動(dòng)態(tài)表(Dynamic Table)
1.18.5.1.1.DataStream上的關(guān)系查詢
1.18.5.1.2.動(dòng)態(tài)表 & 連續(xù)查詢(Continuous Query)
1.18.5.1.3.在流上定義表
1.18.5.1.4.連續(xù)查詢
1.18.5.1.5.處理時(shí)間
1.18.5.1.5.1.在創(chuàng)建表的DDL中定義
1.18.5.1.5.2.在DataStream到Table轉(zhuǎn)換時(shí)定義
1.18.5.1.5.3.使用TableSource定義
1.18.5.1.5.4.更新和追加查詢
1.18.5.1.5.5.查詢限制
1.18.5.1.5.6.表到流的轉(zhuǎn)換
1.18.5.流式概念
1.18.5.1.動(dòng)態(tài)表(Dynamic Table)
SQL 和關(guān)系代數(shù)在設(shè)計(jì)時(shí)并未考慮流數(shù)據(jù)。因此,在關(guān)系代數(shù)(和 SQL)之間幾乎沒有概念上的差異。
本文會(huì)討論這種差異,并介紹 Flink 如何在無界數(shù)據(jù)集上實(shí)現(xiàn)與數(shù)據(jù)庫引擎在有界數(shù)據(jù)上的處理具有相同的語義。
1.18.5.1.1.DataStream上的關(guān)系查詢
下表比較了傳統(tǒng)的關(guān)系代數(shù)和流處理與輸入數(shù)據(jù)、執(zhí)行和輸出結(jié)果的關(guān)系。
| 關(guān)系(或表)是有界(多)元組集合。 | 流是一個(gè)無限元組序列。 |
| 對(duì)批數(shù)據(jù)(例如關(guān)系數(shù)據(jù)庫中的表)執(zhí)行的查詢可以訪問完整的輸入數(shù)據(jù)。 | 流式查詢?cè)趩?dòng)時(shí)不能訪問所有數(shù)據(jù),必須”等待”數(shù)據(jù)流入。 |
| 批處理查詢?cè)诋a(chǎn)生固定大小的結(jié)果后終止。 | 流查詢不斷地根據(jù)接收到的記錄更新其結(jié)果,并且始終不會(huì)結(jié)束。 |
盡管存在這些差異,但是使用關(guān)系查詢和 SQL 處理流并不是不可能的。高級(jí)關(guān)系數(shù)據(jù)庫系統(tǒng)提供了一個(gè)稱為 物化視圖(Materialized Views) 的特性。物化視圖被定義為一條 SQL 查詢,就像常規(guī)的虛擬視圖一樣。與虛擬視圖相反,物化視圖緩存查詢的結(jié)果,因此在訪問視圖時(shí)不需要對(duì)查詢進(jìn)行計(jì)算。緩存的一個(gè)常見難題是防止緩存為過期的結(jié)果提供服務(wù)。當(dāng)其定義查詢的基表被修改時(shí),物化視圖將過期。 即時(shí)視圖維護(hù)(Eager View Maintenance) 是一種一旦更新了物化視圖的基表就立即更新視圖的技術(shù)。
如果我們考慮以下問題,那么即時(shí)視圖維護(hù)和流上的SQL查詢之間的聯(lián)系就會(huì)變得顯而易見:
?數(shù)據(jù)庫表是 INSERT、UPDATE 和 DELETE DML 語句的 stream 的結(jié)果,通常稱為 changelog stream。
?物化視圖被定義為一條 SQL 查詢。為了更新視圖,查詢不斷地處理視圖的基本關(guān)系的changelog流。
?物化視圖是流式SQL查詢的結(jié)果。
了解了這些要點(diǎn)之后,我們將在下一節(jié)中介紹 動(dòng)態(tài)表(Dynamic tables) 的概念。
1.18.5.1.2.動(dòng)態(tài)表 & 連續(xù)查詢(Continuous Query)
動(dòng)態(tài)表 是 Flink 的支持流數(shù)據(jù)的 Table API 和 SQL 的核心概念。與表示批處理數(shù)據(jù)的靜態(tài)表不同,動(dòng)態(tài)表是隨時(shí)間變化的。可以像查詢靜態(tài)批處理表一樣查詢它們。查詢動(dòng)態(tài)表將生成一個(gè) 連續(xù)查詢 。一個(gè)連續(xù)查詢永遠(yuǎn)不會(huì)終止,結(jié)果會(huì)生成一個(gè)動(dòng)態(tài)表。查詢不斷更新其(動(dòng)態(tài))結(jié)果表,以反映其(動(dòng)態(tài))輸入表上的更改。本質(zhì)上,動(dòng)態(tài)表上的連續(xù)查詢非常類似于定義物化視圖的查詢。
需要注意的是,連續(xù)查詢的結(jié)果在語義上總是等價(jià)于以批處理模式在輸入表快照上執(zhí)行的相同查詢的結(jié)果。
下圖顯示了流、動(dòng)態(tài)表和連續(xù)查詢之間的關(guān)系:
1.將流轉(zhuǎn)換為動(dòng)態(tài)表。
2.在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢,生成一個(gè)新的動(dòng)態(tài)表。
3.生成的動(dòng)態(tài)表被轉(zhuǎn)換回流。
注意: 動(dòng)態(tài)表首先是一個(gè)邏輯概念。在查詢執(zhí)行期間不一定(完全)物化動(dòng)態(tài)表。
在下面,我們將解釋動(dòng)態(tài)表和連續(xù)查詢的概念,并使用具有以下模式的單擊事件流:
1.18.5.1.3.在流上定義表
為了使用關(guān)系查詢處理流,必須將其轉(zhuǎn)換成 Table。從概念上講,流的每條記錄都被解釋為對(duì)結(jié)果表的 INSERT 操作。本質(zhì)上我們正在從一個(gè) INSERT-only 的 changelog 流構(gòu)建表。
下圖顯示了單擊事件流(左側(cè))如何轉(zhuǎn)換為表(右側(cè))。當(dāng)插入更多的單擊流記錄時(shí),結(jié)果表將不斷增長。
**注意:**在流上定義的表內(nèi)部沒有物化。
1.18.5.1.4.連續(xù)查詢
在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢,并生成一個(gè)新的動(dòng)態(tài)表。與批處理查詢不同,連續(xù)查詢從不終止,并根據(jù)其輸入表上的更新更新其結(jié)果表。在任何時(shí)候,連續(xù)查詢的結(jié)果在語義上與以批處理模式在輸入表快照上執(zhí)行的相同查詢的結(jié)果相同。
在接下來的代碼中,我們將展示 clicks 表上的兩個(gè)示例查詢,這個(gè)表是在點(diǎn)擊事件流上定義的。
第一個(gè)查詢是一個(gè)簡單的 GROUP-BY COUNT 聚合查詢。它基于 user 字段對(duì) clicks 表進(jìn)行分組,并統(tǒng)計(jì)訪問的 URL 的數(shù)量。下面的圖顯示了當(dāng)clicks表被附加的行更新時(shí),查詢是如何被評(píng)估的。
當(dāng)查詢開始,clicks 表(左側(cè))是空的。當(dāng)?shù)谝恍袛?shù)據(jù)被插入到 clicks 表時(shí),查詢開始計(jì)算結(jié)果表。第一行數(shù)據(jù) [Mary,./home] 插入后,結(jié)果表(右側(cè),上部)由一行 [Mary, 1] 組成。當(dāng)?shù)诙?[Bob, ./cart] 插入到 clicks 表時(shí),查詢會(huì)更新結(jié)果表并插入了一行新數(shù)據(jù) [Bob, 1]。第三行 [Mary, ./prod?id=1] 將產(chǎn)生已計(jì)算的結(jié)果行的更新,[Mary, 1] 更新成 [Mary, 2]。最后,當(dāng)?shù)谒男袛?shù)據(jù)加入 clicks 表時(shí),查詢將第三行 [Liz, 1] 插入到結(jié)果表中。
第二條查詢與第一條類似,但是除了用戶屬性之外,還將 clicks 分組至每小時(shí)滾動(dòng)窗口中,然后計(jì)算 url 數(shù)量(基于時(shí)間的計(jì)算,例如基于特定時(shí)間屬性的窗口,后面會(huì)討論)。同樣,該圖顯示了不同時(shí)間點(diǎn)的輸入和輸出,以可視化動(dòng)態(tài)表的變化特性。
與前面一樣,左邊顯示了輸入表 clicks。查詢每小時(shí)持續(xù)計(jì)算結(jié)果并更新結(jié)果表。clicks表包含四行帶有時(shí)間戳(cTime)的數(shù)據(jù),時(shí)間戳在 12:00:00 和 12:59:59 之間。查詢從這個(gè)輸入計(jì)算出兩個(gè)結(jié)果行(每個(gè) user 一個(gè)),并將它們附加到結(jié)果表中。對(duì)于 13:00:00 和 13:59:59 之間的下一個(gè)窗口,clicks 表包含三行,這將導(dǎo)致另外兩行被追加到結(jié)果表。隨著時(shí)間的推移,更多的行被添加到 click 中,結(jié)果表將被更新。
1.18.5.1.5.處理時(shí)間
處理時(shí)間是基于機(jī)器的本地時(shí)間來處理數(shù)據(jù),它是最簡單的一種時(shí)間概念,但是它不能提供確定性。它既不需要從數(shù)據(jù)里獲取時(shí)間,也不需要生成 watermark。
共三種方法可以定義處理時(shí)間。
1.18.5.1.5.1.在創(chuàng)建表的DDL中定義
處理時(shí)間屬性可以在創(chuàng)建表的DDL中用計(jì)算列的方式定義,用PROCTIME()就可以定義處理時(shí)間。關(guān)于計(jì)算列,更多信息可以參考:CREATE TABLE DDL (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sql/create.html#create-table)
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 聲明一個(gè)額外的列作為處理時(shí)間屬性 ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);1.18.5.1.5.2.在DataStream到Table轉(zhuǎn)換時(shí)定義
處理時(shí)間屬性可以在 schema 定義的時(shí)候用 .proctime 后綴來定義。時(shí)間屬性一定不能定義在一個(gè)已有字段上,所以它只能定義在 schema 定義的最后。
Java代碼:
package com.toto.demo.sql;import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.*; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit;public class Demo {public static void main(String[] args) {StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);DataStream<Tuple2<String, String>> stream = ...;//聲明一個(gè)額外的字段作為時(shí)間屬性字段Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"),$("user_action_time").proctime());GroupWindowedTable windowedTable = table.window(Tumble.over(lit(10).minute()).on($("user_action_time")).as("userActionWindow"));}}Scala代碼:
package com.toto.learn.sqlimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{EnvironmentSettings, Tumble} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject Demo {def main(args: Array[String]): Unit = {val bsEnv = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)// or val bsTableEnv = TableEnvironment.create(bsSettings)val stream: DataStream[(String, String)] = ...// 聲明一個(gè)額外的字段作為時(shí)間屬性字段val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")}}1.18.5.1.5.3.使用TableSource定義
處理時(shí)間屬性可以在實(shí)現(xiàn)了 DefinedProctimeAttribute 的 TableSource 中定義。邏輯的時(shí)間屬性會(huì)放在 TableSource 已有物理字段的最后。
// 定義一個(gè)由處理時(shí)間屬性的 table source public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// create streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 這個(gè)名字的列會(huì)被追加到最后,作為第三列return "user_action_time";} }// register table source tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));Scala代碼:
// 定義一個(gè)由處理時(shí)間屬性的 table source class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {override def getReturnType = {val names = Array[String]("user_name" , "data")val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)Types.ROW(names, types)}override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {// create streamval stream = ...stream}override def getProctimeAttribute = {// 這個(gè)名字的列會(huì)被追加到最后,作為第三列"user_action_time"} }// register table source tEnv.registerTableSource("user_actions", new UserActionSource)val windowedTable = tEnv.from("user_actions").window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")1.18.5.1.5.4.更新和追加查詢
雖然這兩個(gè)示例查詢看起來非常相似(都計(jì)算分組計(jì)數(shù)聚合),但它們?cè)谝粋€(gè)重要方面不同:
?第一個(gè)查詢更新先前輸出的結(jié)果,即定義結(jié)果表的 changelog 流包含 INSERT 和 UPDATE 操作。
?第二個(gè)查詢只附加到結(jié)果表,即結(jié)果表的 changelog 流只包含 INSERT 操作。
一個(gè)查詢是產(chǎn)生一個(gè)只追加的表還是一個(gè)更新的表有一些含義:
?產(chǎn)生更新更改的查詢通常必須維護(hù)更多的狀態(tài)(請(qǐng)參閱以下部分)。
?將 append-only 的表轉(zhuǎn)換為流與將已更新的表轉(zhuǎn)換為流是不同的(參閱表到流的轉(zhuǎn)換章節(jié))。
1.18.5.1.5.5.查詢限制
許多(但不是全部)語義上有效的查詢可以作為流上的連續(xù)查詢進(jìn)行評(píng)估。有些查詢代價(jià)太高而無法計(jì)算,這可能是由于它們需要維護(hù)的狀態(tài)大小,也可能是由于計(jì)算更新代價(jià)太高。
狀態(tài)大小:連續(xù)查詢?cè)跓o界流上計(jì)算,通常應(yīng)該運(yùn)行數(shù)周或數(shù)月。因此,連續(xù)查詢處理的數(shù)據(jù)總量可能非常大。必須更新先前輸出的結(jié)果的查詢需要維護(hù)所有輸出的行,以便能夠更新它們。例如,第一個(gè)查詢示例需要存儲(chǔ)每個(gè)用戶的 URL 計(jì)數(shù),以便能夠增加該計(jì)數(shù)并在輸入表接收新行時(shí)發(fā)送新結(jié)果。如果只跟蹤注冊(cè)用戶,則要維護(hù)的計(jì)數(shù)數(shù)量可能不會(huì)太高。但是,如果未注冊(cè)的用戶分配了一個(gè)惟一的用戶名,那么要維護(hù)的計(jì)數(shù)數(shù)量將隨著時(shí)間增長,并可能最終導(dǎo)致查詢失敗。
計(jì)算更新:有些查詢需要重新計(jì)算和更新大量已輸出的結(jié)果行,即使只添加或更新一條輸入記錄。顯然,這樣的查詢不適合作為連續(xù)查詢執(zhí)行。下面的查詢就是一個(gè)例子,它根據(jù)最后一次單擊的時(shí)間為每個(gè)用戶計(jì)算一個(gè) RANK。一旦 click 表接收到一個(gè)新行,用戶的 lastAction 就會(huì)更新,并必須計(jì)算一個(gè)新的排名。然而,由于兩行不能具有相同的排名,所以所有較低排名的行也需要更新。
SELECT user, RANK() OVER (ORDER BY lastAction) FROM (SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user );查詢配置(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/query_configuration.html)章節(jié)討論了控制連續(xù)查詢執(zhí)行的參數(shù)。一些參數(shù)可以用來在維持狀態(tài)的大小和獲得結(jié)果的準(zhǔn)確性之間做取舍。
1.18.5.1.5.6.表到流的轉(zhuǎn)換
動(dòng)態(tài)表可以像普通數(shù)據(jù)庫表一樣通過 INSERT、UPDATE 和 DELETE 來不斷修改。它可能是一個(gè)只有一行、不斷更新的表,也可能是一個(gè) insert-only 的表,沒有 UPDATE 和 DELETE 修改,或者介于兩者之間的其他表。
在將動(dòng)態(tài)表轉(zhuǎn)換為流或?qū)⑵鋵懭胪獠肯到y(tǒng)時(shí),需要對(duì)這些更改進(jìn)行編碼。Flink的 Table API 和 SQL 支持三種方式來編碼一個(gè)動(dòng)態(tài)表的變化:
Append-only 流:僅通過INSERT操作修改的動(dòng)態(tài)表可以通過輸出插入的行轉(zhuǎn)換為流。
Retract流:retract 流包含兩種類型的 message: add messages 和 retract messages 。通過將INSERT 操作編碼為 add message、將 DELETE 操作編碼為 retract message、將 UPDATE 操作編碼為更新(先前)行的 retract message 和更新(新)行的 add message,將動(dòng)態(tài)表轉(zhuǎn)換為 retract 流。下圖顯示了將動(dòng)態(tài)表轉(zhuǎn)換為 retract 流的過程。
Upsert流:upsert 流包含兩種類型的 message: upsert messages 和delete messages。轉(zhuǎn)換為 upsert 流的動(dòng)態(tài)表需要(可能是組合的)唯一鍵。通過將 INSERT 和 UPDATE 操作編碼為 upsert message,將 DELETE 操作編碼為 delete message ,將具有唯一鍵的動(dòng)態(tài)表轉(zhuǎn)換為流。消費(fèi)流的算子需要知道唯一鍵的屬性,以便正確地應(yīng)用 message。與 retract 流的主要區(qū)別在于 UPDATE 操作是用單個(gè) message 編碼的,因此效率更高。下圖顯示了將動(dòng)態(tài)表轉(zhuǎn)換為 upsert 流的過程。
在通用概念(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/common.html#convert-a-table-into-a-datastream)中討論了將動(dòng)態(tài)表轉(zhuǎn)換為 DataStream 的 API。請(qǐng)注意,在將動(dòng)態(tài)表轉(zhuǎn)換為 DataStream 時(shí),只支持 append 流和 retract 流。在 TableSources 和 TableSinks (https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html#define-a-tablesink)章節(jié)討論向外部系統(tǒng)輸出動(dòng)態(tài)表的 TableSink 接口。
總結(jié)
以上是生活随笔為你收集整理的1.18.5.流式概念、动态表(Dynamic Table)、DataStream上的关系查询、动态表 连续查询(Continuous Query)、在流上定义表、处理时间的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深圳软银支付有牌照吗
- 下一篇: 社会保障卡有什么用 这种功能很少有人使