Apache Flink 零基础入门(十九)Flink windows和Time操作
Time類型
在Flink中常用的Time類型:
- 處理時間
- 攝取時間
- 事件時間
處理時間
是上圖中,最后一步的處理時間,表示服務(wù)器中執(zhí)行相關(guān)操作的處理時間。例如一些算子操作時間,在服務(wù)器上面的時間。
如果你以處理時間作為流處理的時間處理方式,那么所有的基于時間的操作都會使用服務(wù)器的時間,來運行相關(guān)的操作。例如:一個小時的處理時間窗口,將會包含一個小時內(nèi)的到達服務(wù)器內(nèi)的所有數(shù)據(jù)。例如應(yīng)用程序9:15am開始執(zhí)行,第一個小時的時間處理窗口會包含所有的9:15到10:15內(nèi)的事件數(shù)據(jù),下一個時間窗口是10:15到11:15內(nèi)的所有數(shù)據(jù)。
處理時間是最簡單的事件處理方式,并不需要流和機器的時間協(xié)調(diào)。因此提供了高性能和低延遲。然而在分布式環(huán)境中或者異步環(huán)境中處理時間并不能夠提供準(zhǔn)確性(也就是說在處理數(shù)據(jù)時,由于網(wǎng)絡(luò)的抖動在一個處理時間窗口中例如9:15到10:15,很大可能包括9:00的事件數(shù)據(jù))。
事件時間
事件時間是每一個設(shè)備上每一個單獨事件發(fā)生的時間例如手機登錄APP的日志時間。這個時間就是這條數(shù)據(jù)記錄的時間。每一條數(shù)據(jù)都有一個時間戳表示這條數(shù)據(jù)的事件發(fā)生時間。這個時間取決于每條數(shù)據(jù),而并不會依賴于機器的時間。事件時間處理時必須指定如何獲得Event Time watermarks(用來描述Event Time如何處理)。
按照事件時間處理數(shù)據(jù),處理結(jié)果應(yīng)該是完全一致,也就是說無論處理多少次結(jié)果都是一樣的,這就是所謂的大數(shù)據(jù)處理的冪等性。 不管事件到達時間和事件是不是有序到達(在生產(chǎn)環(huán)境中,數(shù)據(jù)往往進入到服務(wù)器中的時間和順序是不一定的,有可能先產(chǎn)生的數(shù)據(jù)后到達服務(wù)器,這取決于很多網(wǎng)絡(luò)因素)
攝取時間
攝取時間表示某個事件數(shù)據(jù)進入到Flink的時間。在source操作中,每條記錄都會得到source的當(dāng)前時間戳,也就是接收到的數(shù)據(jù)自動會有一個攝取時間,也就是例如時間窗都是基于這個時間來處理的。
攝取時間是處于事件時間和處理時間之間。如上圖所示。攝取時間是有成本的,但是卻是結(jié)果可預(yù)測的。因為攝取時間使用了穩(wěn)定的時間戳(在source端只會分配一次),每一條數(shù)據(jù)的時間戳都是固定的。并且同一攝取時間的數(shù)據(jù)有可能被分配到不同的處理時間窗口中。
Windows
Windows使我們處理無限數(shù)據(jù)流(源源不斷的進來)的核心部件。Windows把我們的數(shù)據(jù)流拆成一個個的buckets。我們需要把算子作用到buckets上面去。
第一件事情就是需要指定我們的流數(shù)據(jù)是不是有key,有key和沒有key對應(yīng)的算子是完全不一樣的。
Keyed windows
帶keyby,會結(jié)合windows一起使用。輸入的數(shù)據(jù)內(nèi)容中的任意屬性都可以作為一個key。在這個流上可以允許窗口多任務(wù)并行計算,每一個邏輯key都可以被獨立計算,相同的key的數(shù)據(jù)會被發(fā)送到相同的并行任務(wù)中去處理。
Non-Keyed windows
通過使用windowAll來指定。原始的數(shù)據(jù)流不會被拆分成多個邏輯任務(wù),所有窗口邏輯都是一個窗口任務(wù)來執(zhí)行,所以并行度是1。
windows 生命周期
簡而言之,當(dāng)?shù)谝粋€元素到達對應(yīng)的窗口時,一個windows就會被開始創(chuàng)建。當(dāng)時間(不管是event時間還是processing時間)達到時間戳范圍,就會移除窗口。另外,每一個窗口都有一個Trigger和window Functions,當(dāng)數(shù)據(jù)到達窗口后,執(zhí)行的函數(shù)就是window Functions,這個函數(shù)包含了對這個窗口內(nèi)容的所有計算,當(dāng)Trigger達到一定條件之后,就會觸發(fā)。
Windows Assigners
在指定流數(shù)據(jù)是否帶key之后,下一步就是定義窗口的分配器(windows assigner),windows assigner的職責(zé)是定義每一個傳入的元素如何分配到窗口內(nèi)。對于keyby使用window()方法,對于non-keyby使用windowAll()方法。
A?WindowAssigner?is responsible for assigning each incoming element to one or more windows.?
?每個傳入的數(shù)據(jù)分配給一個或多個窗口。
Flink內(nèi)置的window assigner對于大多數(shù)場景來講基本上是夠用的(tumbling windows滾動窗口, sliding windows滑動窗口, session windows會話窗口 and global windows全局窗口)。也可以通過繼承WindowAssigner來自定義一個window assigner。所有的內(nèi)置window assigner(除了全局窗口)都是基于時間(處理時間或事件時間)來分配數(shù)據(jù)的。
基于時間的窗口有一個開始的timestamp(inclusive)和結(jié)束timestamp(exclusive)表示窗口的大小。
Flink中對于窗口的劃分有兩大類,第一大類是基于time(用的最多),第二大類是基于count。
Tumbling Windows 滾動窗口
滾動窗口分配器將分配每一個元素到一個指定大小的窗口,這種類型的窗口有一個固定的大小而且不會有重疊的。上面這張圖就是隨著時間流按照指定的時間間隔拆開。
簡單實例代碼:
Scala
object WindosApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("192.168.227.128", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("WindosApp")}}上面的代碼表示監(jiān)聽socket數(shù)據(jù)流,每隔5秒獲取一次數(shù)據(jù)。timeWindow表示根據(jù)時間來劃分窗口,(此外還有countWindow根據(jù)數(shù)量來劃分窗口)。默認時間是processTime處理時間。
Java
public class JavaWindowApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length()>0){out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);env.execute("JavaWindowApp");} }Sliding Windows滑動窗口
滑動窗口分配器分配每一個元素到一個固定大小的窗口,類似于滾動窗口,窗口大小可以通過配置進行修改,但是滑動窗口還有另外一個附加滑動參數(shù)控制滑動窗口什么時候啟動,所以這個窗口是有可能重疊的。
上面圖的意思是window1的窗口大小是10分鐘,滑動大小是5分鐘,也就是每隔5分鐘產(chǎn)生一個窗口,這個窗口的大小是10分鐘,這個窗口就是window2,然后window2又過5分鐘產(chǎn)生一個窗口,窗口的大小是10分鐘 window3,以此類推。所以滑動窗口處理的數(shù)據(jù)可能會有重疊。一個數(shù)據(jù)元素可能會在多個窗口中進行處理。
使用場景:每個半個小時統(tǒng)計前一個小時的TopN。
object WindosApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("192.168.227.128", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0)//.timeWindow(Time.seconds(5)) # 滾動窗口.timeWindow(Time.seconds(10),Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("WindosApp")}}每隔5秒統(tǒng)計近10秒的數(shù)據(jù)。所以當(dāng)服務(wù)器端輸入:
a,a,a,b,b,b a,a,a,b,b,b a,b,a,b,a,a時,控制臺會打印兩遍結(jié)果:
(a,10) (b,8) (b,8) (a,10)Window Functions
在定義窗口分配器之后,就需要指定基于每一個窗口的計算方法了(在上面的例子中我們做了一個keyby sum操作)。window function會處理窗口中的每一個元素。window function包括如下幾個:
- ReduceFunction
- AggregationFunction
- FoldFunction
- ProcessWindowFunction
ReduceFunction和AggregationFunction的執(zhí)行效率更高,因為Flink會在數(shù)據(jù)到達每一個窗口時首先做一個增量聚合操作。ProcessWindowFunction拿到的是包含在窗口中的所有的元素以及附加信息一個Iterable,是一個全量聚合。因此ProcessWindowFunction的執(zhí)行效率不高,因為Flink會緩存窗口中的所有數(shù)據(jù)。
ReduceFunction
input中的兩個元素進行結(jié)合產(chǎn)生一個同樣類型的輸出。這里我們舉例,通過傳入的數(shù)據(jù)類型是數(shù)值類型來演示增量效果。
Scala
object WindowReduceApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("192.168.227.128", 9999)text.flatMap(_.split(",")).map(x=>(1,x.toInt)) // 1,2,3,4,5 => (1,1) (1,2) (1,3) (1,4) (1,5).keyBy(0) //因為key都是1, 所以所有的元素都到一個task去執(zhí)行.timeWindow(Time.seconds(5)) // 滾動窗口.reduce((v1, v2) => { reduce函數(shù)作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的數(shù)據(jù)到達之后進行一次性處理,而是數(shù)據(jù)兩兩處理println(v1 + "....." + v2)(v1._1, v1._2 + v2._2)}).print().setParallelism(1)env.execute("WindowReduceApp")} }服務(wù)器端輸入:
1,2,3,4,5控制臺中輸出如下:
(1,1).....(1,2) (1,3).....(1,3) (1,6).....(1,4) (1,10).....(1,5) (1,15)reduce函數(shù)作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的數(shù)據(jù)到達之后進行一次性處理,而是數(shù)據(jù)兩兩處理。
Java
public class JavaWindowReduceApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length()>0){out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));}}}}).keyBy(0).timeWindow(Time.seconds(5)).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");return new Tuple2<>(value1.f0,value1.f1 + value2.f1);}}).print().setParallelism(1);env.execute("JavaWindowApp");} }輸出結(jié)果如下:
value1 = [(1,1)], value2 = [(1,2)] value1 = [(1,3)], value2 = [(1,3)] value1 = [(1,6)], value2 = [(1,4)] value1 = [(1,10)], value2 = [(1,5)] (1,15)ProcessWindowFunction
ProcessWindowFunction可以拿到一個Iterable,可以拿到窗口中的所有元素,并且有一個上下文對象可以訪問時間和狀態(tài)信息,比reducefunction可以提供更多的功能。但這樣卻可以帶來資源和性能的開銷,因為元素并不能通過增量的方式去聚合,相反,它需要把所有的數(shù)據(jù)都放在一個buffer中。
public class JavaWindowProcessApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999);text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length()>0){out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token)));}}}}).keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {@Overridepublic void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {System.out.println("tuple = [" + tuple + "], context = [" + context + "], elements = [" + elements + "], out = [" + out + "]");long count = 0;for(Tuple2<Integer, Integer> in:elements) {count++;}out.collect("window:" + context.window() + "count:" + count);}}).print().setParallelism(1);env.execute("JavaWindowApp");} }服務(wù)器輸入:
1,2,3,4,5控制臺輸出:
tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00] window:TimeWindow{start=1568542160000, end=1568542165000}count:5只輸出一次,說明是等待所有數(shù)據(jù)都拿到之后才進行處理。
使用場景:窗口內(nèi)的數(shù)據(jù)進行排序。在Reduce中是無法進行排序的。
總結(jié)
以上是生活随笔為你收集整理的Apache Flink 零基础入门(十九)Flink windows和Time操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 零基础入门(十
- 下一篇: Apache Flink 零基础入门(二