【原创】StreamInsight查询系列(十九)——查询模式之检测异常
上篇文章介紹了查詢模式中如何發(fā)現(xiàn)趨勢,這篇博文將介紹StreamInsight中如何檢測異常。
測試數(shù)據(jù)準備
為了方便測試查詢,我們首先準備一個靜態(tài)的測試數(shù)據(jù)源:
var now = DateTime.Parse("09/12/2011 8:57:00 PM"); var input = new[] {new { Time = now + TimeSpan.FromSeconds(1), Value = 20},new { Time = now + TimeSpan.FromSeconds(2), Value = 30},new { Time = now + TimeSpan.FromSeconds(3), Value = 120},new { Time = now + TimeSpan.FromSeconds(4), Value = 200},new { Time = now + TimeSpan.FromSeconds(5), Value = 20},new { Time = now + TimeSpan.FromSeconds(6), Value = 110},new { Time = now + TimeSpan.FromSeconds(7), Value = 110},new { Time = now + TimeSpan.FromSeconds(8), Value = 210},new { Time = now + TimeSpan.FromSeconds(9), Value = 120},new { Time = now + TimeSpan.FromSeconds(10), Value = 130},new { Time = now + TimeSpan.FromSeconds(11), Value = 20},new { Time = now + TimeSpan.FromSeconds(12), Value = 30}, };接下去將上述數(shù)據(jù)源轉(zhuǎn)變?yōu)辄c類型復(fù)雜事件流:
var inputStream = input.ToPointStream(Application, t =>PointEvent.CreateInsert(t.Time.ToLocalTime(), new { Value = t.Value }),AdvanceTimeSettings.IncreasingStartTime);異常檢測
問題:怎樣每秒1次的計算過去5秒內(nèi)Value字段值超過閾值100的事件數(shù)超過事件總數(shù)目80%的異常事件?
首先我們定義一下結(jié)果事件流中的負載類型SpikeEvent如下:
struct SpikeEvent {public double Ratio { get; set; } }我們最終希望調(diào)用查詢的方式如下:
int threshold = 100; double ratio = 0.8;var resultStream = DetectSpikes(inputStream,threshold, // 指定閾值(超過該閾值的事件被認為是“特殊事件”)ratio, // “特殊事件”占事件總數(shù)的百分比TimeSpan.FromSeconds(5), // 窗口大小TimeSpan.FromSeconds(1), // 跳躍大小e => e.Value); // 指定的比較字段因此最關(guān)鍵的部分就是如何實現(xiàn)DetectSpikes。閱讀過
《StreamInsight查詢系列(十五)——查詢模式之窗口比率》文章的讀者應(yīng)該對此類查詢并不陌生。
這里不加過多描述地給出DetectSpikes的實現(xiàn):
/// <summary> /// 在輸入流中檢測異常 /// </summary> /// <typeparam name="TInput">輸入流事件類型</typeparam> /// <param name="inputStream">輸入流</param> /// <param name="threshold">異常定義閾值</param> /// <param name="ratio">異常事件占事件總數(shù)的百分比</param> /// <param name="windowSize">衡量事件數(shù)目的窗口大小</param> /// <param name="hopSize">跳躍大小</param> /// <param name="fieldSelector">選擇輸入事件中的某個字段來檢測事件類型</param> /// <returns>query that detects the spikes</returns> private static CepStream<SpikeEvent> DetectSpikes<TInput>(CepStream<TInput> inputStream, int threshold, double ratio,TimeSpan windowSize, TimeSpan hopSize,Expression<Func<TInput, int>> fieldSelector) {// 統(tǒng)計跳躍窗口內(nèi)所有事件的數(shù)目var totalValues = from w in inputStream.HoppingWindow(windowSize,hopSize,HoppingWindowOutputPolicy.ClipToWindowEnd)select new{Count = w.Count(),};// 構(gòu)造包含過濾條件的LINQ語句var parameter = fieldSelector.Parameters.First();var field = fieldSelector.Body;Expression<Func<TInput, bool>> filterExpression = (Expression<Func<TInput, bool>>)Expression.Lambda(Expression.GreaterThan(field, Expression.Constant(threshold)),parameter);// 統(tǒng)計跳躍窗口內(nèi)異常事件的數(shù)目var bigValues = from w in inputStream.Where(filterExpression).HoppingWindow(windowSize,hopSize,HoppingWindowOutputPolicy.ClipToWindowEnd)select new{Count = w.Count(),};// 選擇異常事件數(shù)目超過事件總數(shù)一定百分比的事件var output = from total in totalValues.ToPointEventStream()join big in bigValues.ToPointEventStream()on true equals truewhere big.Count * 1.0 / total.Count >= ratioselect new SpikeEvent { Ratio = big.Count * 1.0 / (total.Count) };return output; }輸出結(jié)果如下:
下一篇將介紹StreamInsight查詢模式中如何檢測間隙事件。
轉(zhuǎn)載于:https://www.cnblogs.com/StreamInsight/archive/2011/09/12/StreamInsight-Query-Series-Part19-Query-Patterns-Detect-Spikes.html
總結(jié)
以上是生活随笔為你收集整理的【原创】StreamInsight查询系列(十九)——查询模式之检测异常的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电影推荐系统kaggle
- 下一篇: 毕业设计 - 题目: 基于协同过滤的电影