Flink 零基础实战教程:如何计算实时热门商品
在上一篇入門教程中,我們已經(jīng)能夠快速構(gòu)建一個(gè)基礎(chǔ)的 Flink 程序了。本文會(huì)一步步地帶領(lǐng)你實(shí)現(xiàn)一個(gè)更復(fù)雜的 Flink 應(yīng)用程序:實(shí)時(shí)熱門商品。在開始本文前我們建議你先實(shí)踐一遍上篇文章,因?yàn)楸疚臅?huì)沿用上文的my-flink-project項(xiàng)目框架。
通過(guò)本文你將學(xué)到:
- 如何基于 EventTime 處理,如何指定 Watermark
- 如何使用 Flink 靈活的 Window API
- 何時(shí)需要用到 State,以及如何使用
- 如何使用 ProcessFunction 實(shí)現(xiàn) TopN 功能
實(shí)戰(zhàn)案例介紹
“實(shí)時(shí)熱門商品”的需求,我們可以將“實(shí)時(shí)熱門商品”翻譯成程序員更好理解的需求:每隔5分鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品。將這個(gè)需求進(jìn)行分解我們大概要做這么幾件事情:
- 抽取出業(yè)務(wù)時(shí)間戳,告訴 Flink 框架基于業(yè)務(wù)時(shí)間做窗口
- 過(guò)濾出點(diǎn)擊行為數(shù)據(jù)
- 按一小時(shí)的窗口大小,每5分鐘統(tǒng)計(jì)一次,做滑動(dòng)窗口聚合(Sliding Window)
- 按每個(gè)窗口聚合,輸出每個(gè)窗口中點(diǎn)擊量前N名的商品
數(shù)據(jù)準(zhǔn)備
這里我們準(zhǔn)備了一份淘寶用戶行為數(shù)據(jù)集(來(lái)自阿里云天池公開數(shù)據(jù)集,特別感謝)。本數(shù)據(jù)集包含了淘寶上某一天隨機(jī)一百萬(wàn)用戶的所有行為(包括點(diǎn)擊、購(gòu)買、加購(gòu)、收藏)。數(shù)據(jù)集的組織形式和MovieLens-20M類似,即數(shù)據(jù)集的每一行表示一條用戶行為,由用戶ID、商品ID、商品類目ID、行為類型和時(shí)間戳組成,并以逗號(hào)分隔。關(guān)于數(shù)據(jù)集中每一列的詳細(xì)描述如下:
| 用戶ID | 整數(shù)類型,加密后的用戶ID |
| 商品ID | 整數(shù)類型,加密后的商品ID |
| 商品類目ID | 整數(shù)類型,加密后的商品所屬類目ID |
| 行為類型 | 字符串,枚舉類型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’) |
| 時(shí)間戳 | 行為發(fā)生的時(shí)間戳,單位秒 |
你可以通過(guò)下面的命令下載數(shù)據(jù)集到項(xiàng)目的 resources 目錄下:
$ cd my-flink-project/src/main/resources $ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv這里是否使用 curl 命令下載數(shù)據(jù)并不重要,你也可以使用 wget 命令或者直接訪問鏈接下載數(shù)據(jù)。關(guān)鍵是,將數(shù)據(jù)文件保存到項(xiàng)目的 resources 目錄下,方便應(yīng)用程序訪問。
編寫程序
在 src/main/java/myflink 下創(chuàng)建 HotItems.java 文件:
package myflink;public class HotItems {public static void main(String[] args) throws Exception {} }與上文一樣,我們會(huì)一步步往里面填充代碼。第一步仍然是創(chuàng)建一個(gè) StreamExecutionEnvironment,我們把它添加到 main 函數(shù)中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 為了打印到控制臺(tái)的結(jié)果不亂序,我們配置全局的并發(fā)為1,這里改變并發(fā)對(duì)結(jié)果正確性沒有影響 env.setParallelism(1);創(chuàng)建模擬數(shù)據(jù)源
在數(shù)據(jù)準(zhǔn)備章節(jié),我們已經(jīng)將測(cè)試的數(shù)據(jù)集下載到本地了。由于是一個(gè)csv文件,我們將使用 CsvInputFormat 創(chuàng)建模擬數(shù)據(jù)源。
注:雖然一個(gè)流式應(yīng)用應(yīng)該是一個(gè)一直運(yùn)行著的程序,需要消費(fèi)一個(gè)無(wú)限數(shù)據(jù)源。但是在本案例教程中,為了省去構(gòu)建真實(shí)數(shù)據(jù)源的繁瑣,我們使用了文件來(lái)模擬真實(shí)數(shù)據(jù)源,這并不影響下文要介紹的知識(shí)點(diǎn)。這也是一種本地驗(yàn)證 Flink 應(yīng)用程序正確性的常用方式。
我們先創(chuàng)建一個(gè) UserBehavior 的 POJO 類(所有成員變量聲明成public便是POJO類),強(qiáng)類型化后能方便后續(xù)的處理。
/** 用戶行為數(shù)據(jù)結(jié)構(gòu) **/ public static class UserBehavior {public long userId; // 用戶IDpublic long itemId; // 商品IDpublic int categoryId; // 商品類目IDpublic String behavior; // 用戶行為, 包括("pv", "buy", "cart", "fav")public long timestamp; // 行為發(fā)生的時(shí)間戳,單位秒 }接下來(lái)我們就可以創(chuàng)建一個(gè) PojoCsvInputFormat 了, 這是一個(gè)讀取 csv 文件并將每一行轉(zhuǎn)成指定 POJO
類型(在我們案例中是 UserBehavior)的輸入器。
下一步我們用 PojoCsvInputFormat 創(chuàng)建輸入源。
DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);這就創(chuàng)建了一個(gè) UserBehavior 類型的 DataStream。
EventTime 與 Watermark
當(dāng)我們說(shuō)“統(tǒng)計(jì)過(guò)去一小時(shí)內(nèi)點(diǎn)擊量”,這里的“一小時(shí)”是指什么呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由用戶決定。
- ProcessingTime:事件被處理的時(shí)間。也就是由機(jī)器的系統(tǒng)時(shí)間來(lái)決定。
- EventTime:事件發(fā)生的時(shí)間。一般就是數(shù)據(jù)本身攜帶的時(shí)間。
在本案例中,我們需要統(tǒng)計(jì)業(yè)務(wù)時(shí)間上的每小時(shí)的點(diǎn)擊量,所以要基于 EventTime 來(lái)處理。那么如果讓 Flink 按照我們想要的業(yè)務(wù)時(shí)間來(lái)處理呢?這里主要有兩件事情要做。
第一件是告訴 Flink 我們現(xiàn)在按照 EventTime 模式進(jìn)行處理,Flink 默認(rèn)使用 ProcessingTime 處理,所以我們要顯式設(shè)置下。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);第二件事情是指定如何獲得業(yè)務(wù)時(shí)間,以及生成 Watermark。Watermark 是用來(lái)追蹤業(yè)務(wù)事件的概念,可以理解成 EventTime 世界中的時(shí)鐘,用來(lái)指示當(dāng)前處理到什么時(shí)刻的數(shù)據(jù)了。由于我們的數(shù)據(jù)源的數(shù)據(jù)已經(jīng)經(jīng)過(guò)整理,沒有亂序,即事件的時(shí)間戳是單調(diào)遞增的,所以可以將每條數(shù)據(jù)的業(yè)務(wù)時(shí)間就當(dāng)做 Watermark。這里我們用 AscendingTimestampExtractor 來(lái)實(shí)現(xiàn)時(shí)間戳的抽取和 Watermark 的生成。
注:真實(shí)業(yè)務(wù)場(chǎng)景一般都是存在亂序的,所以一般使用 BoundedOutOfOrdernessTimestampExtractor。
DataStream<UserBehavior> timedData = dataSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {@Overridepublic long extractAscendingTimestamp(UserBehavior userBehavior) {// 原始數(shù)據(jù)單位秒,將其轉(zhuǎn)成毫秒return userBehavior.timestamp * 1000;}});這樣我們就得到了一個(gè)帶有時(shí)間標(biāo)記的數(shù)據(jù)流了,后面就能做一些窗口的操作。
過(guò)濾出點(diǎn)擊事件
在開始窗口操作之前,先回顧下需求“每隔5分鐘輸出過(guò)去一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品”。由于原始數(shù)據(jù)中存在點(diǎn)擊、加購(gòu)、購(gòu)買、收藏各種行為的數(shù)據(jù),但是我們只需要統(tǒng)計(jì)點(diǎn)擊量,所以先使用 FilterFunction 將點(diǎn)擊行為數(shù)據(jù)過(guò)濾出來(lái)。
DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() {@Overridepublic boolean filter(UserBehavior userBehavior) throws Exception {// 過(guò)濾出只有點(diǎn)擊的數(shù)據(jù)return userBehavior.behavior.equals("pv");}});窗口統(tǒng)計(jì)點(diǎn)擊量
由于要每隔5分鐘統(tǒng)計(jì)一次最近一小時(shí)每個(gè)商品的點(diǎn)擊量,所以窗口大小是一小時(shí),每隔5分鐘滑動(dòng)一次。即分別要統(tǒng)計(jì) [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品點(diǎn)擊量。是一個(gè)常見的滑動(dòng)窗口需求(Sliding Window)。
DataStream<ItemViewCount> windowedData = pvData.keyBy("itemId").timeWindow(Time.minutes(60), Time.minutes(5)).aggregate(new CountAgg(), new WindowResultFunction());我們使用.keyBy("itemId")對(duì)商品進(jìn)行分組,使用.timeWindow(Time size, Time slide)對(duì)每個(gè)商品做滑動(dòng)窗口(1小時(shí)窗口,5分鐘滑動(dòng)一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數(shù)據(jù),減少 state 的存儲(chǔ)壓力。較之.apply(WindowFunction wf)會(huì)將窗口中的數(shù)據(jù)都存儲(chǔ)下來(lái),最后一起計(jì)算要高效地多。aggregate()方法的第一個(gè)參數(shù)用于
這里的CountAgg實(shí)現(xiàn)了AggregateFunction接口,功能是統(tǒng)計(jì)窗口中的條數(shù),即遇到一條數(shù)據(jù)就加一。
/** COUNT 統(tǒng)計(jì)的聚合函數(shù)實(shí)現(xiàn),每出現(xiàn)一條記錄加一 */ public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(UserBehavior userBehavior, Long acc) {return acc + 1;}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long acc1, Long acc2) {return acc1 + acc2;} }.aggregate(AggregateFunction af, WindowFunction wf) 的第二個(gè)參數(shù)WindowFunction將每個(gè) key每個(gè)窗口聚合后的結(jié)果帶上其他信息進(jìn)行輸出。我們這里實(shí)現(xiàn)的WindowResultFunction將主鍵商品ID,窗口,點(diǎn)擊量封裝成了ItemViewCount進(jìn)行輸出。
/** 用于輸出窗口的結(jié)果 */ public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {@Overridepublic void apply(Tuple key, // 窗口的主鍵,即 itemIdTimeWindow window, // 窗口Iterable<Long> aggregateResult, // 聚合函數(shù)的結(jié)果,即 count 值Collector<ItemViewCount> collector // 輸出類型為 ItemViewCount) throws Exception {Long itemId = ((Tuple1<Long>) key).f0;Long count = aggregateResult.iterator().next();collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));} }/** 商品點(diǎn)擊量(窗口操作的輸出類型) */ public static class ItemViewCount {public long itemId; // 商品IDpublic long windowEnd; // 窗口結(jié)束時(shí)間戳public long viewCount; // 商品的點(diǎn)擊量public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {ItemViewCount result = new ItemViewCount();result.itemId = itemId;result.windowEnd = windowEnd;result.viewCount = viewCount;return result;} }現(xiàn)在我們得到了每個(gè)商品在每個(gè)窗口的點(diǎn)擊量的數(shù)據(jù)流。
TopN 計(jì)算最熱門商品
為了統(tǒng)計(jì)每個(gè)窗口下最熱門的商品,我們需要再次按窗口進(jìn)行分組,這里根據(jù)ItemViewCount中的windowEnd進(jìn)行keyBy()操作。然后使用 ProcessFunction 實(shí)現(xiàn)一個(gè)自定義的 TopN 函數(shù) TopNHotItems 來(lái)計(jì)算點(diǎn)擊量排名前3名的商品,并將排名結(jié)果格式化成字符串,便于后續(xù)輸出。
DataStream<String> topItems = windowedData.keyBy("windowEnd").process(new TopNHotItems(3)); // 求點(diǎn)擊量前3名的商品ProcessFunction 是 Flink 提供的一個(gè) low-level API,用于實(shí)現(xiàn)更高級(jí)的功能。它主要提供了定時(shí)器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來(lái)判斷何時(shí)收齊了某個(gè) window 下所有商品的點(diǎn)擊量數(shù)據(jù)。由于 Watermark 的進(jìn)度是全局的,
在 processElement 方法中,每當(dāng)收到一條數(shù)據(jù)(ItemViewCount),我們就注冊(cè)一個(gè) windowEnd+1 的定時(shí)器(Flink 框架會(huì)自動(dòng)忽略同一時(shí)間的重復(fù)注冊(cè))。windowEnd+1 的定時(shí)器被觸發(fā)時(shí),意味著收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的所有商品窗口統(tǒng)計(jì)值。我們?cè)?onTimer() 中處理將收集的所有商品及點(diǎn)擊量進(jìn)行排序,選出 TopN,并將排名信息格式化成字符串后進(jìn)行輸出。
這里我們還使用了 ListState 來(lái)存儲(chǔ)收到的每條 ItemViewCount 消息,保證在發(fā)生故障時(shí),狀態(tài)數(shù)據(jù)的不丟失和一致性。ListState 是 Flink 提供的類似 Java List 接口的 State API,它集成了框架的 checkpoint 機(jī)制,自動(dòng)做到了 exactly-once 的語(yǔ)義保證。
/** 求某個(gè)窗口中前 N 名的熱門點(diǎn)擊商品,key 為窗口時(shí)間戳,輸出為 TopN 的結(jié)果字符串 */ public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {private final int topSize;public TopNHotItems(int topSize) {this.topSize = topSize;}// 用于存儲(chǔ)商品與點(diǎn)擊數(shù)的狀態(tài),待收齊同一個(gè)窗口的數(shù)據(jù)后,再觸發(fā) TopN 計(jì)算private ListState<ItemViewCount> itemState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 狀態(tài)的注冊(cè)ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>("itemState-state",ItemViewCount.class);itemState = getRuntimeContext().getListState(itemsStateDesc);}@Overridepublic void processElement(ItemViewCount input,Context context,Collector<String> collector) throws Exception {// 每條數(shù)據(jù)都保存到狀態(tài)中itemState.add(input);// 注冊(cè) windowEnd+1 的 EventTime Timer, 當(dāng)觸發(fā)時(shí),說(shuō)明收齊了屬于windowEnd窗口的所有商品數(shù)據(jù)context.timerService().registerEventTimeTimer(input.windowEnd + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 獲取收到的所有商品點(diǎn)擊量List<ItemViewCount> allItems = new ArrayList<>();for (ItemViewCount item : itemState.get()) {allItems.add(item);}// 提前清除狀態(tài)中的數(shù)據(jù),釋放空間itemState.clear();// 按照點(diǎn)擊量從大到小排序allItems.sort(new Comparator<ItemViewCount>() {@Overridepublic int compare(ItemViewCount o1, ItemViewCount o2) {return (int) (o2.viewCount - o1.viewCount);}});// 將排名信息格式化成 String, 便于打印StringBuilder result = new StringBuilder();result.append("====================================\n");result.append("時(shí)間: ").append(new Timestamp(timestamp-1)).append("\n");for (int i=0;i<topSize;i++) {ItemViewCount currentItem = allItems.get(i);// No1: 商品ID=12224 瀏覽量=2413result.append("No").append(i).append(":").append(" 商品ID=").append(currentItem.itemId).append(" 瀏覽量=").append(currentItem.viewCount).append("\n");}result.append("====================================\n\n");out.collect(result.toString());} }打印輸出
最后一步我們將結(jié)果打印輸出到控制臺(tái),并調(diào)用env.execute執(zhí)行任務(wù)。
topItems.print(); env.execute("Hot Items Job");運(yùn)行程序
直接運(yùn)行 main 函數(shù),就能看到不斷輸出的每個(gè)時(shí)間點(diǎn)的熱門商品ID。
總結(jié)
本文的完整代碼可以通過(guò) GitHub 訪問到。本文通過(guò)實(shí)現(xiàn)一個(gè)“實(shí)時(shí)熱門商品”的案例,學(xué)習(xí)和實(shí)踐了 Flink 的多個(gè)核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的實(shí)現(xiàn)。希望本文能加深大家對(duì) Flink 的理解,幫助大家解決實(shí)戰(zhàn)上遇到的問題。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Flink 零基础实战教程:如何计算实时热门商品的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 重构技术架构首先解决组织架构
- 下一篇: 2019阿里云910会员节大促主会场全攻