A Look at Flink's Tumbling Window
Preface
This article takes a look at Flink's Tumbling Window.
WindowAssigner
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
    @PublicEvolving
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Returns a {@code Collection} of windows that should be assigned to the element.
         *
         * @param element The element to which windows should be assigned.
         * @param timestamp The timestamp of the element.
         * @param context The {@link WindowAssignerContext} in which the assigner operates.
         */
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

        /**
         * Returns the default trigger associated with this {@code WindowAssigner}.
         */
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

        /**
         * Returns a {@link TypeSerializer} for serializing windows that are assigned by
         * this {@code WindowAssigner}.
         */
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

        /**
         * Returns {@code true} if elements are assigned to windows based on event time,
         * {@code false} otherwise.
         */
        public abstract boolean isEventTime();

        /**
         * A context provided to the {@link WindowAssigner} that allows it to query the
         * current processing time.
         *
         * <p>This is provided to the assigner by its containing
         * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
         * which, in turn, gets it from the containing
         * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
         */
        public abstract static class WindowAssignerContext {

            /**
             * Returns the current processing time.
             */
            public abstract long getCurrentProcessingTime();
        }
    }

- WindowAssigner declares four abstract methods (assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime) and also defines the abstract static class WindowAssignerContext; it has two type parameters, where T is the element type and W is the window type
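To make the roles of the two type parameters concrete, here is a minimal sketch in plain Java. It does not use Flink: `MiniAssigner` and `MinuteIndexAssigner` are hypothetical names introduced for illustration, and the sketch keeps only two of the four abstract methods and drops the context argument.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, stripped-down stand-in for WindowAssigner<T, W>.
// T is the element type, W is the window type.
abstract class MiniAssigner<T, W> {
    // Which windows does this element (with this timestamp) belong to?
    abstract Collection<W> assignWindows(T element, long timestamp);

    // true when assignment is driven by the element's own timestamp.
    abstract boolean isEventTime();
}

// Example assigner: accepts any element (T = Object) and uses the index of
// the minute that contains the timestamp as its "window" (W = Long).
class MinuteIndexAssigner extends MiniAssigner<Object, Long> {
    @Override
    Collection<Long> assignWindows(Object element, long timestamp) {
        return Collections.singletonList(Math.floorDiv(timestamp, 60_000L));
    }

    @Override
    boolean isEventTime() {
        return true;
    }
}

class MiniAssignerDemo {
    public static void main(String[] args) {
        MiniAssigner<Object, Long> assigner = new MinuteIndexAssigner();
        // 125_000 ms falls into minute 2 (120_000 .. 179_999).
        System.out.println(assigner.assignWindows("event-a", 125_000L)); // prints [2]
    }
}
```

The real WindowAssigner additionally ties each assigner to a default Trigger and a TypeSerializer for its window type, which this sketch leaves out.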
Window
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java
    @PublicEvolving
    public abstract class Window {

        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * @return The largest timestamp that still belongs to this window.
         */
        public abstract long maxTimestamp();
    }

- A Window represents one of the finite buckets into which an unbounded stream is divided; its maxTimestamp is the largest timestamp that still belongs to the window; it has two subclasses, GlobalWindow and TimeWindow
TimeWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
    @PublicEvolving
    public class TimeWindow extends Window {
        private final long start;
        private final long end;

        public TimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Gets the starting timestamp of the window. This is the first timestamp that belongs
         * to this window.
         *
         * @return The starting timestamp of this window.
         */
        public long getStart() {
            return start;
        }

        /**
         * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
         * is the first timestamp that does not belong to this window any more.
         *
         * @return The exclusive end timestamp of this window.
         */
        public long getEnd() {
            return end;
        }

        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * <p>This timestamp is identical to {@code getEnd() - 1}.
         *
         * @return The largest timestamp that still belongs to this window.
         *
         * @see #getEnd()
         */
        @Override
        public long maxTimestamp() {
            return end - 1;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            TimeWindow window = (TimeWindow) o;
            return end == window.end && start == window.start;
        }

        @Override
        public int hashCode() {
            return MathUtils.longToIntWithBitMixing(start + end);
        }

        @Override
        public String toString() {
            return "TimeWindow{" +
                "start=" + start +
                ", end=" + end +
                '}';
        }

        /**
         * Returns {@code true} if this window intersects the given window.
         */
        public boolean intersects(TimeWindow other) {
            return this.start <= other.end && this.end >= other.start;
        }

        /**
         * Returns the minimal window covers both this window and the given window.
         */
        public TimeWindow cover(TimeWindow other) {
            return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
        }

        // ------------------------------------------------------------------------
        // Serializer
        // ------------------------------------------------------------------------

        //......

        // ------------------------------------------------------------------------
        // Utilities
        // ------------------------------------------------------------------------

        /**
         * Merge overlapping {@link TimeWindow}s. For use by merging
         * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
         */
        public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {

            // sort the windows by the start time and then merge overlapping windows

            List<TimeWindow> sortedWindows = new ArrayList<>(windows);

            Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
                @Override
                public int compare(TimeWindow o1, TimeWindow o2) {
                    return Long.compare(o1.getStart(), o2.getStart());
                }
            });

            List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
            Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

            for (TimeWindow candidate: sortedWindows) {
                if (currentMerge == null) {
                    currentMerge = new Tuple2<>();
                    currentMerge.f0 = candidate;
                    currentMerge.f1 = new HashSet<>();
                    currentMerge.f1.add(candidate);
                } else if (currentMerge.f0.intersects(candidate)) {
                    currentMerge.f0 = currentMerge.f0.cover(candidate);
                    currentMerge.f1.add(candidate);
                } else {
                    merged.add(currentMerge);
                    currentMerge = new Tuple2<>();
                    currentMerge.f0 = candidate;
                    currentMerge.f1 = new HashSet<>();
                    currentMerge.f1.add(candidate);
                }
            }

            if (currentMerge != null) {
                merged.add(currentMerge);
            }

            for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
                if (m.f1.size() > 1) {
                    c.merge(m.f1, m.f0);
                }
            }
        }

        /**
         * Method to get the window start for a timestamp.
         *
         * @param timestamp epoch millisecond to get the window start.
         * @param offset The offset which window start would be shifted by.
         * @param windowSize The size of the generated windows.
         * @return window start
         */
        public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp - offset + windowSize) % windowSize;
        }
    }

- TimeWindow has start and end fields, where start is inclusive and end is exclusive, so maxTimestamp returns end - 1; it also overrides equals and hashCode
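- TimeWindow provides an intersects method that reports whether this window intersects the given window (because the comparison uses <= and >=, two windows that merely touch, with one's end equal to the other's start, also count as intersecting); the cover method returns the smallest window that covers both this window and the given window, which is their combined span rather than their overlap
- TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset; the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset and windowSize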
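The arithmetic behind these helpers is easier to follow with concrete numbers. The following is a minimal sketch that copies the three formulas from the source above into a hypothetical `Win` class (not a Flink type), so it runs without Flink on the classpath.

```java
// Hypothetical stand-in for TimeWindow: the half-open interval [start, end) in milliseconds.
final class Win {
    final long start;
    final long end;

    Win(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // end is exclusive, so the last timestamp inside the window is end - 1.
    long maxTimestamp() {
        return end - 1;
    }

    // Same comparison as TimeWindow.intersects: touching windows count as intersecting.
    boolean intersects(Win other) {
        return start <= other.end && end >= other.start;
    }

    // Same as TimeWindow.cover: the smallest window spanning both, including any gap.
    Win cover(Win other) {
        return new Win(Math.min(start, other.start), Math.max(end, other.end));
    }

    // Same formula as TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

class WinDemo {
    public static void main(String[] args) {
        // size 5000, offset 0: (12300 - 0 + 5000) % 5000 = 2300, so start = 10000.
        System.out.println(Win.windowStart(12_300L, 0L, 5_000L));     // prints 10000
        // size 5000, offset 1000: (12300 - 1000 + 5000) % 5000 = 1300, so start = 11000.
        System.out.println(Win.windowStart(12_300L, 1_000L, 5_000L)); // prints 11000

        Win a = new Win(0L, 5_000L);
        System.out.println(a.maxTimestamp());                         // prints 4999
        System.out.println(a.intersects(new Win(5_000L, 10_000L)));   // prints true
        System.out.println(a.cover(new Win(7_000L, 9_000L)));         // prints [0, 9000)
    }
}
```

The last line shows why cover is not an overlap: the two inputs do not overlap at all, yet the result spans both of them and the gap in between.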
TumblingEventTimeWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
    @PublicEvolving
    public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private final long size;

        private final long offset;

        protected TumblingEventTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
            }

            this.size = size;
            this.offset = offset;
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                // Long.MIN_VALUE is currently assigned when no timestamp is present
                long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }

        @Override
        public String toString() {
            return "TumblingEventTimeWindows(" + size + ")";
        }

        public static TumblingEventTimeWindows of(Time size) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
        }

        public static TumblingEventTimeWindows of(Time size, Time offset) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }
    }

- TumblingEventTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow; it takes two parameters, size and offset, which must satisfy 0 <= offset < size
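- assignWindows returns a single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size), and throws a RuntimeException if the element's timestamp is Long.MIN_VALUE (meaning no timestamp was assigned); getDefaultTrigger returns an EventTimeTrigger; getWindowSerializer returns a TimeWindow.Serializer; isEventTime returns true
- TumblingEventTimeWindows provides the static factory method of, which accepts size and, optionally, offset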
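As an illustration of the assignment rule above, the following sketch groups a handful of event timestamps into tumbling windows. It is plain Java under stated assumptions: `EventTimeTumblingSketch`, `assign` and `group` are hypothetical names, not Flink APIs, and only the parameter check, the Long.MIN_VALUE check and the start formula are taken from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EventTimeTumblingSketch {

    // Returns {start, end} of the single window that contains the event timestamp.
    static long[] assign(long timestamp, long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        if (timestamp == Long.MIN_VALUE) {
            // Mirrors the source: Long.MIN_VALUE means "no timestamp assigned".
            throw new IllegalStateException("record has no timestamp");
        }
        long start = timestamp - (timestamp - offset + size) % size;
        return new long[] {start, start + size};
    }

    // Groups timestamps by the start of the window each one is assigned to.
    static Map<Long, List<Long>> group(long[] timestamps, long size, long offset) {
        Map<Long, List<Long>> byWindowStart = new TreeMap<>();
        for (long ts : timestamps) {
            long start = assign(ts, size, offset)[0];
            byWindowStart.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
        }
        return byWindowStart;
    }

    public static void main(String[] args) {
        long[] eventTimes = {1_000L, 4_999L, 5_000L, 12_300L};
        // 5-second windows, no offset: 4999 is the last timestamp of [0, 5000).
        System.out.println(group(eventTimes, 5_000L, 0L));
        // prints {0=[1000, 4999], 5000=[5000], 10000=[12300]}
    }
}
```

Each timestamp lands in exactly one window, which is the defining property of a tumbling window: the windows have a fixed size and never overlap.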
TumblingProcessingTimeWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
    public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private final long size;

        private final long offset;

        private TumblingProcessingTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size");
            }

            this.size = size;
            this.offset = offset;
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            final long now = context.getCurrentProcessingTime();
            long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        }

        public long getSize() {
            return size;
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }

        @Override
        public String toString() {
            return "TumblingProcessingTimeWindows(" + size + ")";
        }

        public static TumblingProcessingTimeWindows of(Time size) {
            return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
        }

        public static TumblingProcessingTimeWindows of(Time size, Time offset) {
            return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return false;
        }
    }

- TumblingProcessingTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow; it takes two parameters, size and offset, which must satisfy 0 <= offset < size
- assignWindows returns a single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(now, offset, size) and now is context.getCurrentProcessingTime(); this is where it differs from TumblingEventTimeWindows: TumblingProcessingTimeWindows ignores the timestamp parameter and uses now instead; getDefaultTrigger returns a ProcessingTimeTrigger, and isEventTime returns false
- TumblingProcessingTimeWindows also provides the static factory method of, which accepts size and, optionally, offset
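The difference from the event-time variant can be shown with a small sketch in which the clock is injected. This is an assumption made for testability: `ProcessingTimeTumblingSketch` is a hypothetical class, and the `LongSupplier` stands in for WindowAssignerContext.getCurrentProcessingTime().

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

class ProcessingTimeTumblingSketch {
    private final long size;
    private final long offset;
    private final LongSupplier clock; // stand-in for the assigner context's processing-time clock

    ProcessingTimeTumblingSketch(long size, long offset, LongSupplier clock) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
        this.clock = clock;
    }

    // Returns {start, end}. The element's own timestamp is accepted but never read.
    long[] assign(long ignoredElementTimestamp) {
        long now = clock.getAsLong();
        long start = now - (now - offset + size) % size;
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        // Fixed clock at 61_500 ms, 10-second windows, no offset.
        ProcessingTimeTumblingSketch assigner =
                new ProcessingTimeTumblingSketch(10_000L, 0L, () -> 61_500L);
        // The element timestamp (5) plays no role; only the clock value does.
        System.out.println(Arrays.toString(assigner.assign(5L))); // prints [60000, 70000]
    }
}
```

In a real job the clock would normally be the wall clock of the machine running the operator, so the same element may land in a different window on each run, whereas event-time assignment is reproducible.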
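Summary
- Flink's Tumbling Window comes in two variants, TumblingEventTimeWindows and TumblingProcessingTimeWindows; both extend WindowAssigner with element type Object and window type TimeWindow; each takes two parameters, size and offset, which must satisfy 0 <= offset < size
- WindowAssigner declares four abstract methods (assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime) and also defines the abstract static class WindowAssignerContext; it has two type parameters, where T is the element type and W is the window type; the window type of both TumblingEventTimeWindows and TumblingProcessingTimeWindows is TimeWindow, which has start and end fields, where start is inclusive and end is exclusive, so maxTimestamp returns end - 1; TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset; the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset and windowSize
- TumblingEventTimeWindows and TumblingProcessingTimeWindows differ in assignWindows, getDefaultTrigger and isEventTime; the former's assignWindows uses the timestamp parameter, while the latter's uses the current processing time (now); the former's getDefaultTrigger returns an EventTimeTrigger, while the latter's returns a ProcessingTimeTrigger; the former's isEventTime returns true, while the latter's returns false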
doc
- Tumbling Windows
Reposted from: https://my.oschina.net/go4it/blog/2995872