flink 不设置水印_区分理解Flink水印延迟与窗口允许延迟的概念
link 在開(kāi)窗處理事件時(shí)間(Event Time) 數(shù)據(jù)時(shí),可設(shè)置水印延遲以及設(shè)置窗口允許延遲(allowedLateness)以保證數(shù)據(jù)的完整性。這兩者因都是設(shè)置延遲時(shí)間所以剛接觸時(shí)容易混淆。本文接下將展開(kāi)討論分析“水印延遲”與“窗口允許延遲”概念及區(qū)別。
水印延遲(WaterMark)
(1) 水印
由于采用了事件時(shí)間,脫離了物理掛鐘。窗口不知道什么時(shí)候需要關(guān)閉并進(jìn)行計(jì)算,這個(gè)時(shí)候需要借助水印來(lái)解決該問(wèn)題。當(dāng)窗口遇到水位標(biāo)識(shí)時(shí)就默認(rèn)是窗口時(shí)間段內(nèi)的數(shù)據(jù)都到齊了,可以觸發(fā)窗口計(jì)算。
(2) 水印延遲
設(shè)置水印延遲時(shí)間的目的是讓水印延遲到達(dá),從而可以解決亂序問(wèn)題。通過(guò)水印延遲到達(dá)讓在延遲時(shí)間范圍內(nèi)到達(dá)的遲到數(shù)據(jù)可以加入到窗口計(jì)算中,保證了數(shù)據(jù)的完整性。當(dāng)水印到達(dá)后就會(huì)觸發(fā)窗口計(jì)算,在水印之后到達(dá)的遲到數(shù)據(jù)則會(huì)被丟棄。
窗口允許延遲(allowedLateness)
使用 StreamAPI 時(shí),在進(jìn)行開(kāi)窗后可設(shè)置 allowedLateness 窗口延遲。官網(wǎng)中對(duì)其解釋如下:
默認(rèn)情況下,當(dāng)水印到達(dá)窗口末端時(shí),遲到元素將會(huì)被刪除。但Flink允許為window operators指定允許的最大延遲。允許延遲指定元素在被刪除之前延遲的時(shí)間,默認(rèn)值為0。當(dāng)元素在水印經(jīng)過(guò)窗口末端后到達(dá),且它的到達(dá)時(shí)間在窗口末端加上運(yùn)行延遲的時(shí)間之內(nèi),其仍會(huì)被添加到窗口中。根據(jù)所使用的觸發(fā)器,延遲但未被丟棄的元素可能會(huì)再次觸發(fā)窗口計(jì)算。EventTimeTrigger就是這種情況。為了做到這一點(diǎn),Flink保持窗口的狀態(tài),直到它們?cè)试S的延遲到期。一旦發(fā)生這種情況,Flink將刪除窗口并刪除其狀態(tài),正如窗口生命周期部分中所描述的那樣。
簡(jiǎn)單理解:通常在水印到達(dá)之后遲到數(shù)據(jù)將會(huì)被刪除,而窗口的延遲則是指數(shù)據(jù)在被刪除之前的允許保留時(shí)間。也就是說(shuō),在水印達(dá)到之后遲到數(shù)據(jù)本該被刪除,但是如果設(shè)置了窗口延遲,那么在水印之后到窗口延遲時(shí)間段內(nèi)到達(dá)的遲到數(shù)據(jù)還是會(huì)被加入到窗口計(jì)算中,并再次觸發(fā)窗口計(jì)算。
一個(gè)Demo 兩個(gè)猜想
下面我用一個(gè) Demo 和兩個(gè)猜想來(lái)幫助大家加深理解這兩個(gè)概念。
例子:接收 Kafka 數(shù)據(jù),數(shù)據(jù)為 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我們開(kāi)一個(gè) 5 秒的 tumbling windows 滾動(dòng)窗口,以 word 作為 key 在窗口內(nèi)對(duì) count 值進(jìn)行累加。同時(shí)設(shè)置水印延遲 2 秒,窗口延遲 2 秒。代碼如下:
public?class?MyExample?{
public?static?void?main(String[]?args)?throws?Exception?{
//?創(chuàng)建環(huán)境
StreamExecutionEnvironment?env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//?設(shè)置時(shí)間特性為
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//?水印策略,其需要注入Timestamp?Assigner(描述了如何訪問(wèn)事件時(shí)間戳)和?Watermark?Generator?(事件流顯示的超出正常范圍的程度)
WatermarkStrategywatermarkStrategy=WatermarkStrategy
//?forBoundedOutOfOrderness?屬于(periodic周期性),周期生成器通常通過(guò)onEvent()觀察傳入的事件,然后在框架調(diào)用onPeriodicEmit()時(shí)發(fā)出水印。
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new?SerializableTimestampAssigner()?{
@Override
public?long?extractTimestamp(WC?wc,?long?l)?{
return?wc.getEventTime()?*?1000;
}
});
//?Kafka?配置
Properties?properties=newProperties();
properties.setProperty("bootstrap.servers",?"Kafka地址:9092");
properties.setProperty("group.id",?"test");
//?Flink?需要知道如何轉(zhuǎn)換Kafka消息為Java對(duì)象(反序列化),默認(rèn)提供了?KafkaDeserializationSchema(序列化需要自己編寫(xiě))、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema
env.addSource(new?FlinkKafkaConsumer<>("flinktest1",?new?JSONKeyValueDeserializationSchema(true),?properties).setStartFromLatest())
//?map?構(gòu)建?WC?對(duì)象
.map(new?MapFunction()?{
@Override
public?WC?map(ObjectNode?jsonNode)?throws?Exception?{
JsonNode?valueNode=jsonNode.get("value");
WC?wc=newWC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());
return?wc;
}
})
//?設(shè)定水印策略
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(WC::getWord)
//?窗口設(shè)置,這里設(shè)置為滾動(dòng)窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
//?設(shè)置窗口延遲
.allowedLateness(Time.seconds(2))
.reduce(new?ReduceFunction()?{
@Override
public?WC?reduce(WC?wc,?WC?t1)?throws?Exception?{
return?new?WC(wc.getWord(),?wc.getCount()?+?t1.getCount());
}
})
.print();
env.execute();
}
static?class?WC?{
public?String?word;
public?int?count;
public?long?eventTime;
public?long?getEventTime()?{
return?eventTime;
}
public?void?setEventTime(long?eventTime)?{
this.eventTime=?eventTime;
}
public?String?getWord()?{
return?word;
}
public?void?setWord(String?word)?{
this.word=?word;
}
public?int?getCount()?{
return?count;
}
public?void?setCount(int?count)?{
this.count=?count;
}
public?WC(String?word,?int?count)?{
this.word=?word;
this.count=?count;
}
public?WC(String?word,?int?count,long?eventTime)?{
this.word=?word;
this.count=?count;
this.eventTime=?eventTime;
}
@Override
public?String?toString()?{
return?"WC{"?+
"word='"?+?word?+?'\''?+
",?count="?+?count?+
'}';
}
}
}
猜想1:
水印延遲 2s 達(dá)到,所以會(huì)在第 5 + 2 = 7s 時(shí)認(rèn)為 [ 0 ,5 ) 窗口的數(shù)據(jù)全部到齊,并觸發(fā)窗口計(jì)算。
//?往?Kafka?中寫(xiě)入數(shù)據(jù)
{"word":"a","count":1,"time":1604286560}???//2020-11-02?11:09:20
{"word":"a","count":1,"time":1604286561}???//2020-11-02?11:09:21
{"word":"a","count":1,"time":1604286562}???//2020-11-02?11:09:22
{"word":"a","count":1,"time":1604286566}???//2020-11-02?11:09:26
{"word":"a","count":1,"time":1604286567}???//2020-11-02?11:09:27?(觸發(fā)了窗口計(jì)算)
控制臺(tái)輸出
分析:通過(guò)測(cè)試發(fā)現(xiàn)最后在第 7s 也就是 11:09:27 時(shí)觸發(fā)了窗口計(jì)算,這符合了我們的猜想一。水印延遲 2s 達(dá)到,所以會(huì)在第 5 + 2 = 7s 時(shí)認(rèn)為 [ 0 ,5 ) 窗口的數(shù)據(jù)全部到齊,并觸發(fā)窗口計(jì)算。計(jì)算結(jié)果為3,這是因?yàn)橹挥凶钋懊娴?條數(shù)據(jù)屬于 [0,5) 窗口計(jì)算范圍之內(nèi)。
猜想2:
設(shè)置了窗口延遲2秒,那么只要在水印之后到窗口允許延遲的時(shí)間范圍內(nèi)達(dá)到且屬于 [ 0,5) 窗口的遲到數(shù)據(jù)會(huì)被加入到窗口中,且再次觸發(fā)窗口運(yùn)算:
//?繼續(xù)往?Kafka?中寫(xiě)入數(shù)據(jù)
{"word":"a","count":1,"time":1604286568}???//2020-11-02?11:09:28?時(shí)間到達(dá)了第?8?秒
{"word":"a","count":1,"time":1604286563}???//2020-11-02?11:09:23?模擬一個(gè)在水印之后、在窗口允許延遲范圍內(nèi)、且屬于[0,5)?窗口的遲到數(shù)據(jù),該數(shù)據(jù)還是會(huì)觸發(fā)并參與到[0,5)?窗口的計(jì)算
控制臺(tái)輸出新增了一行
//?我們?cè)倮^續(xù)往?Kafka?中寫(xiě)入數(shù)據(jù)
{"word":"a","count":1,"time":1604286569}??//2020-11-02?11:09:29??時(shí)間到達(dá)第9秒
{"word":"a","count":1,"time":1604286563}??//2020-11-02?11:09:23?模擬一個(gè)在水印之后且超出窗口允許延遲范圍、且屬于[0,5)?窗口的遲到數(shù)據(jù),該數(shù)據(jù)不會(huì)參與和觸發(fā)[0,5)窗口計(jì)算
查看控制臺(tái)并沒(méi)有發(fā)現(xiàn)新的輸出打印。
解析:水印因延遲在第 7s 到達(dá)之后會(huì)觸發(fā)[0,5) 窗口計(jì)算,如果沒(méi)有設(shè)置窗口延遲的情況下,水印之后遲到且屬于 [0,5) 窗口的數(shù)據(jù)會(huì)被丟棄。上面我們實(shí)驗(yàn)設(shè)置窗口延遲 2s,實(shí)現(xiàn)的效果就是在水印之后,窗口允許延遲時(shí)間之內(nèi)(7 + 2 = 9s 之間),遲到且屬于 [0,5) 窗口的數(shù)據(jù)還是會(huì)觸發(fā)一次窗口計(jì)算,并參與到窗口計(jì)算中。而在 9s 之后,也就是超過(guò)窗口允許延時(shí)時(shí)間,那么遲到且屬于[0,5)的數(shù)據(jù)就會(huì)被丟棄。
總結(jié)
WaterMark 到達(dá)之前,窗口在攢數(shù)據(jù),不會(huì)觸發(fā)計(jì)算。
WaterMark 等于 windowEndTime 時(shí),第一次觸發(fā)窗口計(jì)算。
WaterMark 到達(dá)之后,allowlateness之前,如果來(lái)了數(shù)據(jù),每條數(shù)據(jù)都會(huì)觸發(fā)窗口計(jì)算。
超過(guò)了allowlateness之后到達(dá)的遲到數(shù)據(jù)會(huì)丟棄。
水印用于解決亂序問(wèn)題保證數(shù)據(jù)的完整性。而之所以有allowlateness的出現(xiàn)是因?yàn)槿绻鸚aterMark 加大會(huì)導(dǎo)致窗口計(jì)算延遲。WaterMark 設(shè)定的時(shí)間,是第一次觸發(fā)窗口計(jì)算的時(shí)間。allowlateness 表示,WaterMark 觸發(fā)窗口計(jì)算以后,還可以再等多久的遲到數(shù)據(jù),每次符合條件的數(shù)據(jù)到達(dá)都會(huì)再次觸發(fā)一次窗口計(jì)算。allowlateness 是在 Watermark 基礎(chǔ)上再做了一層遲到數(shù)據(jù)的保證。
【責(zé)任編輯:趙寧寧 TEL:(010)68476606】
點(diǎn)贊 0
總結(jié)
以上是生活随笔為你收集整理的flink 不设置水印_区分理解Flink水印延迟与窗口允许延迟的概念的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: hp战60想装成win7怎么办 HP战6
- 下一篇: 联想x1怎么装win7 联想X1如何安装