Three Operators for Dual-Stream Joins in Flink, with Examples

When running OLAP analysis over static tables in a database, joining two tables is a very common operation. Likewise, a streaming job sometimes needs to join two streams to obtain richer information. The Flink DataStream API provides three operators for dual-stream joins:

- join()
- coGroup()
- intervalJoin()

This article demonstrates how to use each of them with examples, and then takes a closer look at how the somewhat special interval join works internally.

Preparing the Data

Ingest the click stream and the order stream from Kafka, and map each message to a POJO.

```java
DataStream<String> clickSourceStream = env.addSource(
    new FlinkKafkaConsumer011<>("ods_analytics_access_log", new SimpleStringSchema(), kafkaProps)
        .setStartFromLatest());
DataStream<String> orderSourceStream = env.addSource(
    new FlinkKafkaConsumer011<>("ods_ms_order_done", new SimpleStringSchema(), kafkaProps)
        .setStartFromLatest());

DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
    .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
    .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));
```
join()

The join() operator provides "window join" semantics: an inner join on a specified key within (tumbling/sliding/session) windows, supporting both processing time and event time. The following example joins the two streams on merchandise ID over a 10-second tumbling window and extracts the price-related fields from the order stream.

```java
clickRecordStream
    .join(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
            return StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()), '\t');
        }
    })
    .print()
    .setParallelism(1);
```
Simple and easy to use.

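Since join() supports event time as well as processing time, the same join can run on event-time windows simply by swapping the window assigner. Below is a minimal sketch of that variant (not from the original article); it assumes timestamps and watermarks have already been assigned to both streams (that step is shown in the interval join section), and the output is shortened to merchandise ID and price for brevity:

```java
// Event-time variant of the window join above (a sketch, not from the original
// article). Requires assignTimestampsAndWatermarks() on both input streams.
clickRecordStream
    .join(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
            // same key and window size as before; only the assigner changed
            return accessRecord.getMerchandiseId() + "\t" + orderRecord.getPrice();
        }
    })
    .print()
    .setParallelism(1);
```
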
coGroup()

An inner join alone is clearly not enough; how do we implement a left or right outer join? The answer is the coGroup() operator. It is invoked much like join() and also requires a window, but CoGroupFunction is more flexible than JoinFunction: it can match records from the left and/or right stream according to user-defined logic and emit arbitrary output.

The following example implements a left join of the click stream against the order stream, using a plain nested-loop join (two nested loops). A right outer join would simply swap the roles of the two iterables.

```java
clickRecordStream
    .coGroup(orderRecordStream)
    .where(record -> record.getMerchandiseId())
    .equalTo(record -> record.getMerchandiseId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
        @Override
        public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords,
                            Iterable<OrderDoneLogRecord> orderRecords,
                            Collector<Tuple2<String, Long>> collector) throws Exception {
            for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
                boolean isMatched = false;
                for (OrderDoneLogRecord orderRecord : orderRecords) {
                    // the right stream has a matching record
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
                    isMatched = true;
                }
                if (!isMatched) {
                    // the right stream has no matching record
                    collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
                }
            }
        }
    })
    .print()
    .setParallelism(1);
```
intervalJoin()

Both join() and coGroup() join on windows, but in some situations the two streams do not move in lockstep. For example, an order record may be written long after the corresponding purchase click occurs in the click stream; if a window is used to pair them, the join can easily miss. For this reason Flink also provides "interval join" semantics: records are joined on a specified key when the right stream's timestamp falls into a time interval relative to the left stream's timestamp, i.e.:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

An interval join is also an inner join. It needs no window, but the user must specify the lower and upper bounds of the interval, and only event time is supported. For example, with lowerBound = -30 s and upperBound = 30 s, an order record joins a click record when its timestamp lies within 30 seconds on either side of the click's timestamp.

The example code follows. Note that before it can run, assignTimestampsAndWatermarks() must be applied to both streams to extract event timestamps and generate watermarks.

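A minimal sketch of that step (the getTs() getter returning epoch milliseconds and the 10-second out-of-orderness allowance are assumptions for illustration, not part of the original code; this Flink version also needs env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)):

```java
// Assign event timestamps and watermarks, tolerating 10 seconds of disorder.
// getTs() is a hypothetical getter for the record's event time in epoch millis.
clickRecordStream = clickRecordStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<AnalyticsAccessLogRecord>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(AnalyticsAccessLogRecord record) {
            return record.getTs();
        }
    });
orderRecordStream = orderRecordStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<OrderDoneLogRecord>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(OrderDoneLogRecord record) {
            return record.getTs();
        }
    });
```

With watermarks in place, the interval join itself:
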
```java
clickRecordStream
    .keyBy(record -> record.getMerchandiseId())
    .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
    .between(Time.seconds(-30), Time.seconds(30))
    .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void processElement(AnalyticsAccessLogRecord accessRecord,
                                   OrderDoneLogRecord orderRecord,
                                   Context context,
                                   Collector<String> collector) throws Exception {
            collector.collect(StringUtils.join(Arrays.asList(
                accessRecord.getMerchandiseId(),
                orderRecord.getPrice(),
                orderRecord.getCouponMoney(),
                orderRecord.getRebateAmount()), '\t'));
        }
    })
    .print()
    .setParallelism(1);
```
As the code shows, an interval join, unlike a window join, is an operation on two KeyedStreams, and the bounds of the interval are given via the between() method. To make either bound an open interval, call upperBoundExclusive() and/or lowerBoundExclusive(), as sketched below.

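A sketch of the open-interval variant (same pipeline as above, with the output shortened to merchandise ID and price):

```java
// Both bounds made exclusive, i.e. right.timestamp strictly inside
// (left.timestamp - 30 s, left.timestamp + 30 s). A sketch, not from the original.
clickRecordStream
    .keyBy(record -> record.getMerchandiseId())
    .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
    .between(Time.seconds(-30), Time.seconds(30))
    .lowerBoundExclusive()
    .upperBoundExclusive()
    .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
        @Override
        public void processElement(AnalyticsAccessLogRecord accessRecord,
                                   OrderDoneLogRecord orderRecord,
                                   Context context,
                                   Collector<String> collector) throws Exception {
            collector.collect(accessRecord.getMerchandiseId() + "\t" + orderRecord.getPrice());
        }
    })
    .print()
    .setParallelism(1);
```
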
How interval join is implemented

The following is the logic of the overloaded method that the KeyedStream.process(ProcessJoinFunction) call delegates to.

```java
public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf);

    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}
```
As you can see, the two streams are first combined with connect() and keyBy(), and then transformed by the IntervalJoinOperator. Inside IntervalJoinOperator, two MapStates cache the records of the left and right streams respectively.

```java
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        LEFT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));
    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        RIGHT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));
}
```
Here the Long key is the event timestamp, and the List<BufferEntry<T>> value holds the records arriving at that instant. When data arrives on the left or right stream, processElement1() or processElement2() is called respectively; both delegate to the processElement() method shown below.

```java
@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft) throws Exception {

    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
    }

    if (isLate(ourTimestamp)) {
        return;
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        final long timestamp = bucket.getKey();

        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        for (BufferEntry<OTHER> entry : bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}
```
The logic of this code is:

- Take the value and timestamp of the incoming record; a timestamp of Long.MIN_VALUE means no event timestamp was ever assigned, which is an error for an interval join.
- If the record is late relative to the current watermark, drop it.
- Otherwise, add the record to its own stream's buffer, keyed by timestamp.
- Scan the other stream's buffer and, for each cached record whose timestamp falls inside the interval relative to the incoming record, call collect() to emit the joined pair.
- Finally, register an event-time timer so the record is cleaned out of the buffer once it can no longer match anything.

The collect() method assigns the result the larger of the two input timestamps, since the joined pair is not complete until the later of its inputs has arrived:

```java
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
    userFunction.processElement(left, right, context, collector);
}
```

The cleanup registered above is handled by onEventTime(): when a timer fires, the buffer entry whose join interval has fully passed is removed from the corresponding side.

```java
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();
    logger.trace("onEventTime @ {}", timerTimestamp);

    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}
```
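
To make the timer arithmetic concrete, take the bounds between(Time.seconds(-30), Time.seconds(30)) from the example above. A left record with timestamp 100 s can match right records in [70 s, 130 s], so its cleanup timer is registered at 100 + 30 = 130 s; when the watermark passes that point, onEventTime() recovers the bucket key as 130 - 30 = 100 s and removes it from leftBuffer. The right side is symmetric: a right record at 100 s can match left records in [70 s, 130 s], so its timer also fires at 130 s, and 130 + (-30) = 100 s is removed from rightBuffer.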