mysql 百亿级_ES实现百亿级数据实时分析实战案例
背景
我們小組前段時間接到一個需求,希望能夠按照小時為單位,看到每個實驗中各種特征(單個或組合)的覆蓋率、正樣本占比、負樣本占比。我簡單解釋一下這三種指標的定義:
覆蓋率:所有樣本中出現某一特征的樣本的比例
正樣本占比:所有出現該特征的樣本中,正樣本的比例
負樣本占比:所有出現該特征的樣本中,負樣本的比例
光看這三個指標,大家可能會覺得這個需求很簡單,無非就是一個簡單的篩選、聚合而已。
如果真的這么簡單,我也沒必要寫這篇文章單獨記錄了。問題的關鍵就在于,每小時有將近1億的數據量,而我們需要保存7天的數據,數據總量預計超過了100億。
技術方案
在了解清楚需求后,我們小組馬上對技術方案展開討論,討論過程中出現了3種方案:
第一種:用Spark流式計算,計算每一種可能單個或組合特征的相關指標
第二種:收到客戶端請求后,遍歷HDFS中相關數據,進行離線計算
第三種:將數據按照實驗+小時分索引存入ES,收到客戶端請求后,實時計算返回
首先,第一種方案直接被diss,原因是一個實驗一般會出現幾百、上千個特征,而這些特征的組合何止幾億種,全部計算的話,可行性暫且不論,光是對資源的消耗就無法承受。
第二種方案,雖然技術上是可行的,但離線計算所需時間較長,對用戶來說,體驗并不理想。并且,為了計算目標1%的數據而要遍歷所有數據,對資源也存在很大浪費。
第三種方案,將數據按照實驗+小時分索引后,可以將每個索引包含的數據量降到1000萬以下,再借助ES在查詢、聚合方面高效的能力,應該可以實現秒級響應,并且用戶體驗也會非常好。
技術方案由此確定。
技術架構
1.用Spark從Kafka中接入原始數據,之后對數據進行解析,轉換成我們的目標格式
2.將數據按照實驗+小時分索引存入ES中
3.接受到用戶請求后,將請求按照實驗+特征+小時組合,創建多個異步任務,由這些異步任務并行從ES中過濾并聚合相關數據,得到結果
4.將異步任務的結果進行合并,返回給前端進行展示
代碼實現
異步任務
// 啟動并行任務
final Map>> futures = Maps.newHashMap();
for(String metric : metrics) { // 遍歷要計算的指標
final SampleRatio sampleRatio = getSampleRatio(metric);
for (String exptId : expts) { // 遍歷目標實驗列表
for (String id : features) { // 遍歷要分析的特征
final String name = getMetricsName(exptId, sampleRatio, id);
final List> resultList = Lists.newArrayList();
for (Date hour : coveredHours) { // 將時間按照小時進行拆分
final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);
final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);
// 啟動并行任務
final Future future = TaskExecutor.submit(task);
resultList.add(future);
}
futures.put(name, resultList);
}
}
}
final QueryRes queryRes = new QueryRes();
final Iterator>>> it = futures.entrySet().iterator();
while (it.hasNext()){
// 省略結果處理流程
}
指標計算
// 1\. 對文檔進行聚合運行,分別得到基礎文檔的數量,以及目標文檔數量
final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);
final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);
// 2\. 得到覆蓋率
final String indexName = getIndexName(exptId, hour);
final Search search = new Search.Builder(searchBuilder.toString())
.addIndex(indexName).addType(getType()).build();
final SearchResult result = jestClient.execute(search);
if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){
// 請求出錯
log.warn(result.getErrorMessage());
return 0f;
}
final MetricAggregation aggregations = result.getAggregations();
// 3\. 解析結果
final long dividend ;
if(SampleRatio.ALL == sampleRatio){
dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();
}else {
dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();
}
// 防止出現被除數為0時程序異常
if(dividend <= 0){
return 0f;
}
long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();
return divisor / (float)dividend;
聚合
int label = 0;
final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);
// 包含指定特征的正樣本數量
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
final List must = boolQuery.must();
// 計算樣本數量
TermQueryBuilder labelQuery = null;
if(SampleRatio.POSITIVE == sampleRatio) {
// 計算正樣本數量
label = 1;
labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);
must.add(labelQuery);
}else if(SampleRatio.NEGATIVE == sampleRatio) {
// 計算負樣本數量
labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);
must.add(labelQuery);
}
must.add(existsQuery);
final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());
existsCountAgg.field(fieldName);
final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);
filterAgg.subAggregation(existsCountAgg);
return filterAgg;
上線效果
上線后表現完全滿足預期,平均請求耗時在3秒左右,用戶體驗良好。感謝各位小伙伴的辛苦付出~~
下圖是ES中部分索引的信息:
總結
以上是生活随笔為你收集整理的mysql 百亿级_ES实现百亿级数据实时分析实战案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: amoled led 排列_AMOLED
- 下一篇: STC8F2K08S2