Let's talk about Flink's consecutive windowed operations
Preface
This article takes a look at Flink's consecutive windowed operations.
Example
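```java
DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
```

- This example first partitions the stream by key and aggregates each key within the given window, then applies windowAll to the resulting DataStream using the same time-based WindowAssigner. The effect is a per-partition aggregation followed by a global aggregation over the same time window, which is useful for problems such as finding the top-k elements. This works because a window operator stamps each result with its window's maxTimestamp (see emitWindowContents in the WindowOperator section), so a result produced for window [0, 5000) carries timestamp 4999 and lands in window [0, 5000) of the downstream windowAll as well.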
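To make the two-stage pattern concrete without a Flink cluster, here is a plain-Java simulation under stated assumptions: tumbling windows with zero offset, non-negative millisecond timestamps, and a hypothetical `Event` type standing in for stream elements. Stage 1 mimics keyBy + window + reduce (assuming `Summer` sums values), and stage 2 mimics windowAll + a top-k process function. It sketches the data flow only; it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConsecutiveWindowsSketch {

    /** Hypothetical (key, value, timestamp) element used only by this sketch. */
    static final class Event {
        final String key;
        final int value;
        final long timestamp;

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Tumbling-window start for a zero offset and non-negative timestamps. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Stage 1 (keyBy + window + reduce): sum per key and window, stamped with maxTimestamp. */
    static List<Event> perKeySums(List<Event> input, long size) {
        Map<Long, Map<String, Integer>> sums = new TreeMap<>(); // window start -> key -> sum
        for (Event e : input) {
            sums.computeIfAbsent(windowStart(e.timestamp, size), w -> new HashMap<>())
                .merge(e.key, e.value, Integer::sum);
        }
        List<Event> out = new ArrayList<>();
        sums.forEach((start, perKey) -> perKey.forEach((key, sum) ->
                out.add(new Event(key, sum, start + size - 1)))); // window end - 1 = maxTimestamp
        return out;
    }

    /** Stage 2 (windowAll + process): keep the k largest sums in each window. */
    static Map<Long, List<String>> topK(List<Event> perKey, long size, int k) {
        Map<Long, List<Event>> byWindow = new TreeMap<>();
        for (Event e : perKey) {
            byWindow.computeIfAbsent(windowStart(e.timestamp, size), w -> new ArrayList<>()).add(e);
        }
        Map<Long, List<String>> result = new TreeMap<>();
        byWindow.forEach((start, events) -> {
            events.sort((a, b) -> Integer.compare(b.value, a.value)); // largest sum first
            List<String> keys = new ArrayList<>();
            for (int i = 0; i < Math.min(k, events.size()); i++) {
                keys.add(events.get(i).key);
            }
            result.put(start, keys);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("a", 1, 1_000), new Event("b", 5, 2_000), new Event("a", 3, 4_999),
                new Event("c", 2, 3_000), new Event("b", 1, 6_000), new Event("c", 4, 7_000));
        System.out.println(topK(perKeySums(input, 5_000), 5_000, 2)); // {0=[b, a], 5000=[c, b]}
    }
}
```

Every stage-1 result carries its window's maxTimestamp (4999 or 9999 here), so `windowStart` in stage 2 maps it back to the same window it was computed for, which is exactly why reusing the same WindowAssigner lines the two stages up.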
TimestampsAndPeriodicWatermarksOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
```java
public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;

    private transient long watermarkInterval;

    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        // emit a final watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}
```

- If assignTimestampsAndWatermarks is given an AssignerWithPeriodicWatermarks, it creates a TimestampsAndPeriodicWatermarksOperator. In open(), if watermarkInterval is greater than 0, this operator registers a delayed task (a processing-time timer) that fires after watermarkInterval.
- When the timer fires, it calls back onProcessingTime, which calls AssignerWithPeriodicWatermarks.getCurrentWatermark to obtain the watermark and then registers a new timer for getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval. Here watermarkInterval is the value set via env.getConfig().setAutoWatermarkInterval.
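- Besides re-registering the timer to achieve periodic firing, onProcessingTime emits the watermark returned by getCurrentWatermark only when it is non-null and greater than currentWatermark, so the emitted watermarks are strictly increasing. Note also that processWatermark ignores watermarks arriving from upstream, except a Long.MAX_VALUE watermark signaling end of input, which it forwards.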
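The following Flink-free sketch models the control flow of the periodic path described above. The class is hypothetical, and the watermark rule (largest timestamp seen minus a fixed tolerance) is an assumption chosen for illustration, in the spirit of Flink's BoundedOutOfOrdernessTimestampExtractor. `onTimer()` plays the role of `onProcessingTime()`; the sketch runs single-threaded, whereas the real operator is called back from the timer thread.

```java
import java.util.ArrayList;
import java.util.List;

final class PeriodicWatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;
    private long currentWatermark = Long.MIN_VALUE;        // like the operator's field after open()
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness; // avoids underflow in getCurrentWatermark
    }

    /** Called per element: the event time is returned and also advances maxTimestamp. */
    long extractTimestamp(long eventTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTime);
        return eventTime;
    }

    /** Watermark = largest event time seen so far minus the tolerated out-of-orderness. */
    long getCurrentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    /** Mirrors onProcessingTime(): only strictly increasing watermarks are emitted. */
    void onTimer() {
        long newWatermark = getCurrentWatermark();
        if (newWatermark > currentWatermark) {
            currentWatermark = newWatermark;
            emitted.add(newWatermark);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch s = new PeriodicWatermarkSketch(1_000);
        s.onTimer();                 // watermark is Long.MIN_VALUE: nothing emitted
        s.extractTimestamp(5_000);
        s.onTimer();                 // emits 4000
        s.extractTimestamp(3_000);   // out-of-order element: maxTimestamp stays 5000
        s.onTimer();                 // still 4000, not emitted again
        s.extractTimestamp(7_000);
        s.onTimer();                 // emits 6000
        System.out.println(s.emitted()); // [4000, 6000]
    }
}
```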
SystemProcessingTimeService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
```java
public class SystemProcessingTimeService extends ProcessingTimeService {

    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    private static final int STATUS_ALIVE = 0;
    private static final int STATUS_QUIESCED = 1;
    private static final int STATUS_SHUTDOWN = 2;

    // ------------------------------------------------------------------------

    /** The containing task that owns this time service provider. */
    private final AsyncExceptionHandler task;

    /** The lock that timers acquire upon triggering. */
    private final Object checkpointLock;

    /** The executor service that schedules and calls the triggers of this task. */
    private final ScheduledThreadPoolExecutor timerService;

    private final AtomicInteger status;

    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);
    }

    public SystemProcessingTimeService(
            AsyncExceptionHandler task,
            Object checkpointLock,
            ThreadFactory threadFactory) {

        this.task = checkNotNull(task);
        this.checkpointLock = checkNotNull(checkpointLock);

        this.status = new AtomicInteger(STATUS_ALIVE);

        if (threadFactory == null) {
            this.timerService = new ScheduledThreadPoolExecutor(1);
        } else {
            this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
        }

        // tasks should be removed if the future is canceled
        this.timerService.setRemoveOnCancelPolicy(true);

        // make sure shutdown removes all pending tasks
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    //......
}
```

- SystemProcessingTimeService.registerTimer schedules a delayed TriggerTask for the given timestamp, adding 1 ms to the delay so that the timer semantics line up with watermarks; timerService is the JDK's ScheduledThreadPoolExecutor with a single thread. When the TriggerTask runs, it acquires checkpointLock and, if the service status is STATUS_ALIVE, calls onProcessingTime on the ProcessingTimeCallback, which here is TimestampsAndPeriodicWatermarksOperator.
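The timer mechanics can be reproduced with the JDK alone. In this sketch (class and method names are hypothetical), `delayFor` applies the same `max(timestamp - now, 0) + 1` rule as `registerTimer`, a single-threaded ScheduledThreadPoolExecutor runs the callbacks, and each callback re-registers the next timer the way TimestampsAndPeriodicWatermarksOperator.onProcessingTime does. A plain object lock stands in for checkpointLock, and shutdown is handled by swallowing RejectedExecutionException instead of Flink's status flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicTimerSketch {

    /** Same rule as registerTimer: never a negative delay, plus 1 ms. */
    static long delayFor(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    private final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    private final Object checkpointLock = new Object();  // stand-in for Flink's checkpoint lock
    private final long interval;
    private final AtomicInteger fired = new AtomicInteger();
    private final CountDownLatch latch;

    PeriodicTimerSketch(long interval, int expectedFirings) {
        this.interval = interval;
        this.latch = new CountDownLatch(expectedFirings);
        timerService.setRemoveOnCancelPolicy(true);
        timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    void registerTimer(long timestamp) {
        long delay = delayFor(timestamp, System.currentTimeMillis());
        try {
            timerService.schedule(this::onProcessingTime, delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // executor already shut down: stop re-registering
        }
    }

    /** Runs on the timer thread, serialized by the lock like a TriggerTask. */
    private void onProcessingTime() {
        synchronized (checkpointLock) {
            fired.incrementAndGet();
            latch.countDown();
        }
        // re-register relative to "now", as the periodic watermark operator does
        registerTimer(System.currentTimeMillis() + interval);
    }

    /** Starts the timer chain, waits for the expected firings (or the timeout), then shuts down. */
    int runFor(long timeoutMs) {
        registerTimer(System.currentTimeMillis() + interval);
        try {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timerService.shutdownNow();
        }
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(delayFor(2_000, 1_500)); // 501
        System.out.println(delayFor(1_000, 1_500)); // 1 (timestamp already passed)
        System.out.println(new PeriodicTimerSketch(10, 3).runFor(5_000) >= 3); // true
    }
}
```

The `+ 1` means even a timestamp that is already in the past waits at least 1 ms, which matches the comment in registerTimer about aligning processing-time timers with watermark semantics.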
WindowOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
```java
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            //......
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Emits the contents of the given window using the {@link InternalWindowFunction}.
     */
    @SuppressWarnings("unchecked")
    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

    //......
}
```

- WindowOperator.processElement adds the element to windowState (a heap-backed state such as HeapAggregatingState, i.e. accumulated in memory), then calls triggerContext.onElement, which delegates to trigger.onElement (the trigger here is EventTimeTrigger), to obtain a TriggerResult. If the result says fire, it calls emitWindowContents; if it says purge, it clears windowState. emitWindowContents sets the output timestamp to window.maxTimestamp() and calls userFunction.process to run the user-defined window function.
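The following sketch strips processElement down to the non-merging path for a single key and a tumbling event-time window: assign the window, drop the element if the window is already late (cleanup time maxTimestamp + allowedLateness at or below the watermark), add to a summing state, consult the trigger, then emit and/or purge. The `Trigger` interface, the `Emitted` type, and the sum are assumptions for illustration; timers, cleanup, and side outputs are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WindowOperatorSketch {

    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /** Hypothetical trigger hook, called after the element has been added to the window state. */
    interface Trigger {
        TriggerResult onElement(long windowMaxTimestamp, long currentWatermark);
    }

    /** One emitted window result; its timestamp is the window's maxTimestamp. */
    static final class Emitted {
        final long timestamp;
        final int value;

        Emitted(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long size;
    private final long allowedLateness;
    private final Trigger trigger;
    private final Map<Long, Integer> windowState = new HashMap<>(); // window start -> running sum
    final List<Emitted> emitted = new ArrayList<>();
    long currentWatermark = Long.MIN_VALUE;
    int numLateRecordsDropped;

    WindowOperatorSketch(long size, long allowedLateness, Trigger trigger) {
        this.size = size;
        this.allowedLateness = allowedLateness;
        this.trigger = trigger;
    }

    void processElement(int value, long timestamp) {
        long start = timestamp - (timestamp % size);     // tumbling window, non-negative timestamps
        long maxTimestamp = start + size - 1;
        if (maxTimestamp + allowedLateness <= currentWatermark) {
            numLateRecordsDropped++;                     // window is late and no side output is set
            return;
        }
        windowState.merge(start, value, Integer::sum);   // accumulate like a reducing state
        TriggerResult result = trigger.onElement(maxTimestamp, currentWatermark);
        if (result.fire) {
            emitted.add(new Emitted(maxTimestamp, windowState.get(start))); // like emitWindowContents
        }
        if (result.purge) {
            windowState.remove(start);
        }
    }

    public static void main(String[] args) {
        // EventTimeTrigger's onElement rule, without its timer registration
        WindowOperatorSketch op = new WindowOperatorSketch(5_000, 2_000,
                (maxTs, watermark) -> maxTs <= watermark ? TriggerResult.FIRE : TriggerResult.CONTINUE);
        op.processElement(1, 1_000);  // CONTINUE
        op.processElement(2, 3_000);  // CONTINUE
        op.currentWatermark = 5_500;  // past the window end, but not past end + lateness
        op.processElement(4, 4_000);  // FIRE: emits sum 7 with timestamp 4999
        op.currentWatermark = 7_000;
        op.processElement(8, 2_000);  // window [0, 5000) is now late: dropped
        System.out.println(op.emitted.size() + " " + op.numLateRecordsDropped); // 1 1
    }
}
```

In the demo the trigger fires on an element only because allowedLateness is 2000 ms: with zero allowed lateness, an element whose window the watermark has already passed is dropped as late before the trigger is consulted.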
EventTimeTrigger
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
```java
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
```

- EventTimeTrigger.onElement returns TriggerResult.FIRE if window.maxTimestamp() <= ctx.getCurrentWatermark(), telling WindowOperator to call emitWindowContents. Otherwise it registers an event-time timer at window.maxTimestamp() and returns CONTINUE; when a later watermark reaches that timer, onEventTime returns FIRE because time == window.maxTimestamp(). In the common case, therefore, it is the arriving watermark, not an element, that fires the window.
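To see how the watermark normally fires an event-time window, here is a single-key sketch. `onElement` and `onEventTime` copy EventTimeTrigger's decisions; the TreeSet of timers is a hypothetical, simplified stand-in for Flink's internal timer service, which fires every event-time timer whose timestamp is at or below a newly arrived watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class EventTimeTriggerSketch {

    enum TriggerResult { CONTINUE, FIRE }

    private final TreeSet<Long> eventTimeTimers = new TreeSet<>(); // duplicate timers collapse
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> firedWindows = new ArrayList<>();             // maxTimestamp of each fired window

    /** Same decision as EventTimeTrigger.onElement. */
    TriggerResult onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            return TriggerResult.FIRE;                  // watermark already past the window
        }
        eventTimeTimers.add(windowMaxTimestamp);        // ctx.registerEventTimeTimer(maxTimestamp)
        return TriggerResult.CONTINUE;
    }

    /** Same decision as EventTimeTrigger.onEventTime. */
    TriggerResult onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    /** A new watermark fires every registered timer with timestamp <= watermark, in order. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            long timer = eventTimeTimers.pollFirst();
            // only trigger timers exist here, so the timer timestamp equals the window's maxTimestamp
            if (onEventTime(timer, timer) == TriggerResult.FIRE) {
                firedWindows.add(timer);
            }
        }
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch t = new EventTimeTriggerSketch();
        t.onElement(4_999);            // CONTINUE, timer at 4999
        t.onElement(9_999);            // CONTINUE, timer at 9999
        t.advanceWatermark(3_000);     // nothing fires
        t.advanceWatermark(5_000);     // fires the window ending at 4999
        System.out.println(t.onElement(4_999)); // FIRE: watermark already past this window
        t.advanceWatermark(12_000);    // fires the window ending at 9999
        System.out.println(t.firedWindows);     // [4999, 9999]
    }
}
```

The equality check in onEventTime looks redundant in this sketch, but in Flink the same callback can also be reached by the operator's cleanup timers (at maxTimestamp + allowedLateness), which the check keeps from firing the window.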
Summary
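- Flink supports consecutive windowed operations. For example, you can first partition the stream by key and aggregate each key within a given window, then apply windowAll to the resulting DataStream with the same time-based WindowAssigner. This yields a per-partition aggregation followed by a global aggregation over the same time window, which is useful for problems such as finding the top-k elements.
- AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks serve two purposes: extracting a timestamp from each element to use as its event time, and emitting watermarks. Because elements do not necessarily arrive in strict event-time order, watermarks bound how long a window waits for late elements that may still belong to it. A watermark tells the window that all elements with an event time less than or equal to the watermark can be considered to have arrived; based on its own time range, the window then uses its trigger to decide whether it can close and start processing its data. For consecutive windowed operations, upstream watermarks are forwarded to the downstream operators.
- A Trigger tells WindowOperator when it can close a window and start processing its data (when the trigger returns TriggerResult.FIRE). For EventTimeTrigger, the logic in onElement depends on the watermark: if window.maxTimestamp() <= ctx.getCurrentWatermark(), it returns TriggerResult.FIRE.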
doc
- Consecutive windowed operations
Reposted from: https://my.oschina.net/go4it/blog/2999461