对es搜索使用fork/join优化搜索
需求:查詢某個日期區間關于吉利相關輿情
es索引庫有12個,吉利相關詞有50+個
查詢12個es索引庫,查詢出日期區間的結果 且 滿足這50+詞中其中一個詞則將對應的文章查詢出來
?? ?1個索引 ?
?? ??? ?查詢?
?? ??? ??? ?一個月區間日期 2019-10-01 2019-10-31
?? ??? ??? ?多個詞(吉利、帝豪....)包含這50個詞的都查出來
?? ?思路:按索引、日期、詞維度來拆分
?? ?如:按5天時間 和 10個詞 一組去es查詢返回,將日期區間縮短和查詢的詞量減少
?? ?
?? ?5天日期區間查詢 + 10個詞,
?? ?第一組日期分組
?? ??? ?2019-10-01、2019-10-02、2019-10-03、2019-10-04、2019-10-05
?? ??? ?吉利、帝豪、Geely、博越、帝豪GL、帝豪GS、帝豪RS、帝豪吉利、帝豪汽車、遠景X1
?? ??? ?2019-10-01、2019-10-02、2019-10-03、2019-10-04、2019-10-05
?? ??? ?遠景SUV、遠景X3、吉利自由艦、吉利熊貓、吉利金剛、吉利博瑞、吉利新博瑞、吉利英倫、英倫C5、英倫兩廂
?? ??? ?2019-10-01、2019-10-02、2019-10-03、2019-10-04、2019-10-05
?? ??? ?吉利海景.......
?? ??? ?2019-10-01、2019-10-02、2019-10-03、2019-10-04、2019-10-05
?? ??? ?新美日汽車......
?? ??? ?2019-10-01、2019-10-02、2019-10-03、2019-10-04、2019-10-05
?? ??? ?吉利新美日.....
?? ?第二組日期分組
?? ??? ?2019-10-06、2019-10-07、2019-10-08、2019-10-09、2019-10-10
?? ??? ?吉利、帝豪、Geely、博越、帝豪GL、帝豪GS、帝豪RS、帝豪吉利、帝豪汽車、遠景X1
?? ??? ?2019-10-06、2019-10-07、2019-10-08、2019-10-09、2019-10-10
?? ??? ?遠景SUV、遠景X3、吉利自由艦、吉利熊貓、吉利金剛、吉利博瑞、吉利新博瑞、吉利英倫、英倫C5、英倫兩廂
?? ??? ?2019-10-06、2019-10-07、2019-10-08、2019-10-09、2019-10-10
?? ??? ?吉利海景.......
?? ??? ?2019-10-06、2019-10-07、2019-10-08、2019-10-09、2019-10-10
?? ??? ?新美日汽車......
?? ??? ?2019-10-06、2019-10-07、2019-10-08、2019-10-09、2019-10-10
?? ??? ?吉利新美日.....
?? ?第三組日期分組.......
?? ??? ?......
?? ??? ?......
?? ?第四組日期分組.......
?? ??? ?......
?? ??? ?......
?? ?第五組日期分組.......
?? ??? ?......
?? ??? ?......
?? ?(30/5)(日期組) * 5(詞組) * 12(索引) = 360(一個月區組成360次查詢es)
?? ?
使用fork/join 對這360條數據做分解,以30個為基準提交搜索到es,360/30=12,需要查詢12次es,如果每一次需要花費2秒,最終執行時間不會超過3秒。?
本地驗證時,發現使用fork/join比單次慢,原因是因為本地cpu大多都是保持在60%使用,fork/join是cpu密集型,當cpu被暫滿時就會出現線程之間的競爭等待,
所以移到支持cpu密集型的服務器測試,是會比較快。?
未優化前
優化后
public BucketHit getBrand(SentimentParam param, QueryBuilder qb, String... fieldNames) {List<String> dateTimes = resolveDate(param.getStartDate(),param.getEndDate());String startDate = param.getStartDate();//搜索條件list,根據日期分解搜索次數List<SearchSourceBuilder> searchBuilders = Lists.newArrayListWithExpectedSize(dateTimes.size() * 4);for (String date : dateTimes){//每一個查詢 日期間隔5天 且 查詢 10個詞searchBuilders.add(getSearchBuilder(startDate,date,TitleTemplateUtil.geelyOrTitle().subList(0,10),param));searchBuilders.add(getSearchBuilder(startDate,date,TitleTemplateUtil.geelyOrTitle().subList(10,20),param));searchBuilders.add(getSearchBuilder(startDate,date,TitleTemplateUtil.geelyOrTitle().subList(20,30),param));searchBuilders.add(getSearchBuilder(startDate,date,TitleTemplateUtil.geelyOrTitle().subList(30,40),param));searchBuilders.add(getSearchBuilder(startDate,date,TitleTemplateUtil.geelyOrTitle().subList(40,TitleTemplateUtil.geelyOrs.size()),param));startDate = date;}/*循環索引列表,每一個索引都對應 searchBuilders.size()次 搜索如 索引 12 個, 時間區間 36 條總共查詢次數 12 * 36*/List<SearchRequest> srs = Lists.newArrayList();for (SentimentIndexEnum index : SentimentIndexEnum.values()) {for (SearchSourceBuilder ssb : searchBuilders) {SearchRequest searchRequest = new SearchRequest(index.getIndex());searchRequest.source(ssb);srs.add(searchRequest);}}ForkJoinPool forkJoinPool = new ForkJoinPool(20);long st3 = System.currentTimeMillis(); //獲取開始時間//使用forkJoin分解任務List<SearchResponse> responses = forkJoinPool.invoke(new IndexForkRecursiveTask(srs));//同步調用/*MultiSearchRequest request = new MultiSearchRequest();for (SearchRequest sr : srs) {request.add(sr);}List<SearchResponse> responses = getMultiSearchResponse(request);*/forkJoinPool.shutdown();long et3 = System.currentTimeMillis(); //獲取結束時間LOGGER.info("-------總耗時--------"+Thread.currentThread().getName() + "-耗時:" + (st3 - et3) + "ms");BucketHit bucketHit = doGetResponse(responses, fieldNames);return bucketHit;}/*** 分解日期,按 5天維度* 如 sd=2019-10-01,ed=2019-10-31* 返回 [2019-10-06, 2019-10-11, 2019-10-16, 2019-10-21, 2019-10-26, 2019-10-31]* @param sd* @param ed* @return*/private static List<String> resolveDate(String sd, String ed ) {Set<String> dateTimes = Sets.newHashSet();try {Date afterFiveDate = DateUtil.DAY.parse(sd);while (afterFiveDate.before(DateUtil.DAY.parse(ed))){afterFiveDate = DateUtil.getFetureDate(afterFiveDate,5);dateTimes.add(DateUtil.DAY.format(afterFiveDate));}dateTimes.add(ed);return dateTimes.stream().sorted().collect(Collectors.toList());}catch (Exception e){}return null;} public class IndexForkRecursiveTask extends RecursiveTask<List<SearchResponse>> {private static final Logger LOGGER = LoggerFactory.getLogger(IndexForkRecursiveTask.class);private int num = 20;private List<SearchRequest> searchRequests;private static EsClient esClient;List<IndexForkRecursiveTask> tasks;public IndexForkRecursiveTask(List<SearchRequest> searchRequests) {this.searchRequests = searchRequests;}@Overrideprotected List<SearchResponse> compute() {tasks = Lists.newArrayList();if (searchRequests.size() <= num) {return responses(searchRequests);}// 將搜索條件分組,將大任務拆分小任務List<List<SearchRequest>> ssbs = getList();for (List<SearchRequest> l : ssbs) {IndexForkRecursiveTask frt = new IndexForkRecursiveTask(l);tasks.add(frt);}/* 執行所有任務 并匯總結果*/invokeAll(tasks);List<SearchResponse> responses = Lists.newCopyOnWriteArrayList();for (IndexForkRecursiveTask fr : tasks) {responses.addAll(fr.join());}return responses;}private List<SearchResponse> responses(List<SearchRequest> srs) {long st3 = System.currentTimeMillis(); //獲取開始時間MultiSearchRequest request = new MultiSearchRequest();for (SearchRequest sr : srs) {request.add(sr);}List<SearchResponse> responses = Lists.newArrayList();MultiSearchResponse sr;RestHighLevelClient client = getEsClient().getRhlClient();try {LOGGER.info("client =========="+client);sr = client.multiSearch(request);for (MultiSearchResponse.Item item : sr.getResponses()) {SearchResponse response = item.getResponse();responses.add(response);}} catch (Exception e) {LOGGER.error("IndexRecursiveTask compute error ", e);} finally {esClient.close(client);}long et3 = System.currentTimeMillis(); //獲取結束時間LOGGER.info(Thread.currentThread().getName() + "-耗時:" + (st3 - et3) + "ms");return responses;}//將多個查詢拆小,每組num條private List<List<SearchRequest>> getList() {List<List<SearchRequest>> ret = Lists.newArrayList();int size = searchRequests.size() - 1;if (size <= num) {ret.add(searchRequests);return ret;}int start = 0;int end = num;for (int i = start; i < end; ) {if (end <= size) {ret.add(searchRequests.subList(start, end));start = end;end = end + num;} else {ret.add(searchRequests.subList(start, size));break;}}return ret;}private EsClient getEsClient() {if (esClient == null) {esClient = (EsClient) SpringBeanUtils.getApplicationContext().getBean("esClient");}return esClient;} }
使用fork/join問題:
如果分解出A、B、C線程,A先執行完,去竊取C線程的任務,A先執行花費1ms、B、C執行也花費1ms,但是A先執行完去竊取C任務導致A現在多負擔了一個任務,最后A執行花費2ms,
需要每個任務對應一個線程解決這問題?但是fork/join是不是失去意義。”任務太少拆解問題“
如果不分解則會查詢出4秒。
總結
以上是生活随笔為你收集整理的对es搜索使用fork/join优化搜索的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 有效解绑:TeamViewer使用的设备
- 下一篇: Chrome浏览器插件开发-淘宝自动登录