Using Blink CEP to Implement Timeout Statistics in Stream Computing
Case and solution index page:
Alibaba Cloud Realtime Compute product cases & solutions index
1. Background
As with the scenario described in 《利用blink+MQ实现流计算中的延时统计问题》 (Using Blink + MQ to Implement Latency Statistics in Stream Computing), we simplify it into the following case.
The data source of the real-time stream is structured as follows (a "?" marks a field that is still empty):
| lg_order_code | ded_pay_time (payment time) | wms_create_time (warehouse acceptance time) | wms_consign_create_time (warehouse outbound time) |
| --- | --- | --- | --- |
| LP1 | 2018-08-01 08:00 | ? | ? |
| LP1 | 2018-08-01 08:00 | 2018-08-01 09:00 | ? |
| LP2 | 2018-08-01 09:10 | ? | ? |
| LP2 | 2018-08-01 09:10 | 2018-08-01 09:50 | ? |
| LP2 | 2018-08-01 09:10 | 2018-08-01 09:50 | 2018-08-01 12:00 |
From this data source we want to compute, grouped by payment date and warehouse, the number of orders accepted by the warehouse, the number of orders shipped out, the number of orders accepted but not shipped within 2 hours, and the number of orders accepted but not shipped within 6 hours. As the table shows, LP1 was accepted by the warehouse at 2018-08-01 09:00 but had still not shipped by 2018-08-01 12:00, so LP1 satisfies the "accepted but not shipped for more than 2 hours" condition.
The difficulty in this scenario is exactly the orders that have not yet shipped. For the source message stream in TT, an order that has not shipped produces no new message, and without a new message Blink is never triggered to recompute. Yet for LP1 we need to know, as soon as the acceptance time plus 2 hours has passed (2018-08-01 09:00 + 2H, i.e. after 2018-08-01 11:00), that LP1 has been accepted by the warehouse but has gone more than 2 hours without shipping.
2. Solution
This article uses Blink CEP to implement the scenario above; the concrete steps are described below.
Step 1: define event_timestamp in the source DDL, and define the sink, as follows:
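The DDL itself did not survive extraction, so the block below is only a minimal sketch of what a Blink SQL source and sink definition of this shape could look like. The field list follows the SQL in steps 2 and 3; the computed evtstamp column, the WATERMARK clause, the connector settings, and the sink primary key are assumptions, not the original configuration.

```sql
-- Sketch of the source table: fields used by the CEP pattern plus an event-time column.
create table source_dwd_csn_whc_lgt_fl_ord_ri (
    lg_order_code            varchar,  -- logistics order code
    store_code               varchar,
    store_name               varchar,
    ded_pay_time             varchar,  -- payment time
    wms_create_time          varchar,  -- warehouse acceptance time
    wms_consign_create_time  varchar,  -- warehouse outbound time
    lg_cancel_time           varchar,  -- order cancel time
    -- event_timestamp derived from the acceptance time (assumed expression)
    evtstamp as case when coalesce(wms_create_time, '') <> ''
                     then to_timestamp(wms_create_time, 'yyyy-MM-dd HH:mm:ss')
                     else to_timestamp('1970-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')
                end,
    WATERMARK wk FOR evtstamp as withOffset(evtstamp, 0)  -- event time + watermark, needed by ORDER BY evtstamp in CEP
) with (
    type = 'tt'  -- placeholder connector settings
);

-- Sketch of the sink consumed by the final insert in step 3.
create table sink_hybrid_blink_cep (
    ded_pay_date               varchar,
    store_code                 varchar,
    store_name                 varchar,
    wmsin_ord_cnt              bigint,
    wmsout_ord_cnt             bigint,
    wmsin_nowmsout_2h_ord_cnt  bigint,
    wmsin_nowmsout_6h_ord_cnt  bigint,
    sub_partition              bigint,
    primary key (ded_pay_date, store_code)  -- assumed key for idempotent upserts
) with (
    type = 'print'  -- placeholder; the original sink type is not shown
);
```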
Step 2: rewrite the detection logic using Blink CEP's standard MATCH_RECOGNIZE semantics, as follows:
```sql
create view blink_cep_v1 as
select
    '倉接單-倉出庫超時' as timeout_type,  -- "warehouse accepted but outbound timed out"
    lg_order_code,
    wms_create_time         as start_time,
    wms_consign_create_time as end_time
from source_dwd_csn_whc_lgt_fl_ord_ri
MATCH_RECOGNIZE (
    PARTITION BY lg_order_code
    ORDER BY evtstamp
    MEASURES
        e1.wms_create_time         as wms_create_time,
        e2.wms_consign_create_time as wms_consign_create_time
    ONE ROW PER MATCH WITH TIMEOUT ROWS  -- important: timed-out sequences must also be emitted
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (e1 -> e2) WITHIN INTERVAL '6' HOUR
    EMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)
    DEFINE
        e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null,
        e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null
)
where wms_create_time is not null       -- important: greatly reduces the volume of messages entering CEP
  and wms_consign_create_time is null   -- important: greatly reduces the volume of messages entering CEP
;
```

Step 3: given Blink's execution model, we join the source real-time stream sourcett_dwd_ri with the timeout message stream blink_cep_v1, so that arriving timeout messages trigger Blink to re-aggregate, as follows:
```sql
create view blink_cep_v2 as
select
    a.lg_order_code as lg_order_code,
    last_value(a.store_code)            as store_code,
    last_value(a.store_name)            as store_name,
    last_value(a.ded_pay_time)          as ded_pay_time,
    last_value(a.lg_cancel_time)        as lg_cancel_time,  -- carried through for the cancel filter in the final insert
    last_value(a.wms_create_time)       as wms_create_time,
    last_value(a.real_wms_confirm_time) as real_wms_confirm_time,
    last_value(case when coalesce(a.wms_create_time, '') <> ''
                     and coalesce(a.real_wms_confirm_time, '') = ''
                     and now() - unix_timestamp(a.wms_create_time, 'yyyy-MM-dd HH:mm:ss') >= 7200
                    then 'Y' else 'N' end) as flag_01,  -- accepted more than 2h ago, still not shipped
    last_value(case when coalesce(a.wms_create_time, '') <> ''
                     and coalesce(a.real_wms_confirm_time, '') = ''
                     and now() - unix_timestamp(a.wms_create_time, 'yyyy-MM-dd HH:mm:ss') >= 21600
                    then 'Y' else 'N' end) as flag_02   -- accepted more than 6h ago, still not shipped
from
(
    select
        lg_order_code as lg_order_code,
        last_value(store_code)              as store_code,
        last_value(store_name)              as store_name,
        last_value(ded_pay_time)            as ded_pay_time,
        last_value(lg_cancel_time)          as lg_cancel_time,
        last_value(wms_create_time)         as wms_create_time,
        last_value(wms_consign_create_time) as real_wms_confirm_time
    from sourcett_dwd_ri
    group by lg_order_code
) a
left outer join
(
    select
        lg_order_code,
        count(*) as cnt
    from blink_cep_v1
    group by lg_order_code
) b
on a.lg_order_code = b.lg_order_code
group by a.lg_order_code
;

insert into sink_hybrid_blink_cep
select
    regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as ded_pay_date,
    a.store_code,
    max(a.store_name) as store_name,
    count(case when coalesce(a.wms_create_time, '') <> ''       then a.lg_order_code end) as wmsin_ord_cnt,
    count(case when coalesce(a.real_wms_confirm_time, '') <> '' then a.lg_order_code end) as wmsout_ord_cnt,
    count(case when a.flag_01 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt,
    count(case when a.flag_02 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt,
    cast(regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as bigint) as sub_partition
from blink_cep_v2 as a   -- alias unified to "a" to match the column references above
where coalesce(a.lg_cancel_time, '') = ''
  and coalesce(a.ded_pay_time, '') <> ''
group by
    regexp_replace(substring(a.ded_pay_time, 1, 10), '-', ''),
    a.store_code
;
```

3. Extending the Problem
The test data for a risk-control case is as follows:
| Swipe time | Card ID | Swipe location |
| --- | --- | --- |
| 2018-04-13 12:00:00 | 1 | WW |
| 2018-04-13 12:05:00 | 1 | WW1 |
| 2018-04-13 12:10:00 | 1 | WW2 |
| 2018-04-13 12:20:00 | 1 | WW |
The rule: when a single bank card is swiped in different locations two or more times within 10 minutes, we want to trigger an alert to the cardholder, as sketched below.
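The CEP SQL for this extended case is not reproduced in this copy of the post. The block below is only a minimal sketch of how the rule could be expressed in the same MATCH_RECOGNIZE style as step 2; the table name source_card_event and the columns card_id, location, card_time and evtstamp are hypothetical names introduced for illustration.

```sql
create view risk_card_alert_v1 as
select
    card_id,
    first_swipe_time,
    second_swipe_time,
    first_location,
    second_location
from source_card_event
MATCH_RECOGNIZE (
    PARTITION BY card_id          -- evaluate the pattern per bank card
    ORDER BY evtstamp
    MEASURES
        e1.card_time as first_swipe_time,
        e2.card_time as second_swipe_time,
        e1.location  as first_location,
        e2.location  as second_location
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (e1 -> e2) WITHIN INTERVAL '10' MINUTE   -- two swipes within 10 minutes ...
    DEFINE
        e2 as e2.location <> e1.location             -- ... at different locations
)
;
```

Each emitted row identifies a card swiped at two different locations within 10 minutes (for the sample data, card 1 at 12:00 in WW and 12:05 in WW1) and can be fed to a downstream alerting sink.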
4. About the Author
Alias: Yuanqiao (缘桥), from the Cainiao CTO line, Data Department, Warehouse & Distribution Data R&D team; mainly responsible for building the offline and real-time data warehouses for Cainiao's warehousing and distribution business, and for exploring and applying innovative data technologies and tools.