阿里技术:基于Kafka+Flink+Redis的电商大屏实时计算案例
作者:zhisheng
cloud.tencent.com/developer/article/1558372
阿里的雙11銷量大屏可以說是一道特殊的風景線。實時大屏(real-time dashboard)正在被越來越多的企業采用,用來及時呈現關鍵的數據指標。并且在實際操作中,肯定也不會僅僅計算一兩個維度。由于Flink的“真·流式計算”這一特點,它比Spark Streaming要更適合大屏應用。本文從筆者的實際工作經驗抽象出簡單的模型,并簡要敘述計算流程(當然大部分都是源碼)。
前言
數據格式與接入
簡化的子訂單消息體如下。
由于訂單可能會包含多種商品,故會被拆分成子訂單來表示,每條JSON消息表示一個子訂單。現在要按照自然日來統計以下指標,并以1秒的刷新頻率呈現在大屏上:
每個站點(站點ID即siteId)的總訂單數、子訂單數、銷量與GMV;
當前銷量排名前N的商品(商品ID即merchandiseId)與它們的銷量。
由于大屏的最大訴求是實時性,等待遲到數據顯然不太現實,因此我們采用處理時間作為時間特征,并以1分鐘的頻率做checkpointing。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);然后訂閱Kafka的訂單消息作為數據源。
Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");DataStream<String> sourceStream = env.addSource(new FlinkKafkaConsumer011<>(ORDER_EXT_TOPIC_NAME, // topicnew SimpleStringSchema(), // deserializerconsumerProps // consumer properties)).setParallelism(PARTITION_COUNT).name("source_kafka_" + ORDER_EXT_TOPIC_NAME).uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);給帶狀態的算子設定算子ID(通過調用uid()方法)是個好習慣,能夠保證Flink應用從保存點重啟時能夠正確恢復狀態現場。為了盡量穩妥,Flink官方也建議為每個算子都顯式地設定ID,參考:https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
接下來將JSON數據轉化為POJO,JSON框架采用FastJSON。
DataStream<SubOrderDetail> orderStream = sourceStream.map(message -> JSON.parseObject(message, SubOrderDetail.class)).name("map_sub_order_detail").uid("map_sub_order_detail");JSON已經是預先處理好的標準化格式,所以POJO類SubOrderDetail的寫法可以通過Lombok極大地簡化。如果JSON的字段有不規范的,那么就需要手寫Getter和Setter,并用@JSONField注解來指明。
@Getter @Setter @NoArgsConstructor @AllArgsConstructor @ToString public class SubOrderDetail implements Serializable {private static final long serialVersionUID = 1L;private long userId;private long orderId;private long subOrderId;private long siteId;private String siteName;private long cityId;private String cityName;private long warehouseId;private long merchandiseId;private long price;private long quantity;private int orderStatus;private int isNewOrder;private long timestamp; }統計站點指標
將子訂單流按站點ID分組,開1天的滾動窗口,并同時設定ContinuousProcessingTimeTrigger觸發器,以1秒周期觸發計算。注意處理時間的時區問題,這是老生常談了。
接下來寫個聚合函數。
DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream.aggregate(new OrderAndGmvAggregateFunc()).name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv"); public static final class OrderAndGmvAggregateFuncimplements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {private static final long serialVersionUID = 1L;@Overridepublic OrderAccumulator createAccumulator() {return new OrderAccumulator();}@Overridepublic OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {if (acc.getSiteId() == 0) {acc.setSiteId(record.getSiteId());acc.setSiteName(record.getSiteName());}acc.addOrderId(record.getOrderId());acc.addSubOrderSum(1);acc.addQuantitySum(record.getQuantity());acc.addGmv(record.getPrice() * record.getQuantity());return acc;}@Overridepublic OrderAccumulator getResult(OrderAccumulator acc) {return acc;}@Overridepublic OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {if (acc1.getSiteId() == 0) {acc1.setSiteId(acc2.getSiteId());acc1.setSiteName(acc2.getSiteName());}acc1.addOrderIds(acc2.getOrderIds());acc1.addSubOrderSum(acc2.getSubOrderSum());acc1.addQuantitySum(acc2.getQuantitySum());acc1.addGmv(acc2.getGmv());return acc1;}}累加器類OrderAccumulator的實現很簡單,看源碼就大概知道它的結構了,因此不再多廢話。唯一需要注意的是訂單ID可能重復,所以需要用名為orderIds的HashSet來保存它。HashSet應付我們目前的數據規模還是沒太大問題的,如果是海量數據,就考慮換用HyperLogLog吧。
接下來就該輸出到Redis供呈現端查詢了。這里有個問題:一秒內有數據變化的站點并不多,而ContinuousProcessingTimeTrigger每次觸發都會輸出窗口里全部的聚合數據,這樣做了很多無用功,并且還會增大Redis的壓力。所以,我們在聚合結果后再接一個ProcessFunction,代碼如下。
DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream.keyBy(0).process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {})).name("process_site_gmv_changed").uid("process_site_gmv_changed"); public static final class OutputOrderGmvProcessFuncextends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {private static final long serialVersionUID = 1L;private MapState<Long, OrderAccumulator> state;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>("state_site_order_gmv",Long.class,OrderAccumulator.class));}@Overridepublic void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {long key = value.getSiteId();OrderAccumulator cachedValue = state.get(key);if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {JSONObject result = new JSONObject();result.put("site_id", value.getSiteId());result.put("site_name", value.getSiteName());result.put("quantity", value.getQuantitySum());result.put("orderCount", value.getOrderIds().size());result.put("subOrderCount", value.getSubOrderSum());result.put("gmv", value.getGmv());out.collect(new Tuple2<>(key, result.toJSONString());state.put(key, value);}}@Overridepublic void close() throws Exception {state.clear();super.close();}}說來也簡單,就是用一個MapState狀態緩存當前所有站點的聚合數據。由于數據源是以子訂單為單位的,因此如果站點ID在MapState中沒有緩存,或者緩存的子訂單數與當前子訂單數不一致,表示結果有更新,這樣的數據才允許輸出。
最后就可以安心地接上Redis Sink了,結果會被存進一個Hash結構里。
// 看官請自己構造合適的FlinkJedisPoolConfigFlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);siteResultStream.addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper())).name("sink_redis_site_gmv").uid("sink_redis_site_gmv").setParallelism(1); public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {private static final long serialVersionUID = 1L;private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);}@Overridepublic String getKeyFromData(Tuple2<Long, String> data) {return String.valueOf(data.f0);}@Overridepublic String getValueFromData(Tuple2<Long, String> data) {return data.f1;}@Overridepublic Optional<String> getAdditionalKey(Tuple2<Long, String> data) {return Optional.of(HASH_NAME_PREFIX +new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +"SITES");}}商品Top N
我們可以直接復用前面產生的orderStream,玩法與上面的GMV統計大同小異。這里用1秒滾動窗口就可以了。
聚合函數與窗口函數的實現更加簡單了,最終返回的是商品ID與商品銷量的二元組。
public static final class MerchandiseSalesAggregateFuncimplements AggregateFunction<SubOrderDetail, Long, Long> {private static final long serialVersionUID = 1L;@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(SubOrderDetail value, Long acc) {return acc + value.getQuantity();}@Overridepublic Long getResult(Long acc) {return acc;}@Overridepublic Long merge(Long acc1, Long acc2) {return acc1 + acc2;}}public static final class MerchandiseSalesWindowFuncimplements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {private static final long serialVersionUID = 1L;@Overridepublic void apply(Tuple key,TimeWindow window,Iterable<Long> accs,Collector<Tuple2<Long, Long>> out) throws Exception {long merchId = ((Tuple1<Long>) key).f0;long acc = accs.iterator().next();out.collect(new Tuple2<>(merchId, acc));}}既然數據最終都要落到Redis,那么我們完全沒必要在Flink端做Top N的統計,直接利用Redis的有序集合(zset)就行了,商品ID作為field,銷量作為分數值,簡單方便。不過flink-redis-connector項目中默認沒有提供ZINCRBY命令的實現(必須再吐槽一次),我們可以自己加,步驟參照之前寫過的那篇加SETEX的命令的文章,不再贅述。RedisMapper的寫法如下。
public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {private static final long serialVersionUID = 1L;private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);}@Overridepublic String getKeyFromData(Tuple2<Long, Long> data) {return String.valueOf(data.f0);}@Overridepublic String getValueFromData(Tuple2<Long, Long> data) {return String.valueOf(data.f1);}@Overridepublic Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {return Optional.of(ZSET_NAME_PREFIX +new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +"MERCHANDISE");}}后端取數時,用ZREVRANGE命令即可取出指定排名的數據了。只要數據規模不是大到難以接受,并且有現成的Redis,這個方案完全可以作為各類Top N需求的通用實現。
The End
大屏的實際呈現需要保密,截圖自然是沒有的。以下是提交執行時Flink Web UI給出的執行計劃(實際有更多的統計任務,不止3個Sink)。通過復用源數據,可以在同一個Flink job內實現更多統計需求。
END
? ? ? ?? #關注架構師的點點滴滴#精彩推薦1.?一文讓你搞懂分布式事務2.?“網紅” WebAssembly 與 K8s 如何實現雙劍合璧?3.?FBI WARNING:架構師不能碰的禁忌 4.Java后端架構之微服務雜談5.?一個全世界最大成人網站的爬蟲 6. 漫畫:互聯網公司黑話防騙指北7.?囚犯學會編程之后會發生什么?8.?山哥面試心經:想進入BAT WSJ其實很簡單總結
以上是生活随笔為你收集整理的阿里技术:基于Kafka+Flink+Redis的电商大屏实时计算案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JimuReport积木报表 — API
- 下一篇: 3 天撸了个数据中台出来,我飘了~