Lucene学习总结之五:Lucene段合并(merge)过程分析
一、段合并過程總論
IndexWriter中與段合并有關的成員變量有:
- HashSet<SegmentInfo> mergingSegments = new HashSet<SegmentInfo>(); //保存正在合并的段,以防止合并期間再次選中被合并。
- MergePolicy mergePolicy = new LogByteSizeMergePolicy(this);//合并策略,也即選取哪些段來進行合并。
- MergeScheduler mergeScheduler = new ConcurrentMergeScheduler();//段合并器,背后有一個線程負責合并。
- LinkedList<MergePolicy.OneMerge> pendingMerges = new LinkedList<MergePolicy.OneMerge>();//等待被合并的任務
- Set<MergePolicy.OneMerge> runningMerges = new HashSet<MergePolicy.OneMerge>();//正在被合并的任務
和段合并有關的一些參數有:
- mergeFactor:當大小幾乎相當的段的數量達到此值的時候,開始合并。
- minMergeSize:所有大小小于此值的段,都被認為是大小幾乎相當,一同參與合并。
- maxMergeSize:當一個段的大小大于此值的時候,就不再參與合并。
- maxMergeDocs:當一個段包含的文檔數大于此值的時候,就不再參與合并。
段合并一般發生在添加完一篇文檔的時候,當一篇文檔添加完后,發現內存已經達到用戶設定的ramBufferSize,則寫入文件系統,形成一個新的段。新段的加入可能造成差不多大小的段的個數達到mergeFactor,從而開始了合并的過程。
合并過程最重要的是兩部分:
- 一個是選擇哪些段應該參與合并,這一步由MergePolicy來決定。
- 一個是將選擇出的段合并成新段的過程,這一步由MergeScheduler來執行。段的合并也主要包括:
- 對正向信息的合并,如存儲域,詞向量,標準化因子等。
- 對反向信息的合并,如詞典,倒排表。
在總論中,我們重點描述合并策略對段的選擇以及反向信息的合并。
1.1、合并策略對段的選擇
在LogMergePolicy中,選擇可以合并的段的基本邏輯是這樣的:
- 選擇的可以合并的段都是在硬盤上的,不再存在內存中的段,也不是像早期的版本一樣每添加一個Document就生成一個段,然后進行內存中的段合并,然后再合并到硬盤中。
- 由于從內存中flush到硬盤上是按照設置的內存大小來DocumentsWriter.ramBufferSize觸發的,所以每個剛flush到硬盤上的段大小差不多,當然不排除中途改變內存設置,接下來的算法可以解決這個問題。
- 合并的過程是盡量按照合并幾乎相同大小的段這一原則,只有大小相當的mergeFacetor個段出現的時候,才合并成一個新的段。
- 在硬盤上的段基本應該是大段在前,小段在后,因為大段總是由小段合并而成的,當小段湊夠mergeFactor個的時候,就合并成一個大段,小段就被刪除了,然后新來的一定是新的小段。
- 比如mergeFactor=3,開始來的段大小為10M,當湊夠3個10M的時候,0.cfs, 1.cfs, 2.cfs則合并成一個新的段3.cfs,大小為30M,然后再來4.cfs, 5.cfs, 6.cfs,合并成7.cfs,大小為30M,然后再來8.cfs, 9.cfs, a.cfs合并成b.cfs, 大小為30M,這時候又湊夠了3個30M的,合并成90M的c.cfs,然后又來d.cfs, e.cfs, f.cfs合并成10.cfs,大小為30M,然后11.cfs大小為10M,這時候硬盤上的段為:c.cfs(90M) 10.cfs(30M),11.cfs(10M)。
所以LogMergePolicy對合并段的選擇過程如下:
- 將所有的段按照生成的順序,將段的大小以mergeFactor為底取對數,放入數組中,作為選擇的標準。
- 從頭開始,選擇一個值最大的段,然后將此段的值減去0.75(LEVEL_LOG_SPAN) ,之間的段被認為是大小差不多的段,屬于同一階梯,此處稱為第一階梯。
- 然后從后向前尋找第一個屬于第一階梯的段,從start到此段之間的段都被認為是屬于這一階梯的。也包括之間生成較早但大小較小的段,因為考慮到以下幾點:
- 防止較早生成的段由于人工flush或者人工調整ramBufferSize,因而很小,卻破壞了基本從大到小的規則。
- 如果運行較長時間后,致使段的大小參差不齊,很難合并相同大小的段。
- 也防止一個段由于較小,而不斷的都有大的段生成從而始終不能參與合并。
- 第一階梯總共4個段,小于mergeFactor因而不合并,接著start=end從而選擇下一階梯。
- 從start開始,選擇一個值最大的段,然后將此段的值減去0.75(LEVEL_LOG_SPAN) ,之間的段被認為屬于同一階梯,此處稱為第二階梯。
- 然后從后向前尋找第一個屬于第二階梯的段,從start到此段之間的段都被認為是屬于這一階梯的。
- 第二階梯總共4個段,小于mergeFactor因而不合并,接著start=end從而選擇下一階梯。
?
- 從start開始,選擇一個值最大的段,然后將此段的值減去0.75(LEVEL_LOG_SPAN) ,之間的段被認為屬于同一階梯,此處稱為第三階梯。
- 由于最大的段減去0.75后為負的,因而從start到此段之間的段都被認為是屬于這一階梯的。
- 第三階梯總共5個段,等于mergeFactor,因而進行合并。
?
- 第三階梯的五個段合并成一個較大的段。
- 然后從頭開始,依然先考察第一階梯,仍然是4個段,不合并。
- 然后是第二階梯,因為有了新生成的段,并且大小足夠屬于第二階梯,從而第二階梯有5個段,可以合并。
- 第二階段的五個段合并成一個較大的段。
- 然后從頭開始,考察第一階梯,因為有了新生成的段,并且大小足夠屬于第一階梯,從而第一階梯有5個段,可以合并。
- 第一階梯的五個段合并成一個大的段。
?
1.2、反向信息的合并
反向信息的合并包括兩部分:
- 對字典的合并,詞典中的Term是按照字典順序排序的,需要對詞典中的Term進行重新排序
- 對于相同的Term,對包含此Term的文檔號列表進行合并,需要對文檔號重新編號。
對詞典的合并需要找出兩個段中相同的詞,Lucene是通過一個稱為match的SegmentMergeInfo類型的數組以及稱為queue的 SegmentMergeQueue實現的,SegmentMergeQueue是繼承于 PriorityQueue<SegmentMergeInfo>,是一個優先級隊列,是按照字典順序排序的。 SegmentMergeInfo保存要合并的段的詞典及倒排表信息,在SegmentMergeQueue中用來排序的key是它代表的段中的第一個 Term。
我們來舉一個例子來說明合并詞典的過程,以便后面解析代碼的時候能夠很好的理解:
- 假設要合并五個段,每個段包含的Term也是按照字典順序排序的,如下圖所示。
- 首先把五個段全部放入優先級隊列中,段在其中也是按照第一個Term的字典順序排序的,如下圖。
- 從優先級隊列中彈出第一個Term("a")相同的段到match數組中,如下圖。
- 合并這些段的第一個Term("a")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
?
- 將match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
?
- 從優先級隊列中彈出第一個Term("b")相同的段到match數組中。
- 合并這些段的第一個Term("b")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
?
- 將match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
?
- 從優先級隊列中彈出第一個Term("c")相同的段到match數組中。
- 合并這些段的第一個Term("c")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 將match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
?
- 從優先級隊列中彈出第一個Term("d")相同的段到match數組中。
- 合并這些段的第一個Term("d")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 將match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
?
- 從優先級隊列中彈出第一個Term("e")相同的段到match數組中。
- 合并這些段的第一個Term("e")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
- 將match數組中還有Term的段重新放入優先級隊列中,這些段也是按照第一個Term的字典順序排序。
- 從優先級隊列中彈出第一個Term("f")相同的段到match數組中。
- 合并這些段的第一個Term("f")的倒排表,并把此Term和它的倒排表一同加入新生成的段中。
- 對于match數組中的每個段取下一個Term
?
- 合并完畢。
?
二、段合并的詳細過程
2.1、將緩存寫入新的段
IndexWriter在添加文檔的時候調用函數addDocument(Document doc, Analyzer analyzer),包含如下步驟:
- doFlush = docWriter.addDocument(doc, analyzer);//DocumentsWriter添加文檔,最后返回是否進行向硬盤寫入
- return state.doFlushAfter || timeToFlushDeletes();//這取決于timeToFlushDeletes
timeToFlushDeletes返回return (bufferIsFull || deletesFull()) && setFlushPending(),而在Lucene索引過程分析(2)的DocumentsWriter的緩存管理部分提到,當numBytesUsed+deletesRAMUsed > ramBufferSize的時候bufferIsFull設為true,也即當使用的內存大于ramBufferSize的時候,則由內存向硬盤寫入。ramBufferSize可以用IndexWriter.setRAMBufferSizeMB(double mb)設定。
- if (doFlush) flush(true, false, false);//如果內存中緩存滿了,則寫入硬盤
- if (doFlush(flushDocStores, flushDeletes) && triggerMerge)? maybeMerge();//doFlush將緩存寫入硬盤,此過程在Lucene索引過程分析(4)中關閉IndexWriter一節已經描述。
當緩存寫入硬盤,形成了新的段后,就有可能觸發一次段合并,所以調用maybeMerge()
| IndexWriter.maybeMerge() --> maybeMerge(false); ????? --> maybeMerge(1, optimize); ??????????? --> updatePendingMerges(maxNumSegmentsOptimize, optimize); ??????????? --> mergeScheduler.merge(this); |
IndexWriter.updatePendingMerges(int maxNumSegmentsOptimize, boolean optimize)主要負責找到可以合并的段,并生產段合并任務對象,并向段合并器注冊這個任務。
ConcurrentMergeScheduler.merge(IndexWriter)主要負責進行段的合并。
2.2、選擇合并段,生成合并任務
IndexWriter.updatePendingMerges(int maxNumSegmentsOptimize, boolean optimize)主要包括兩部分:
- 選擇能夠合并段:MergePolicy.MergeSpecification spec = mergePolicy.findMerges(segmentInfos);
- 向段合并器注冊合并任務,將任務加到pendingMerges中:
- for(int i=0;i<spec.merges.size();i++)
- registerMerge(spec.merges.get(i));
- for(int i=0;i<spec.merges.size();i++)
2.2.1、用合并策略選擇合并段
默認的段合并策略是LogByteSizeMergePolicy,其選擇合并段由LogMergePolicy.findMerges(SegmentInfos infos) 完成,包含以下過程:
(1) 生成levels數組,每個段一項。然后根據每個段的大小,計算每個項的值,levels[i]和段的大小的關系為Math.log(size)/Math.log(mergeFactor),代碼如下:
| ? final int numSegments = infos.size(); ? float[] levels = new float[numSegments]; ? final float norm = (float) Math.log(mergeFactor); ? for(int i=0;i<numSegments;i++) { ??? final SegmentInfo info = infos.info(i); ??? long size = size(info); ??? levels[i] = (float) Math.log(size)/norm; ? } |
(2) 由于段基本是按照由大到小排列的,而且合并段應該大小差不多的段中進行。我們把大小差不多的段稱為屬于同一階梯,因而此處從第一個段開始找屬于相同階梯的段,如果屬于此階梯的段數量達到mergeFactor個,則生成合并任務,否則繼續向后尋找下一階梯。
| ??//計算最低階梯值,所有小于此值的都屬于最低階梯 ? final float levelFloor = (float) (Math.log(minMergeSize)/norm); ? MergeSpecification spec = null; ? int start = 0; ? while(start < numSegments) { ????//找到levels數組的最大值,也即當前階梯中的峰值 ??? float maxLevel = levels[start]; ??? for(int i=1+start;i<numSegments;i++) { ????? final float level = levels[i]; ????? if (level > maxLevel) ??????? maxLevel = level; ??? } ??? //計算出此階梯的谷值,也即最大值減去0.75,之間的都屬于此階梯。如果峰值小于最低階梯值,則所有此階梯的段都屬于最低階梯。如果峰值大于最低階梯值,谷值小于最低階梯值,則設置谷值為最低階梯值,以保證所有小于最低階梯值的段都屬于最低階梯。 ??? float levelBottom; ??? if (maxLevel < levelFloor) ??????? levelBottom = -1.0F; ??? else { ??????? levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN); ??????? if (levelBottom < levelFloor && maxLevel >= levelFloor) ??????????? levelBottom = levelFloor; ??? } ??? float levelBottom = (float) (maxLevel - LEVEL_LOG_SPAN); ??? //從最后一個段向左找,當然段越來越大,找到第一個大于此階梯的谷值的段,從start的段開始,一直到upto這個段,都屬于此階梯了。盡管upto 左面也有的段由于內存設置原因,雖形成較早,但是沒有足夠大,也作為可合并的一員考慮在內了,將被并入一個大的段,從而保證了基本上左大右小的關系。從 upto這個段向右都是比此階梯小的多的段,應該屬于下一階梯。 ??? int upto = numSegments-1; ??? while(upto >= start) { ????? if (levels[upto] >= levelBottom) { ??????? break; ????? } ????? upto--; ??? } ???//從start段開始,數mergeFactor個段,如果不超過upto段,說明此階梯已經足夠mergeFactor個了,可以合 并了。當然如果此階梯包含太多要合并的段,也是每mergeFactor個段進行一次合并,然后再依次數mergeFactor段進行合并,直到此階梯的 段合并完畢。 ??? int end = start + mergeFactor; ??? while(end <= 1+upto) { ????? boolean anyTooLarge = false; ????? for(int i=start;i<end;i++) { ??????? final SegmentInfo info = infos.info(i); ????????//如果一個段的大小超過maxMergeSize或者一個段包含的文檔數量超過maxMergeDocs則不再合并。 ??????? anyTooLarge |= (size(info) >= maxMergeSize || sizeDocs(info) >= maxMergeDocs); ????? } ????? if (!anyTooLarge) { ??????? if (spec == null) ????????? spec = new MergeSpecification(); ????????//如果確認要合并,則從start到end生成一個段合并任務OneMerge. ??????? spec.add(new OneMerge(infos.range(start, end), useCompoundFile)); ????? } ??????//剛剛合并的是從start到end共mergeFactor和段,此階梯還有更多的段,則再依次數mergeFactor個段。 ????? start = end; ????? end = start + mergeFactor; ??? } ????//從start到upto是此階梯的所有的段,已經選擇完畢,下面選擇更小的下一個階梯的段 ??? start = 1+upto; ? } |
選擇的結果保存在MergeSpecification中,結構如下:
| spec??? MergePolicy$MergeSpecification? (id=25)???? |
2.2.2、注冊段合并任務
注冊段合并任務由IndexWriter.registerMerge(MergePolicy.OneMerge merge)完成:
(1) 如果選擇出的段正在被合并,或者不存在,則退出。
| final int count = merge.segments.size(); boolean isExternal = false; for(int i=0;i<count;i++) { ? final SegmentInfo info = merge.segments.info(i); ? if (mergingSegments.contains(info)) ??? return false; ? if (segmentInfos.indexOf(info) == -1) ??? return false; ? if (info.dir != directory) ??? isExternal = true; } |
(2) 將合并任務加入pendingMerges:pendingMerges.add(merge);
(3) 將要合并的段放入mergingSegments以防正在合并又被選為合并段。
| for(int i=0;i<count;i++)? |
2.3、段合并器進行段合并
段合并器默認為ConcurrentMergeScheduler,段的合并工作由ConcurrentMergeScheduler.merge(IndexWriter) 完成,它包含while(true)的循環,在循環中不斷做以下事情:
- 得到下一個合并任務:MergePolicy.OneMerge merge = writer.getNextMerge();
- 初始化合并任務:writer.mergeInit(merge);
- 將刪除文檔寫入硬盤:applyDeletes();
- 是否合并存儲域:mergeDocStores = false。按照Lucene的索引文件格式(2)中段的元數據信息(segments_N)中提到 的,IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes)中第二個參數flushDocStores會影響到是否單獨或是共享存儲。其實最終影響的是 DocumentsWriter.closeDocStore()。每當flushDocStores為false時,closeDocStore不被調 用,說明下次添加到索引文件中的域和詞向量信息是同此次共享一個段的。直到flushDocStores為true的時候,closeDocStore被 調用,從而下次添加到索引文件中的域和詞向量信息將被保存在一個新的段中,不同此次共享一個段。如2.1節中說的那樣,在addDocument中,如果 內存中緩存滿了,則寫入硬盤,調用的是flush(true, false, false),也即所有的存儲域都存儲在共享的域中(_0.fdt),因而不需要合并存儲域。
- 生成新的段:merge.info = new SegmentInfo(newSegmentName(),…)
- 將新的段加入mergingSegments
- 如果已經有足夠多的段合并線程,則等待while (mergeThreadCount() >= maxThreadCount) wait();
- 生成新的段合并線程:
- merger = getMergeThread(writer, merge);
- mergeThreads.add(merger);
- 啟動段合并線程:merger.start();
段合并線程的類型為MergeThread,MergeThread.run()包含while(truy)循環,在循環中做以下事情:
- 合并當前的任務:doMerge(merge);
- 得到下一個段合并任務:merge = writer.getNextMerge();
ConcurrentMergeScheduler.doMerge(OneMerge) 最終調用IndexWriter.merge(OneMerge) ,主要做以下事情:
- 初始化合并任務:mergeInit(merge);
- 進行合并:mergeMiddle(merge);
- 完成合并任務:mergeFinish(merge);
- 從mergingSegments中移除被合并的段和合并新生成的段:
- for(int i=0;i<end;i++) mergingSegments.remove(sourceSegments.info(i));
- mergingSegments.remove(merge.info);
- 從runningMerges中移除此合并任務:runningMerges.remove(merge);
- 從mergingSegments中移除被合并的段和合并新生成的段:
IndexWriter.mergeMiddle(OneMerge)主要做以下幾件事情:
- 生成用于合并段的對象SegmentMerger merger = new SegmentMerger(this, mergedName, merge);
- 打開Reader指向要合并的段:
| merge.readers = new SegmentReader[numSegments]; merge.readersClone = new SegmentReader[numSegments]; for (int i = 0; i < numSegments; i++) { ??? final SegmentInfo info = sourceSegments.info(i); ?? // Hold onto the "live" reader; we will use this to ?? // commit merged deletes ?? SegmentReader reader = merge.readers[i] = readerPool.get(info, merge.mergeDocStores,MERGE_READ_BUFFER_SIZE,-1); ??? // We clone the segment readers because other ??? // deletes may come in while we're merging so we ??? // need readers that will not change ??? SegmentReader clone = merge.readersClone[i] = (SegmentReader) reader.clone(true); ??? merger.add(clone); } |
- 進行段合并:mergedDocCount = merge.info.docCount = merger.merge(merge.mergeDocStores);
- 合并生成的段生成為cfs:merger.createCompoundFile(compoundFileName);
SegmentMerger.merge(boolean) 包含以下幾部分:
- 合并域:mergeFields()
- 合并詞典和倒排表:mergeTerms();
- 合并標準化因子:mergeNorms();
- 合并詞向量:mergeVectors();
下面依次分析者幾部分。
2.3.1、合并存儲域
合并存儲域主要包含兩部分:一部分是合并fnm信息,也即域元數據信息,一部分是合并fdt,fdx信息,也即域數據信息。
(1) 合并fnm信息
- 首先生成新的域元數據信息:fieldInfos = new FieldInfos();
- 依次用reader讀取每個合并段的域元數據信息,加入上述對象
| for (IndexReader reader : readers) { ??? SegmentReader segmentReader = (SegmentReader) reader; ??? FieldInfos readerFieldInfos = segmentReader.fieldInfos(); ??? int numReaderFieldInfos = readerFieldInfos.size(); ??? for (int j = 0; j < numReaderFieldInfos; j++) { ????? FieldInfo fi = readerFieldInfos.fieldInfo(j); ??????//在通常情況下,所有的段中的文檔都包含相同的域,比如添加文檔的時候,每篇文檔都包 含"title","description","author","time"等,不會為某一篇文檔添加或減少與其他文檔不同的域。但也不排除特殊情況 下有特殊的文檔有特殊的域。因而此處的add是無則添加,有則更新。 ????? fieldInfos.add(fi.name, fi.isIndexed, fi.storeTermVector, ????????? fi.storePositionWithTermVector, fi.storeOffsetWithTermVector, ????????? !reader.hasNorms(fi.name), fi.storePayloads, ????????? fi.omitTermFreqAndPositions); ??? } } |
- 將域元數據信息fnm寫入文件:fieldInfos.write(directory, segment + ".fnm");
(2) 合并段數據信息fdt, fdx
在合并段的數據信息的時候,有兩種情況:
- 情況一:通常情況,要合并的段和新生成段包含的域的名稱,順序都是一樣的,這樣就可以把要合并的段的fdt信息直接拷貝到新生成段的最后,以提高合并效率。
- 情況二:要合并的段包含特殊的文檔,其包含的域多于或者少于新生成段的域,這樣就不能夠直接拷貝,而是一篇文檔一篇文檔的添加。這樣合并效率大大降低,因而不鼓勵添加文檔的時候,不同的文檔使用不同的域。
具體過程如下:
- 首先檢查要合并的各個段,其包含域的名稱,順序是否同新生成段的一致,也即是否屬于第一種情況:setMatchingSegmentReaders();
| private void setMatchingSegmentReaders() { ? int numReaders = readers.size(); ? matchingSegmentReaders = new SegmentReader[numReaders]; ??//遍歷所有的要合并的段 ? for (int i = 0; i < numReaders; i++) { ??? IndexReader reader = readers.get(i); ??? if (reader instanceof SegmentReader) { ????? SegmentReader segmentReader = (SegmentReader) reader; ????? boolean same = true; ????? FieldInfos segmentFieldInfos = segmentReader.fieldInfos(); ????? int numFieldInfos = segmentFieldInfos.size(); ??????//依次比較要合并的段和新生成的段的段名,順序是否一致。 ????? for (int j = 0; same && j < numFieldInfos; j++) { ??????? same = fieldInfos.fieldName(j).equals(segmentFieldInfos.fieldName(j)); ????? } ??????//最后生成matchingSegmentReaders數組,如果此數組的第i項不是null,則說明第i個段同新生成的段名稱,順序完全一致,可以采取情況一得方式。如果此數組的第i項是null,則說明第i個段包含特殊的域,則采取情況二的方式。 ????? if (same) { ??????? matchingSegmentReaders[i] = segmentReader; ????? } ??? } ? } } |
- 生成存儲域的寫對象:FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, fieldInfos);
- 依次遍歷所有的要合并的段,按照上述兩種情況,使用不同策略進行合并
| int idx = 0; for (IndexReader reader : readers) { ? final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++]; ? FieldsReader matchingFieldsReader = null; ??//如果matchingSegmentReader!=null,表示此段屬于情況一,得到matchingFieldsReader ? if (matchingSegmentReader != null) { ??? final FieldsReader fieldsReader = matchingSegmentReader.getFieldsReader(); ??? if (fieldsReader != null && fieldsReader.canReadRawDocs()) {??????????? ????? matchingFieldsReader = fieldsReader; ??? } ? } ??//根據此段是否包含刪除的文檔采取不同的策略 ? if (reader.hasDeletions()) { ??? docCount += copyFieldsWithDeletions(fieldsWriter, reader, matchingFieldsReader); ? } else { ??? docCount += copyFieldsNoDeletions(fieldsWriter,reader, matchingFieldsReader); ? } } |
- 合并包含刪除文檔的段
| private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final IndexReader reader, ??????????????????????????????????? final FieldsReader matchingFieldsReader) ? throws IOException, MergeAbortedException, CorruptIndexException { ? int docCount = 0; ? final int maxDoc = reader.maxDoc(); ??//matchingFieldsReader!=null,說明此段屬于情況一, 則可以直接拷貝。 ? if (matchingFieldsReader != null) { ??? for (int j = 0; j < maxDoc;) { ????? if (reader.isDeleted(j)) { ????????// 如果文檔被刪除,則跳過此文檔。 ??????? ++j; ??????? continue; ????? } ????? int start = j, numDocs = 0; ????? do { ??????? j++; ??????? numDocs++; ??????? if (j >= maxDoc) break; ??????? if (reader.isDeleted(j)) { ????????? j++; ????????? break; ??????? } ????? } while(numDocs < MAX_RAW_MERGE_DOCS); ????? //從要合并的段中從第start篇文檔開始,依次讀取numDocs篇文檔的文檔長度到rawDocLengths中。 ????? IndexInput stream = matchingFieldsReader.rawDocs(rawDocLengths, start, numDocs); ????? //用fieldsStream.copyBytes(…)直接將fdt信息從要合并的段拷貝到新生成的段,然后將上面讀出的rawDocLengths轉換成為每篇文檔在fdt中的偏移量,寫入fdx文件。 ????? fieldsWriter.addRawDocuments(stream, rawDocLengths, numDocs); ????? docCount += numDocs; ????? checkAbort.work(300 * numDocs); ??? } ? } else { ??? //matchingFieldsReader==null,說明此段屬于情況二,必須每篇文檔依次添加。 ??? for (int j = 0; j < maxDoc; j++) { ????? if (reader.isDeleted(j)) { ????????// 如果文檔被刪除,則跳過此文檔。 ??????? continue; ????? } ??????//同addDocument的過程中一樣,重新將文檔添加一遍。 ????? Document doc = reader.document(j); ????? fieldsWriter.addDocument(doc); ????? docCount++; ????? checkAbort.work(300); ??? } ? } ? return docCount; } |
- 合并不包含刪除文檔的段:除了跳過刪除的文檔的部分,同上述過程一樣。
- 關閉存儲域的寫對象:fieldsWriter.close();
2.3.2、合并標準化因子
合并標準化因子的過程比較簡單,基本就是對每一個域,用指向合并段的reader讀出標準化因子,然后再寫入新生成的段。
| private void mergeNorms() throws IOException { ? byte[] normBuffer = null; ? IndexOutput output = null; ? try { ??? int numFieldInfos = fieldInfos.size(); ???//對于每一個域 ??? for (int i = 0; i < numFieldInfos; i++) { ????? FieldInfo fi = fieldInfos.fieldInfo(i); ????? if (fi.isIndexed && !fi.omitNorms) { ??????? if (output == null) { ????????? //指向新生成的段的nrm文件的寫入流 ????????? output = directory.createOutput(segment + "." + IndexFileNames.NORMS_EXTENSION); ????????? //寫nrm文件頭 ????????? output.writeBytes(NORMS_HEADER,NORMS_HEADER.length); ??????? } ??????? //對于每一個合并段的reader ??????? for ( IndexReader reader : readers) { ????????? int maxDoc = reader.maxDoc(); ????????? if (normBuffer == null || normBuffer.length < maxDoc) { ??????????? // the buffer is too small for the current segment ??????????? normBuffer = new byte[maxDoc]; ????????? } ????????? //讀出此段的nrm信息。 ????????? reader.norms(fi.name, normBuffer, 0); ????????? if (!reader.hasDeletions()) { ??????????? //如果沒有文檔被刪除則寫入新生成的段。 ??????????? output.writeBytes(normBuffer, maxDoc); ????????? } else { ??????????? //如果有文檔刪除則跳過刪除的文檔寫入新生成的段。 ??????????? for (int k = 0; k < maxDoc; k++) { ????????????? if (!reader.isDeleted(k)) { ??????????????? output.writeByte(normBuffer[k]); ????????????? } ??????????? } ????????? } ????????? checkAbort.work(maxDoc); ??????? } ????? } ??? } ? } finally { ??? if (output != null) { ????? output.close(); ??? } ? } } |
2.3.3、合并詞向量
合并詞向量的過程同合并存儲域的過程非常相似,也包括兩種情況:
- 情況一:通常情況,要合并的段和新生成段包含的域的名稱,順序都是一樣的,這樣就可以把要合并的段的詞向量信息直接拷貝到新生成段的最后,以提高合并效率。
- 情況二:要合并的段包含特殊的文檔,其包含的域多于或者少于新生成段的域,這樣就不能夠直接拷貝,而是一篇文檔一篇文檔的添加。這樣合并效率大大降低,因而不鼓勵添加文檔的時候,不同的文檔使用不同的域。
具體過程如下:
- 生成詞向量的寫對象:TermVectorsWriter termVectorsWriter = new TermVectorsWriter(directory, segment, fieldInfos);
- 依次遍歷所有的要合并的段,按照上述兩種情況,使用不同策略進行合并
| int idx = 0; for (final IndexReader reader : readers) { ? final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++]; ? TermVectorsReader matchingVectorsReader = null; ? //如果matchingSegmentReader!=null,表示此段屬于情況一,得到matchingFieldsReader ? if (matchingSegmentReader != null) { ??? TermVectorsReader vectorsReader = matchingSegmentReader.getTermVectorsReaderOrig(); ??? if (vectorsReader != null && vectorsReader.canReadRawDocs()) { ????? matchingVectorsReader = vectorsReader; ??? } ? } ? //根據此段是否包含刪除的文檔采取不同的策略 ? if (reader.hasDeletions()) { ??? copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader); ? } else { ??? copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader); ? } } |
- 合并包含刪除文檔的段
| private void copyVectorsWithDeletions(final TermVectorsWriter termVectorsWriter, final TermVectorsReader matchingVectorsReader, final IndexReader reader) ? throws IOException, MergeAbortedException { ? final int maxDoc = reader.maxDoc(); ? //matchingFieldsReader!=null,說明此段屬于情況一, 則可以直接拷貝。 ? if (matchingVectorsReader != null) { ??? for (int docNum = 0; docNum < maxDoc;) { ????? if (reader.isDeleted(docNum)) { ??????? // 如果文檔被刪除,則跳過此文檔。 ??????? ++docNum; ??????? continue; ????? } ????? int start = docNum, numDocs = 0; ????? do { ??????? docNum++; ??????? numDocs++; ??????? if (docNum >= maxDoc) break; ??????? if (reader.isDeleted(docNum)) { ????????? docNum++; ????????? break; ??????? } ????? } while(numDocs < MAX_RAW_MERGE_DOCS); ??????//從要合并的段中從第start篇文檔開始,依次讀取numDocs篇文檔的tvd到rawDocLengths中,tvf到rawDocLengths2。 ????? matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, start, numDocs); ??????//用tvd.copyBytes(…)直接將tvd信息從要合并的段拷貝到新生成的段,然后將上面讀出的rawDocLengths轉 換成為每篇文檔在tvd文件中的偏移量,寫入tvx文件。用tvf.copyBytes(…)直接將tvf信息從要合并的段拷貝到新生成的段,然后將上面 讀出的rawDocLengths2轉換成為每篇文檔在tvf文件中的偏移量,寫入tvx文件。 ????? termVectorsWriter.addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs); ????? checkAbort.work(300 * numDocs); ??? } ? } else { ??? //matchingFieldsReader==null,說明此段屬于情況二,必須每篇文檔依次添加。 ??? for (int docNum = 0; docNum < maxDoc; docNum++) { ????? if (reader.isDeleted(docNum)) { ??????? // 如果文檔被刪除,則跳過此文檔。 ??????? continue; ????? } ????? //同addDocument的過程中一樣,重新將文檔添加一遍。 ????? TermFreqVector[] vectors = reader.getTermFreqVectors(docNum); ????? termVectorsWriter.addAllDocVectors(vectors); ????? checkAbort.work(300); ??? } ? } } |
- 合并不包含刪除文檔的段:除了跳過刪除的文檔的部分,同上述過程一樣。
- 關閉詞向量的寫對象:termVectorsWriter.close();
2.3.4、合并詞典和倒排表
以上都是合并正向信息,相對過程比較清晰。而合并詞典和倒排表就不這么簡單了,因為在詞典中,Lucene要求按照字典順序排序,在倒排表中,文檔號要按照從小到大順序排序排序,在每個段中,文檔號都是從零開始編號的。
所以反向信息的合并包括兩部分:
- 對字典的合并,需要對詞典中的Term進行重新排序
- 對于相同的Term,對包含此Term的文檔號列表進行合并,需要對文檔號重新編號。
后者相對簡單,假設如果第一個段的編號是0~N,第二個段的編號是0~M,當兩個段合并成一個段的時候,第一個段的編號依然是0~N,第二個段的編號變成N~N+M就可以了,也即增加一個偏移量(前一個段的文檔個數)。
對詞典的合并需要找出兩個段中相同的詞,Lucene是通過一個稱為match的SegmentMergeInfo類型的數組以及稱為queue的 SegmentMergeQueue實現的,SegmentMergeQueue是繼承于 PriorityQueue<SegmentMergeInfo>,是一個優先級隊列,是按照字典順序排序的。 SegmentMergeInfo保存要合并的段的詞典及倒排表信息,在SegmentMergeQueue中用來排序的key是它代表的段中的第一個 Term。
在總論部分,舉了一個例子表明詞典和倒排表合并的過程。
下面讓我們深入代碼看一看具體的實現:
(1) 生成優先級隊列,并將所有的段都加入優先級隊列。
| //在Lucene索引過程分析(4)中提到過,FormatPostingsFieldsConsumer 是用來寫入倒排表信息的。 //FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域信息,其返回FormatPostingsTermsConsumer用于添加詞信息。 //FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加詞信息,其返回FormatPostingsDocsConsumer用于添加freq信息 //FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq信息,其返回FormatPostingsPositionsConsumer用于添加prox信息 //FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox信息 FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos); //優先級隊列 queue = new SegmentMergeQueue(readers.size()); //對于每一個段 final int readerCount = readers.size(); for (int i = 0; i < readerCount; i++) { ??? IndexReader reader = readers.get(i); ??? TermEnum termEnum = reader.terms(); ??? //生成SegmentMergeInfo對象,termEnum就是此段的詞典及倒排表。 ??? SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader); ?? //base就是下一個段的文檔號偏移量,等于此段的文檔數目。 ??? base += reader.numDocs(); ??? if (smi.next())?//得到段的第一個Term ??????? queue.add(smi);?//將此段放入優先級隊列。 ??? else ??????? smi.close(); } |
(2) 生成match數組
SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()];
(3) 合并詞典
| //如果隊列不為空,則合并尚未結束 while (queue.size() > 0) { ? int matchSize = 0; ? //取出優先級隊列的第一個段,放到match數組中 ? match[matchSize++] = queue.pop(); ? Term term = match[0].term; ? SegmentMergeInfo top = queue.top(); ? //如果優先級隊列的最頂端和已經彈出的match中的段的第一個Term相同,則全部彈出。 ? while (top != null && term.compareTo(top.term) == 0) { ??? match[matchSize++] =? queue.pop(); ??? top =? queue.top(); ? } ? if (currentField != term.field) { ??? currentField = term.field; ??? if (termsConsumer != null) ????? termsConsumer.finish(); ??? final FieldInfo fieldInfo = fieldInfos.fieldInfo(currentField); ??? //FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域信息,其返回FormatPostingsTermsConsumer用于添加詞信息。 ??? termsConsumer = consumer.addField(fieldInfo); ??? omitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions; ? } ? //合并match數組中的所有的段的第一個Term的倒排表信息,并寫入新生成的段。 ? int df = appendPostings(termsConsumer, match, matchSize); ? checkAbort.work(df/3.0); ? while (matchSize > 0) { ??? SegmentMergeInfo smi = match[—matchSize]; ??? //如果match中的段還有下一個Term,則放回優先級隊列,進行下一輪的循環。 ??? if (smi.next()) ????? queue.add(smi); ??? else ????? smi.close(); ? } } |
(4) 合并倒排表
| private final int appendPostings(final FormatPostingsTermsConsumer termsConsumer, SegmentMergeInfo[] smis, int n) ????? throws CorruptIndexException, IOException { //FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加詞信息,其返回FormatPostingsDocsConsumer用于添加freq信息 ? //將match數組中段的第一個Term添加到新生成的段中。 ? final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(smis[0].term.text); ? int df = 0; ? for (int i = 0; i < n; i++) { ??? SegmentMergeInfo smi = smis[i]; ??? //得到要合并的段的位置信息(prox) ??? TermPositions postings = smi.getPositions(); ??? //此段的文檔號偏移量 ??? int base = smi.base; ??? //在要合并的段中找到Term的倒排表位置。 ??? postings.seek(smi.termEnum); ??? //不斷得到下一篇文檔號 ??? while (postings.next()) { ????? df++; ????? int doc = postings.doc(); ????? //文檔號都要加上偏移量 ????? doc += base; ???? //得到詞頻信息(frq) ????? final int freq = postings.freq(); ???? //FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq信息,其返回FormatPostingsPositionsConsumer用于添加prox信息 ????? final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(doc, freq); ????? //如果位置信息需要保存 ????? if (!omitTermFreqAndPositions) { ??????? for (int j = 0; j < freq; j++) { ????????? //得到位置信息(prox)以及payload信息 ????????? final int position = postings.nextPosition(); ????????? final int payloadLength = postings.getPayloadLength(); ????????? if (payloadLength > 0) { ??????????? if (payloadBuffer == null || payloadBuffer.length < payloadLength) ????????????? payloadBuffer = new byte[payloadLength]; ??????????? postings.getPayload(payloadBuffer, 0); ????????? } ???????? //FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox信息 ????????? posConsumer.addPosition(position, payloadBuffer, 0, payloadLength); ??????? } ??????? posConsumer.finish(); ????? } ??? } ? } ? docConsumer.finish(); ? return df; } |
總結
以上是生活随笔為你收集整理的Lucene学习总结之五:Lucene段合并(merge)过程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Lucene学习总结之四:Lucene索
- 下一篇: Lucene学习总结之七:Lucene搜