Lucene学习总结之四:Lucene索引过程分析
對于Lucene的索引過程,除了將詞(Term)寫入倒排表并最終寫入Lucene的索引文件外,還包括分詞(Analyzer)和合并段(merge segments)的過程,本次不包括這兩部分,將在以后的文章中進行分析。
Lucene的索引過程,很多的博客,文章都有介紹,推薦大家上網搜一篇文章:《Annotated Lucene》,好像中文名稱叫《Lucene源碼剖析》是很不錯的。
想要真正了解Lucene索引文件過程,最好的辦法是跟進代碼調試,對著文章看代碼,這樣不但能夠最詳細準確的掌握索引過程(描述都是有偏差的,而代碼是不會騙你的),而且還能夠學習Lucene的一些優秀的實現,能夠在以后的工作中為我所用,畢竟Lucene是比較優秀的開源項目之一。
由于Lucene已經升級到3.0.0了,本索引過程為Lucene 3.0.0的索引過程。
一、索引過程體系結構
Lucene 3.0的搜索要經歷一個十分復雜的過程,各種信息分散在不同的對象中分析,處理,寫入,為了支持多線程,每個線程都創建了一系列類似結構的對象集,為了提高效率,要復用一些對象集,這使得索引過程更加復雜。
其實索引過程,就是經歷下圖中所示的索引鏈的過程,索引鏈中的每個節點,負責索引文檔的不同部分的信息 ,當經歷完所有的索引鏈的時候,文檔就處理完畢了。最初的索引鏈,我們稱之基本索引鏈?。
為了支持多線程,使得多個線程能夠并發處理文檔,因而每個線程都要建立自己的索引鏈體系,使得每個線程能夠獨立工作,在基本索引鏈基礎上建立起來的每個線程獨立的索引鏈體系,我們稱之線程索引鏈?。線程索引鏈的每個節點是由基本索引鏈中的相應的節點調用函數addThreads創建的。
為了提高效率,考慮到對相同域的處理有相似的過程,應用的緩存也大致相當,因而不必每個線程在處理每一篇文檔的時候都重新創建一系列對象,而是復用這些對象。所以對每個域也建立了自己的索引鏈體系,我們稱之域索引鏈?。域索引鏈的每個節點是由線程索引鏈中的相應的節點調用addFields創建的。
當完成對文檔的處理后,各部分信息都要寫到索引文件中,寫入索引文件的過程是同步的,不是多線程的,也是沿著基本索引鏈將各部分信息依次寫入索引文件的。
下面詳細分析這一過程。
?
二、詳細索引過程
1、創建IndexWriter對象
代碼:
| IndexWriter writer = new IndexWriter(FSDirectory.open(INDEX_DIR), new StandardAnalyzer(Version.LUCENE_CURRENT), true, IndexWriter.MaxFieldLength.LIMITED); |
IndexWriter對象主要包含以下幾方面的信息:
- 用于索引文檔
- Directory directory;? 指向索引文件夾
- Analyzer analyzer;??? 分詞器
- Similarity similarity = Similarity.getDefault(); 影響打分的標準化因子(normalization factor)部分,對文檔的打分分兩個部分,一部分是索引階段計算的,與查詢語句無關,一部分是搜索階段計算的,與查詢語句相關。
- SegmentInfos segmentInfos = new SegmentInfos(); 保存段信息,大家會發現,和segments_N中的信息幾乎一一對應。
- IndexFileDeleter deleter; 此對象不是用來刪除文檔的,而是用來管理索引文件的。
- Lock writeLock; 每一個索引文件夾只能打開一個IndexWriter,所以需要鎖。
- Set segmentsToOptimize = new HashSet(); 保存正在最優化(optimize)的段信息。當調用optimize的時候,當前所有的段信息加入此Set,此后新生成的段并不參與此次最優化。
- 用于合并段,在合并段的文章中將詳細描述
- SegmentInfos localRollbackSegmentInfos;
- HashSet mergingSegments = new HashSet();
- MergePolicy mergePolicy = new LogByteSizeMergePolicy(this);
- MergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
- LinkedList pendingMerges = new LinkedList();
- Set runningMerges = new HashSet();
- List mergeExceptions = new ArrayList();
- long mergeGen;
- 為保持索引完整性,一致性和事務性
- SegmentInfos rollbackSegmentInfos; 當IndexWriter對索引進行了添加,刪除文檔操作后,可以調用commit將修改提交到文件中去,也可以調用rollback取消從上次commit到此時的修改。
- SegmentInfos localRollbackSegmentInfos; 此段信息主要用于將其他的索引文件夾合并到此索引文件夾的時候,為防止合并到一半出錯可回滾所保存的原來的段信息。?
- 一些配置
- long writeLockTimeout; 獲得鎖的時間超時。當超時的時候,說明此索引文件夾已經被另一個IndexWriter打開了。
- int termIndexInterval; 同tii和tis文件中的indexInterval。
?
有關SegmentInfos對象所保存的信息:
- 當索引文件夾如下的時候,SegmentInfos對象如下表
| segmentInfos??? SegmentInfos? (id=37)???? |
有關IndexFileDeleter:
- 其不是用來刪除文檔的,而是用來管理索引文件的。
- 在對文檔的添加,刪除,對段的合并的處理過程中,會生成很多新的文件,并需要刪除老的文件,因而需要管理。
- 然而要被刪除的文件又可能在被用,因而要保存一個引用計數,僅僅當引用計數為零的時候,才執行刪除。
- 下面這個例子能很好的說明IndexFileDeleter如何對文件引用計數并進行添加和刪除的。
| (1) 創建IndexWriter時???? IndexWriter writer = new IndexWriter(FSDirectory.open(indexDir), new StandardAnalyzer(Version.LUCENE_CURRENT), true, IndexWriter.MaxFieldLength.LIMITED);? 索引文件夾如下:
引用計數如下: refCounts??? HashMap? (id=101)????? (2) 添加第一個段時 indexDocs(writer, docDir);? 首先生成的不是compound文件
因而引用計數如下: refCounts??? HashMap? (id=101)????? 然后會合并成compound文件,并加入引用計數
refCounts??? HashMap? (id=101)????? 然后會用IndexFileDeleter.decRef()來刪除[_0.nrm, _0.tis, _0.fnm, _0.tii, _0.frq, _0.fdx, _0.prx, _0.fdt]文件
refCounts??? HashMap? (id=101)????? 然后為建立新的segments_2 ? ? refCounts??? HashMap? (id=77)????? 然后IndexFileDeleter.decRef() 刪除segments_1文件 ? refCounts??? HashMap? (id=77)????? (3) 添加第二個段 indexDocs(writer, docDir);? ?? (4) 添加第三個段,由于MergeFactor為3,則會進行一次段合并。 indexDocs(writer, docDir);? 首先和其他的段一樣,生成_2.cfs以及segments_4 ? 同時創建了一個線程來進行背后進行段合并(ConcurrentMergeScheduler$MergeThread.run()) ? 這時候的引用計數如下 refCounts??? HashMap? (id=84)????? (5) 關閉writer writer.close(); 通過IndexFileDeleter.decRef()刪除被合并的段 ? |
有關SimpleFSLock進行JVM之間的同步:
- 有時候,我們寫java程序的時候,也需要不同的JVM之間進行同步,來保護一個整個系統中唯一的資源。
- 如果唯一的資源僅僅在一個進程中,則可以使用線程同步的機制
- 然而如果唯一的資源要被多個進程進行訪問,則需要進程間同步的機制,無論是Windows和Linux在操作系統層面都有很多的進程間同步的機制。
- 但進程間的同步卻不是Java的特長,Lucene的SimpleFSLock給我們提供了一種方式。
| Lock的抽象類? public abstract class Lock { ? public static long LOCK_POLL_INTERVAL = 1000; ? public static final long LOCK_OBTAIN_WAIT_FOREVER = -1; ? public abstract boolean obtain() throws IOException; ? public boolean obtain(long lockWaitTimeout) throws LockObtainFailedException, IOException { ??? boolean locked = obtain(); ??? if (lockWaitTimeout < 0 && lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER)? ??? long maxSleepCount = lockWaitTimeout / LOCK_POLL_INTERVAL; ??? long sleepCount = 0; ??? while (!locked) { ????? if (lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER && sleepCount++ >= maxSleepCount) {? ? public abstract void release() throws IOException; ? public abstract boolean isLocked() throws IOException; } LockFactory的抽象類 public abstract class LockFactory { ? public abstract Lock makeLock(String lockName); ? abstract public void clearLock(String lockName) throws IOException;? SimpleFSLock的實現類 class SimpleFSLock extends Lock { ? File lockFile;? ? public SimpleFSLock(File lockDir, String lockFileName) {? ? @Override? ??? if (!lockDir.exists()) { ????? if (!lockDir.mkdirs())? ??? } else if (!lockDir.isDirectory()) { ????? throw new IOException("Found regular file where directory expected: " + lockDir.getAbsolutePath());? ??? return lockFile.createNewFile(); ? } ? @Override? ??? if (lockFile.exists() && !lockFile.delete())? ? } ? @Override? ??? return lockFile.exists(); ? } } SimpleFSLockFactory的實現類 public class SimpleFSLockFactory extends FSLockFactory { ? public SimpleFSLockFactory(String lockDirName) throws IOException { ??? setLockDir(new File(lockDirName)); ? } ? @Override? ??? if (lockPrefix != null) { ????? lockName = lockPrefix + "-" + lockName; ??? } ??? return new SimpleFSLock(lockDir, lockName); ? } ? @Override? ??? if (lockDir.exists()) { ????? if (lockPrefix != null) { ??????? lockName = lockPrefix + "-" + lockName; ????? } ????? File lockFile = new File(lockDir, lockName); ????? if (lockFile.exists() && !lockFile.delete()) { ??????? throw new IOException("Cannot delete " + lockFile); ????? } ??? } ? } }; |
?
2、創建文檔Document對象,并加入域(Field)
代碼:
| Document doc = new Document(); doc.add(new Field("path", f.getPath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); doc.add(new Field("modified",DateTools.timeToString(f.lastModified(), DateTools.Resolution.MINUTE), Field.Store.YES, Field.Index.NOT_ANALYZED)); doc.add(new Field("contents", new FileReader(f))); |
Document對象主要包括以下部分:
- 此文檔的boost,默認為1,大于一說明比一般的文檔更加重要,小于一說明更不重要。
- 一個ArrayList保存此文檔所有的域
- 每一個域包括域名,域值,和一些標志位,和fnm,fdx,fdt中的描述相對應。
| doc??? Document? (id=42)???? |
3、將文檔加入IndexWriter
代碼:
| writer.addDocument(doc);? -->IndexWriter.addDocument(Document doc, Analyzer analyzer)? ???? -->doFlush = docWriter.addDocument(doc, analyzer);? ????????? --> DocumentsWriter.updateDocument(Document, Analyzer, Term)? 注:--> 代表一級函數調用 |
IndexWriter繼而調用DocumentsWriter.addDocument,其又調用DocumentsWriter.updateDocument。
4、將文檔加入DocumentsWriter
代碼:
| DocumentsWriter.updateDocument(Document doc, Analyzer analyzer, Term delTerm)? -->(1) DocumentsWriterThreadState state = getThreadState(doc, delTerm);? -->(2) DocWriter perDoc = state.consumer.processDocument();? -->(3) finishDocument(state, perDoc); |
DocumentsWriter對象主要包含以下幾部分:
- 用于寫索引文件
- IndexWriter writer;
- Directory directory;
- Similarity similarity:分詞器
- String segment:當前的段名,每當flush的時候,將索引寫入以此為名稱的段。
| IndexWriter.doFlushInternal()? --> String segment = docWriter.getSegment();//return segment? --> newSegment = new SegmentInfo(segment,……);? --> docWriter.createCompoundFile(segment);//根據segment創建cfs文件。 |
- ?
- String docStoreSegment:存儲域所要寫入的目標段。(在索引文件格式一文中已經詳細描述)
- int docStoreOffset:存儲域在目標段中的偏移量。
- int nextDocID:下一篇添加到此索引的文檔ID號,對于同一個索引文件夾,此變量唯一,且同步訪問。
- DocConsumer consumer; 這是整個索引過程的核心,是IndexChain整個索引鏈的源頭。
| 基本索引鏈: 對于一篇文檔的索引過程,不是由一個對象來完成的,而是用對象組合的方式形成的一個處理鏈,鏈上的每個對象僅僅處理索引過程的一部分,稱為索引鏈,由于后面還有其他的索引鏈,所以此處的索引鏈我稱為基本索引鏈。 DocConsumer consumer 類型為DocFieldProcessor,是整個索引鏈的源頭,包含如下部分:
|
- 刪除文檔
- BufferedDeletes deletesInRAM = new BufferedDeletes();
- BufferedDeletes deletesFlushed = new BufferedDeletes();
| 類BufferedDeletes包含了一下的成員變量:
由此可見,文檔的刪除主要有三種方式:
刪除文檔既可以用reader進行刪除,也可以用writer進行刪除,不同的是,reader進行刪除后,此reader馬上能夠生效,而用writer刪除后,會被緩存在deletesInRAM及deletesFlushed中,只有寫入到索引文件中,當reader再次打開的時候,才能夠看到。 那deletesInRAM和deletesFlushed各有什么用處呢? 此版本的Lucene對文檔的刪除是支持多線程的,當用IndexWriter刪除文檔的時候,都是緩存在deletesInRAM中的,直到flush,才將刪除的文檔寫入到索引文件中去,我們知道flush是需要一段時間的,那么在flush的過程中,另一個線程又有文檔刪除怎么辦呢? 一般過程是這個樣子的,當flush的時候,首先在同步(synchornized)的方法pushDeletes中,將deletesInRAM全部加到deletesFlushed中,然后將deletesInRAM清空,退出同步方法,于是flush的線程程就向索引文件寫deletesFlushed中的刪除文檔的過程,而與此同時其他線程新刪除的文檔則添加到新的deletesInRAM中去,直到下次flush才寫入索引文件。 |
- 緩存管理
- 為了提高索引的速度,Lucene對很多的數據進行了緩存,使一起寫入磁盤,然而緩存需要進行管理,何時分配,何時回收,何時寫入磁盤都需要考慮。
- ArrayList freeCharBlocks = new ArrayList();將用于緩存詞(Term)信息的空閑塊
- ArrayList freeByteBlocks = new ArrayList();將用于緩存文檔號(doc id)及詞頻(freq),位置(prox)信息的空閑塊。
- ArrayList freeIntBlocks = new ArrayList();將存儲某詞的詞頻(freq)和位置(prox)分別在byteBlocks中的偏移量
- boolean bufferIsFull;用來判斷緩存是否滿了,如果滿了,則應該寫入磁盤
- long numBytesAlloc;分配的內存數量
- long numBytesUsed;使用的內存數量
- long freeTrigger;應該開始回收內存時的內存用量。
- long freeLevel;回收內存應該回收到的內存用量。
- long ramBufferSize;用戶設定的內存用量。
| 緩存用量之間的關系如下:? DocumentsWriter.setRAMBufferSizeMB(double mb){? ??? ramBufferSize = (long) (mb*1024*1024);//用戶設定的內存用量,當使用內存大于此時,開始寫入磁盤? DocumentsWriter.balanceRAM(){? ??? if (numBytesAlloc+deletesRAMUsed > freeTrigger) {? ??? //當分配的內存加刪除文檔所占用的內存大于105%的時候,開始釋放內存? ??????? while(numBytesAlloc+deletesRAMUsed > freeLevel) {? ??????? //一直進行釋放,直到95%? ??????????? //釋放free blocks ??????????? byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);? ??????????? freeCharBlocks.remove(freeCharBlocks.size()-1);? ??????????? freeIntBlocks.remove(freeIntBlocks.size()-1);? ??????? if (numBytesUsed+deletesRAMUsed > ramBufferSize){ ??????? //當使用的內存加刪除文檔占有的內存大于用戶指定的內存時,可以寫入磁盤 ????????????? bufferIsFull = true; ??????? } ??? }? 當判斷是否應該寫入磁盤時:
DocumentsWriter.timeToFlushDeletes(){ ??? return (bufferIsFull || deletesFull()) && setFlushPending(); } DocumentsWriter.deletesFull(){ ??? return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH &&? } |
- 多線程并發索引
- 為了支持多線程并發索引,對每一個線程都有一個DocumentsWriterThreadState,其為每一個線程根據DocConsumer consumer的索引鏈來創建每個線程的索引鏈(XXXPerThread),來進行對文檔的并發處理。
- DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
- HashMap threadBindings = new HashMap();
- 雖然對文檔的處理過程可以并行,但是將文檔寫入索引文件卻必須串行進行,串行寫入的代碼在DocumentsWriter.finishDocument中
- WaitQueue waitQueue = new WaitQueue()
- long waitQueuePauseBytes
- long waitQueueResumeBytes
| 在Lucene中,文檔是按添加的順序編號的,DocumentsWriter中的nextDocID就是記錄下一個添加的文檔id。 當Lucene支持多線程的時候,就必須要有一個synchornized方法來付給文檔id并且將nextDocID加一,這些是在DocumentsWriter.getThreadState這個函數里面做的。 雖然給文檔付ID沒有問題了。但是由Lucene索引文件格式我們知道,文檔是要按照ID的順序從小到大寫到索引文件中去的,然而不同的文檔處理速度不同,當一個先來的線程一處理一篇需要很長時間的大文檔時,另一個后來的線程二可能已經處理了很多小的文檔了,但是這些后來小文檔的ID號都大于第一個線程所處理的大文檔,因而不能馬上寫到索引文件中去,而是放到waitQueue中,僅僅當大文檔處理完了之后才寫入索引文件。 waitQueue中有一個變量nextWriteDocID表示下一個可以寫入文件的ID,當付給大文檔ID=4時,則nextWriteDocID也設為4,雖然后來的小文檔5,6,7,8等都已處理結束,但是如下代碼, WaitQueue.add(){ ??? if (doc.docID == nextWriteDocID){? ?? doPause() } 則把5, 6, 7, 8放入waiting隊列,并且記錄當前等待的文檔所占用的內存大小waitingBytes。 當大文檔4處理完畢后,不但寫入文檔4,把原來等待的文檔5, 6, 7, 8也一起寫入。 WaitQueue.add(){ ??? if (doc.docID == nextWriteDocID) { ?????? writeDocument(doc); ?????? while(true) { ?????????? doc = waiting[nextWriteLoc]; ?????????? writeDocument(doc); ?????? } ?? } else { ????? ………… ?? } ?? doPause() } 但是這存在一個問題:當大文檔很大很大,處理的很慢很慢的時候,后來的線程二可能已經處理了很多的小文檔了,這些文檔都是在waitQueue中,則占有了越來越多的內存,長此以往,有內存不夠的危險。 因而在finishDocuments里面,在WaitQueue.add最后調用了doPause()函數 DocumentsWriter.finishDocument(){ ??? doPause = waitQueue.add(docWriter); ??? if (doPause)? ??? notifyAll(); } WaitQueue.doPause() {? 當waitingBytes足夠大的時候(為用戶指定的內存使用量的10%),doPause返回true,于是后來的線程二會進入wait狀態,不再處理另外的文檔,而是等待線程一處理大文檔結束。 當線程一處理大文檔結束的時候,調用notifyAll喚醒等待他的線程。 DocumentsWriter.waitForWaitQueue() {? WaitQueue.doResume() {? 當waitingBytes足夠小的時候,doResume返回true, 則線程二不用再wait了,可以繼續處理另外的文檔。 |
- 一些標志位
- int maxFieldLength:一篇文檔中,一個域內可索引的最大的詞(Term)數。
- int maxBufferedDeleteTerms:可緩存的最大的刪除詞(Term)數。當大于這個數的時候,就要寫到文件中了。
此過程又包含如下三個子過程:
4.1、得到當前線程對應的文檔集處理對象(DocumentsWriterThreadState)
代碼為:
| DocumentsWriterThreadState state = getThreadState(doc, delTerm); |
在Lucene中,對于同一個索引文件夾,只能夠有一個IndexWriter打開它,在打開后,在文件夾中,生成文件write.lock,當其他IndexWriter再試圖打開此索引文件夾的時候,則會報org.apache.lucene.store.LockObtainFailedException錯誤。
這樣就出現了這樣一個問題,在同一個進程中,對同一個索引文件夾,只能有一個IndexWriter打開它,因而如果想多線程向此索引文件夾中添加文檔,則必須共享一個IndexWriter,而且在以往的實現中,addDocument函數是同步的(synchronized),也即多線程的索引并不能起到提高性能的效果。
于是為了支持多線程索引,不使IndexWriter成為瓶頸,對于每一個線程都有一個相應的文檔集處理對象(DocumentsWriterThreadState),這樣對文檔的索引過程可以多線程并行進行,從而增加索引的速度。
getThreadState函數是同步的(synchronized),DocumentsWriter有一個成員變量threadBindings,它是一個HashMap,鍵為線程對象(Thread.currentThread()),值為此線程對應的DocumentsWriterThreadState對象。
DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下幾個過程:
- 根據當前線程對象,從HashMap中查找相應的DocumentsWriterThreadState對象,如果沒找到,則生成一個新對象,并添加到HashMap中
| DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread());? if (state == null) {? ??? ……? ??? state = new DocumentsWriterThreadState(this);? ??? ……? ??? threadBindings.put(Thread.currentThread(), state);? }? |
- 如果此線程對象正在用于處理上一篇文檔,則等待,直到此線程的上一篇文檔處理完。
| DocumentsWriter.getThreadState() {? ??? waitReady(state);? ??? state.isIdle = false;? }? waitReady(state) {? ??? while (!state.isIdle) {wait();}? }?? 顯然如果state.isIdle為false,則此線程等待。? 在一篇文檔處理之前,state.isIdle = false會被設定,而在一篇文檔處理完畢之后,DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)中,會首先設定perThread.isIdle = true; 然后notifyAll()來喚醒等待此文檔完成的線程,從而處理下一篇文檔。 |
- 如果IndexWriter剛剛commit過,則新添加的文檔要加入到新的段中(segment),則首先要生成新的段名。
| initSegmentName(false);? --> if (segment == null) segment = writer.newSegmentName(); |
- 將此線程的文檔處理對象設為忙碌:state.isIdle = false;
4.2、用得到的文檔集處理對象(DocumentsWriterThreadState)處理文檔
代碼為:
| DocWriter perDoc = state.consumer.processDocument(); |
每一個文檔集處理對象DocumentsWriterThreadState都有一個文檔及域處理對象DocFieldProcessorPerThread,它的成員函數processDocument()被調用來對文檔及域進行處理。
| 線程索引鏈(XXXPerThread): 由于要多線程進行索引,因而每個線程都要有自己的索引鏈,稱為線程索引鏈。 線程索引鏈同基本索引鏈有相似的樹形結構,由基本索引鏈中每個層次的對象調用addThreads進行創建的,負責每個線程的對文檔的處理。 DocFieldProcessorPerThread是線程索引鏈的源頭,由DocFieldProcessor.addThreads(…)創建 DocFieldProcessorPerThread對象結構如下:
|
DocumentsWriter.DocWriter DocFieldProcessorPerThread.processDocument()包含以下幾個過程:
4.2.1、開始處理當前文檔
| consumer(DocInverterPerThread).startDocument();? |
在此版的Lucene中,幾乎所有的XXXPerThread的類,都有startDocument和finishDocument兩個函數,因為對同一個線程,這些對象都是復用的,而非對每一篇新來的文檔都創建一套,這樣也提高了效率,也牽扯到數據的清理問題。一般在startDocument函數中,清理處理上篇文檔遺留的數據,在finishDocument中,收集本次處理的結果數據,并返回,一直返回到DocumentsWriter.updateDocument(Document, Analyzer, Term) 然后根據條件判斷是否將數據刷新到硬盤上。
4.2.2、逐個處理文檔的每一個域
由于一個線程可以連續處理多個文檔,而在普通的應用中,幾乎每篇文檔的域都是大致相同的,為每篇文檔的每個域都創建一個處理對象非常低效,因而考慮到復用域處理對象DocFieldProcessorPerField,對于每一個域都有一個此對象。
那當來到一個新的域的時候,如何更快的找到此域的處理對象呢?Lucene創建了一個DocFieldProcessorPerField[] fieldHash哈希表來方便更快查找域對應的處理對象。
當處理各個域的時候,按什么順序呢?其實是按照域名的字典順序。因而Lucene創建了DocFieldProcessorPerField[] fields的數組來方便按順序處理域。
因而一個域的處理對象被放在了兩個地方。
對于域的處理過程如下:
4.2.2.1、首先:對于每一個域,按照域名,在fieldHash中查找域處理對象DocFieldProcessorPerField,代碼如下:
| final int hashPos = fieldName.hashCode() & hashMask;//計算哈希值? |
如果能夠找到,則更新DocFieldProcessorPerField中的域信息fp.fieldInfo.update(field.isIndexed()…)
如果沒有找到,則添加域到DocFieldProcessorPerThread.fieldInfos中,并創建新的DocFieldProcessorPerField,且將其加入哈希表。代碼如下:
| fp = new DocFieldProcessorPerField(this, fi);? |
如果是一個新的field,則將其加入fields數組fields[fieldCount++] = fp;
并且如果是存儲域的話,用StoredFieldsWriterPerThread將其寫到索引中:
| if (field.isStored()) {? |
4.2.2.1.1、處理存儲域的過程如下:
| StoredFieldsWriterPerThread.addField(Fieldable field, FieldInfo fieldInfo)? --> localFieldsWriter.writeField(fieldInfo, field); |
FieldsWriter.writeField(FieldInfo fi, Fieldable field)代碼如下:
| 請參照fdt文件的格式,則一目了然: fieldsStream.writeVInt(fi.number);//文檔號? fieldsStream.writeByte(bits); //域的屬性位 if (field.isCompressed()) {//對于壓縮域? ??????? fieldsStream.writeVInt(len);//寫長度? |
4.2.2.2、然后:對fields數組進行排序,是域按照名稱排序。quickSort(fields, 0, fieldCount-1);
4.2.2.3、最后:按照排序號的順序,對域逐個處理,此處處理的僅僅是索引域,代碼如下:
| for(int i=0;i????? fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount); |
域處理對象(DocFieldProcessorPerField)結構如下:
| 域索引鏈: 每個域也有自己的索引鏈,稱為域索引鏈,每個域的索引鏈也有同線程索引鏈有相似的樹形結構,由線程索引鏈中每個層次的每個層次的對象調用addField進行創建,負責對此域的處理。 和基本索引鏈及線程索引鏈不同的是,域索引鏈僅僅負責處理索引域,而不負責存儲域的處理。 DocFieldProcessorPerField是域索引鏈的源頭,對象結構如下:
|
4.2.2.3.1、處理索引域的過程如下:
DocInverterPerField.processFields(Fieldable[], int) 過程如下:
- 判斷是否要形成倒排表,代碼如下:
| boolean doInvert = consumer.start(fields, count);? --> TermsHashPerField.start(Fieldable[], int)?? ????? --> for(int i=0;i???????????? if (fields[i].isIndexed())? ???????????????? return true;? ??????????? return false; |
讀到這里,大家可能會發生困惑,既然XXXPerField是對于每一個域有一個處理對象的,那為什么參數傳進來的是Fieldable[]數組, 并且還有域的數目count呢?
其實這不經常用到,但必須得提一下,由上面的fieldHash的實現我們可以看到,是根據域名進行哈希的,所以準確的講,XXXPerField并非對于每一個域有一個處理對象,而是對每一組相同名字的域有相同的處理對象。
對于同一篇文檔,相同名稱的域可以添加多個,代碼如下:
| doc.add(new Field("contents", "the content of the file.", Field.Store.NO, Field.Index.NOT_ANALYZED));? |
則傳進來的名為"contents"的域如下:
| fields??? Fieldable[2]? (id=52)???? |
- 對傳進來的同名域逐一處理,代碼如下
| for(int i=0;i ??? final Fieldable field = fields[i]; ??? if (field.isIndexed() && doInvert) { ??????? //僅僅對索引域進行處理 ??????? if (!field.isTokenized()) { ??????????? //如果此域不分詞,見(1)對不分詞的域的處理 ??????? } else { ??????????? //如果此域分詞,見(2)對分詞的域的處理 ??????? } ??? } } |
(1) 對不分詞的域的處理
(1-1) 得到域的內容,并構建單個Token形成的SingleTokenAttributeSource。因為不進行分詞,因而整個域的內容算做一個Token.
String stringValue = field.stringValue(); //stringValue??? "200910240957"??
final int valueLength = stringValue.length();?
perThread.singleToken.reinit(stringValue, 0, valueLength);
對于此域唯一的一個Token有以下的屬性:
- Term:文字信息。在處理過程中,此值將保存在TermAttribute的實現類實例化的對象TermAttributeImp里面。
- Offset:偏移量信息,是按字或字母的起始偏移量和終止偏移量,表明此Token在文章中的位置,多用于加亮。在處理過程中,此值將保存在OffsetAttribute的實現類實例化的對象OffsetAttributeImp里面。
在SingleTokenAttributeSource里面,有一個HashMap來保存可能用于保存屬性的類名(Key,準確的講是接口)以及保存屬性信息的對象(Value):
| singleToken??? DocInverterPerThread$SingleTokenAttributeSource? (id=150)???? |
(1-2) 得到Token的各種屬性信息,為索引做準備。
consumer.start(field)做的主要事情就是根據各種屬性的類型來構造保存屬性的對象(HashMap中有則取出,無則構造),為索引做準備。
| consumer(TermsHashPerField).start(…) --> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的就是上述HashMap中的TermAttributeImpl??? --> consumer(FreqProxTermsWriterPerField).start(f); ????? --> if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) { ??????????????? payloadAttribute = fieldState.attributeSource.getAttribute(PayloadAttribute.class);? --> nextPerField(TermsHashPerField).start(f); ????? --> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的還是上述HashMap中的TermAttributeImpl ????? --> consumer(TermVectorsTermsWriterPerField).start(f); ??????????? --> if (doVectorOffsets) { ????????????????????? offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);? |
(1-3) 將Token加入倒排表
consumer(TermsHashPerField).add();
加入倒排表的過程,無論對于分詞的域和不分詞的域,過程是一樣的,因而放到對分詞的域的解析中一起說明。
(2) 對分詞的域的處理
(2-1) 構建域的TokenStream
| final TokenStream streamValue = field.tokenStreamValue(); //用戶可以在添加域的時候,應用構造函數public Field(String name, TokenStream tokenStream) 直接傳進一個TokenStream過來,這樣就不用另外構建一個TokenStream了。 if (streamValue != null)? ? …… ? stream = docState.analyzer.reusableTokenStream(fieldInfo.name, reader); } |
此時TokenStream的各項屬性值還都是空的,等待一個一個被分詞后得到,此時的TokenStream對象如下:
| stream??? StopFilter? (id=112)???? |
(2-2) 得到第一個Token,并初始化此Token的各項屬性信息,并為索引做準備(start)。
boolean hasMoreTokens = stream.incrementToken();//得到第一個Token
OffsetAttribute offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);//得到偏移量屬性
| offsetAttribute??? OffsetAttributeImpl? (id=164)???? |
PositionIncrementAttribute posIncrAttribute = fieldState.attributeSource.addAttribute(PositionIncrementAttribute.class);//得到位置屬性
| posIncrAttribute??? PositionIncrementAttributeImpl? (id=129)???? |
consumer.start(field);//其中得到了TermAttribute屬性,如果存儲payload則得到PayloadAttribute屬性,如果存儲詞向量則得到OffsetAttribute屬性。
(2-3) 進行循環,不斷的取下一個Token,并添加到倒排表
| for(;;) { ??? if (!hasMoreTokens) break; ??? ……? ??? ……? |
(2-4) 添加Token到倒排表的過程consumer(TermsHashPerField).add()
TermsHashPerField對象主要包括以下部分:
- CharBlockPool charPool; 用于存儲Token的文本信息,如果不足時,從DocumentsWriter中的freeCharBlocks分配
- ByteBlockPool bytePool;用于存儲freq, prox信息,如果不足時,從DocumentsWriter中的freeByteBlocks分配
- IntBlockPool intPool; 用于存儲分別指向每個Token在bytePool中freq和prox信息的偏移量。如果不足時,從DocumentsWriter的freeIntBlocks分配
- TermsHashConsumerPerField consumer類型為FreqProxTermsWriterPerField,用于寫freq, prox信息到緩存中。
- RawPostingList[] postingsHash = new RawPostingList[postingsHashSize];存儲倒排表,每一個Term都有一個RawPostingList (PostingList),其中包含了int textStart,也即文本在charPool中的偏移量,int byteStart,即此Term的freq和prox信息在bytePool中的起始偏移量,int intStart,即此term的在intPool中的起始偏移量。
形成倒排表的過程如下:
| //得到token的文本及文本長度 final char[] tokenText = termAtt.termBuffer();//[s, t, u, d, e, n, t, s] final int tokenTextLen = termAtt.termLength();//tokenTextLen 8 //按照token的文本計算哈希值,以便在postingsHash中找到此token對應的倒排表 int downto = tokenTextLen;? int hashPos = code & postingsHashMask; //在倒排表哈希表中查找此Token,如果找到相應的位置,但是不是此Token,說明此位置存在哈希沖突,采取重新哈希rehash的方法。 p = postingsHash[hashPos]; if (p != null && !postingEquals(tokenText, tokenTextLen)) {?? //如果此Token之前從未出現過 if (p == null) { ??? if (textLen1 + charPool.charUpto > DocumentsWriter.CHAR_BLOCK_SIZE) { ??????? //當charPool不足的時候,在freeCharBlocks中分配新的buffer ??????? charPool.nextBuffer(); ??? } ??? //從空閑的倒排表中分配新的倒排表 ??? p = perThread.freePostings[--perThread.freePostingsCount]; ??? //將文本復制到charPool中 ??? final char[] text = charPool.buffer;? ??? //將倒排表放入哈希表中 ??? postingsHash[hashPos] = p;? ??? if (numPostingInt + intPool.intUpto > DocumentsWriter.INT_BLOCK_SIZE) intPool.nextBuffer(); ??? //當intPool不足的時候,在freeIntBlocks中分配新的buffer。 ??? if (DocumentsWriter.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) ??????? bytePool.nextBuffer(); ??? //當bytePool不足的時候,在freeByteBlocks中分配新的buffer。 ??? //此處streamCount為2,表明在intPool中,每兩項表示一個詞,一個是指向bytePool中freq信息偏移量的,一個是指向bytePool中prox信息偏移量的。 ??? intUptos = intPool.buffer;? ??? p.intStart = intUptoStart + intPool.intOffset; ??? //在bytePool中分配兩個空間,一個放freq信息,一個放prox信息的。?? ??????? final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);? ??? //當Term原來沒有出現過的時候,調用newTerm ??? consumer(FreqProxTermsWriterPerField).newTerm(p); } //如果此Token之前曾經出現過,則調用addTerm。 else { ??? intUptos = intPool.buffers[p.intStart >> DocumentsWriter.INT_BLOCK_SHIFT];? } |
(2-5) 添加新Term的過程,consumer(FreqProxTermsWriterPerField).newTerm
| final void newTerm(RawPostingList p0) {? writeProx(FreqProxTermsWriter.PostingList p, int proxCode) { ? termsHashPerField.writeVInt(1, proxCode<<1);//第一個參數所謂1,也就是寫入此文檔在intPool中的第1項——prox信息。為什么左移一位呢?是因為后面可能跟著payload信息,參照索引文件格式(1)中或然跟隨規則。? } |
(2-6) 添加已有Term的過程
| final void addTerm(RawPostingList p0) { ? FreqProxTermsWriter.PostingList p = (FreqProxTermsWriter.PostingList) p0; ? if (docState.docID != p.lastDocID) { ????? //當文檔ID變了的時候,說明上一篇文檔已經處理完畢,可以寫入freq信息了。 ????? //第一個參數所謂0,也就是寫入上一篇文檔在intPool中的第0項——freq信息。至于信息為何這樣寫,參照索引文件格式(1)中的或然跟隨規則,及tis文件格式。 ????? if (1 == p.docFreq)? ????? //當文檔ID不變的時候,說明此文檔中這個詞又出現了一次,從而freq加一,寫入再次出現的位置信息,用差值。? |
(2-7) 結束處理當前域
| consumer(TermsHashPerField).finish(); --> FreqProxTermsWriterPerField.finish() --> TermVectorsTermsWriterPerField.finish() endConsumer(NormsWriterPerField).finish(); --> norms[upto] = Similarity.encodeNorm(norm);//計算標準化因子的值。 --> docIDs[upto] = docState.docID; |
4.2.3、結束處理當前文檔
final DocumentsWriter.DocWriter one = fieldsWriter(StoredFieldsWriterPerThread).finishDocument();
存儲域返回結果:一個寫成了二進制的存儲域緩存。
| one??? StoredFieldsWriter$PerDoc? (id=322)???? |
final DocumentsWriter.DocWriter two = consumer(DocInverterPerThread).finishDocument();
--> NormsWriterPerThread.finishDocument()
--> TermsHashPerThread.finishDocument()
索引域的返回結果為null
4.3、用DocumentsWriter.finishDocument結束本次文檔添加
代碼:
| DocumentsWriter.updateDocument(Document, Analyzer, Term) --> DocumentsWriter.finishDocument(DocumentsWriterThreadState, DocumentsWriter$DocWriter) ????? --> doPause = waitQueue.add(docWriter);//有關waitQueue,在DocumentsWriter的緩存管理中已作解釋 ??????????? --> DocumentsWriter$WaitQueue.writeDocument(DocumentsWriter$DocWriter) ????????????????? --> StoredFieldsWriter$PerDoc.finish() ??????????????????????? --> fieldsWriter.flushDocument(perDoc.numStoredFields, perDoc.fdt);將存儲域信息真正寫入文件。 |
5、DocumentsWriter對CharBlockPool,ByteBlockPool,IntBlockPool的緩存管理
- 在索引的過程中,DocumentsWriter將詞信息(term)存儲在CharBlockPool中,將文檔號(doc ID),詞頻(freq)和位置(prox)信息存儲在ByteBlockPool中。
- 在ByteBlockPool中,緩存是分塊(slice)分配的,塊(slice)是分層次的,層次越高,此層的塊越大,每一層的塊大小事相同的。
- nextLevelArray表示的是當前層的下一層是第幾層,可見第9層的下一層還是第9層,也就是說最高有9層。
- levelSizeArray表示每一層的塊大小,第一層是5個byte,第二層是14個byte以此類推。
| ByteBlockPool類中有以下靜態變量: final static int[] nextLevelArray = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};? |
- 在ByteBlockPool中分配一個塊的代碼如下:
| ? //此函數僅僅在upto已經是當前塊的結尾的時候方才調用來分配新塊。 public int allocSlice(final byte[] slice, final int upto) { ? //可根據塊的結束符來得到塊所在的層次。從而我們可以推斷,每個層次的塊都有不同的結束符,第1層為16,第2層位17,第3層18,依次類推。 ? final int level = slice[upto] & 15; ? //從數組總得到下一個層次及下一層塊的大小。 ? final int newLevel = nextLevelArray[level]; ? final int newSize = levelSizeArray[newLevel]; ? // 如果當前緩存總量不夠大,則從DocumentsWriter的freeByteBlocks中分配。 ? if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-newSize) ??? nextBuffer(); ? final int newUpto = byteUpto; ? final int offset = newUpto + byteOffset; ? byteUpto += newSize; ? //當分配了新的塊的時候,需要有一個指針從本塊指向下一個塊,使得讀取此信息的時候,能夠在此塊讀取結束后,到下一個塊繼續讀取。 ? //這個指針需要4個byte,在本塊中,除了結束符所占用的一個byte之外,之前的三個byte的數據都應該移到新的塊中,從而四個byte連起來形成一個指針。 ? buffer[newUpto] = slice[upto-3]; ? buffer[newUpto+1] = slice[upto-2]; ? buffer[newUpto+2] = slice[upto-1]; ? // 將偏移量(也即指針)寫入到連同結束符在內的四個byte ? slice[upto-3] = (byte) (offset >>> 24); ? slice[upto-2] = (byte) (offset >>> 16); ? slice[upto-1] = (byte) (offset >>> 8); ? slice[upto] = (byte) offset; ? // 在新的塊的結尾寫入新的結束符,結束符和層次的關系就是(endbyte = 16 | level) ? buffer[byteUpto-1] = (byte) (16|newLevel); ? return newUpto+3; } |
- 在ByteBlockPool中,文檔號和詞頻(freq)信息是應用或然跟隨原則寫到一個塊中去的,而位置信息(prox)是寫入到另一個塊中去的,對于同一個詞,這兩塊的偏移量保存在IntBlockPool中。因而在IntBlockPool中,每一個詞都有兩個int,第0個表示docid + freq在ByteBlockPool中的偏移量,第1個表示prox在ByteBlockPool中的偏移量。
- 在寫入docid + freq信息的時候,調用termsHashPerField.writeVInt(0, p.lastDocCode),第一個參數表示向此詞的第0個偏移量寫入;在寫入prox信息的時候,調用termsHashPerField.writeVInt(1, (proxCode<<1)|1),第一個參數表示向此詞的第1個偏移量寫入。
- CharBlockPool是按照出現的先后順序保存詞(term)
- 在TermsHashPerField中,有一個成員變量RawPostingList[] postingsHash,為每一個term分配了一個RawPostingList,將上述三個緩存關聯起來。
| ? abstract class RawPostingList { ? final static int BYTES_SIZE = DocumentsWriter.OBJECT_HEADER_BYTES + 3*DocumentsWriter.INT_NUM_BYTE; ? int textStart; //此詞在CharBlockPool中的偏移量,由此可以知道是哪個詞。 ? int intStart; //此詞在IntBlockPool中的偏移量,在指向的位置有兩個int,一個是docid + freq信息的偏移量,一個是prox信息的偏移量。 ? int byteStart; //此詞在ByteBlockPool中的起始偏移量 } static final class PostingList extends RawPostingList { ? int docFreq;??????????????????????????????????? // 此詞在此文檔中出現的次數 ? int lastDocID;????????????????????????????????? // 上次處理完的包含此詞的文檔號。 ? int lastDocCode;??????????????????????????????? // 文檔號和詞頻按照或然跟隨原則形成的編碼 ? int lastPosition;?????????????????????????????? // 上次處理完的此詞的位置 } 這里需要說明的是,在IntBlockPool中保存了兩個在ByteBlockPool中的偏移量,而在RawPostingList的byteStart又保存了在ByteBlockPool中的偏移量,這兩者有什么區別呢? 在IntBlockPool中保存的分別指向docid+freq及prox信息在ByteBlockPool中的偏移量是主要用來寫入信息的,它記錄的偏移量是下一個要寫入的docid+freq或者prox在ByteBlockPool中的位置,隨著信息的不斷寫入,IntBlockPool中的兩個偏移量是不斷改變的,始終指向下一個可以寫入的位置。 RawPostingList中byteStart主要是用來讀取docid及prox信息的,當索引過程基本結束,所有的信息都寫入在緩存中了,那么如何找到此詞對應的文檔號偏移量及位置信息,然后寫到索引文件中去呢?自然是通過RawPostingList找到byteStart,然后根據byteStart在ByteBlockPool中找到docid+freq及prox信息的起始位置,從起始位置開始的兩個大小為5的塊,第一個就是docid+freq信息的源頭,第二個就是prox信息的源頭,如果源頭的塊中包含了所有的信息,讀出來就可以了,如果源頭的塊中有指針,則沿著指針尋找到下一個塊,從而可以找到所有的信息。 |
- 下面舉一個實例來表明如果進行緩存管理的:
| 此例子中,準備添加三個文件: file01: common common common common common term file02: common common common common common term term file03: term term term common common common common common file04: term (1) 添加第一篇文檔第一個common
? (2) 添加第四個common
? (3) 添加第五個common
? (4) 添加第一篇文檔,第一個term
? (5) 添加第二篇文檔第一個common
? (6) 添加第二篇文檔第一個term
? (7) 添加第三篇文檔的第一個term
? (8) 添加第三篇文檔第二個term
? (9) 添加第三篇文檔第四個common
? (10) 添加第三篇文檔的第五個common
? (11) 添加第四篇文檔的第一個term
? (12) 最終PostingList, CharBlockPool, IntBlockPool,ByteBlockPool的關系如下圖: ?
? |
?
6、關閉IndexWriter對象
代碼:
| writer.close(); --> IndexWriter.closeInternal(boolean) ????? --> (1) 將索引信息由內存寫入磁盤: flush(waitForMerges, true, true);? |
對段的合并將在后面的章節進行討論,此處僅僅討論將索引信息由寫入磁盤的過程。
代碼:
| IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) --> IndexWriter.doFlush(boolean flushDocStores, boolean flushDeletes) ????? --> IndexWriter.doFlushInternal(boolean flushDocStores, boolean flushDeletes) |
將索引寫入磁盤包括以下幾個過程:
- 得到要寫入的段名:String segment = docWriter.getSegment();
- DocumentsWriter將緩存的信息寫入段:docWriter.flush(flushDocStores);
- 生成新的段信息對象:newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());
- 準備刪除文檔:docWriter.pushDeletes();
- 生成cfs段:docWriter.createCompoundFile(segment);
- 刪除文檔:applyDeletes();
6.1、得到要寫入的段名
代碼:
| SegmentInfo newSegment = null; final int numDocs = docWriter.getNumDocsInRAM();//文檔總數 String docStoreSegment = docWriter.getDocStoreSegment();//存儲域和詞向量所要要寫入的段名,"_0"??? int docStoreOffset = docWriter.getDocStoreOffset();//存儲域和詞向量要寫入的段中的偏移量 String segment = docWriter.getSegment();//段名,"_0" |
在Lucene的索引文件結構一章做過詳細介紹,存儲域和詞向量可以和索引域存儲在不同的段中。
6.2、將緩存的內容寫入段
代碼:
| flushedDocCount = docWriter.flush(flushDocStores); |
此過程又包含以下兩個階段;
- 按照基本索引鏈關閉存儲域和詞向量信息
- 按照基本索引鏈的結構將索引結果寫入段
6.2.1、按照基本索引鏈關閉存儲域和詞向量信息
代碼為:
| closeDocStore(); flushState.numDocsInStore = 0; |
其主要是根據基本索引鏈結構,關閉存儲域和詞向量信息:
- consumer(DocFieldProcessor).closeDocStore(flushState);
- consumer(DocInverter).closeDocStore(state);
- consumer(TermsHash).closeDocStore(state);
- consumer(FreqProxTermsWriter).closeDocStore(state);
- if (nextTermsHash != null) nextTermsHash.closeDocStore(state);
- consumer(TermVectorsTermsWriter).closeDocStore(state);
- endConsumer(NormsWriter).closeDocStore(state);
- consumer(TermsHash).closeDocStore(state);
- fieldsWriter(StoredFieldsWriter).closeDocStore(state);
- consumer(DocInverter).closeDocStore(state);
其中有實質意義的是以下兩個closeDocStore:
- 詞向量的關閉:TermVectorsTermsWriter.closeDocStore(SegmentWriteState)
| void closeDocStore(final SegmentWriteState state) throws IOException { ???????????????????? if (tvx != null) {???????????? //為不保存詞向量的文檔在tvd文件中寫入零。即便不保存詞向量,在tvx, tvd中也保留一個位置? ??????????? fill(state.numDocsInStore - docWriter.getDocStoreOffset());? ??????????? //關閉tvx, tvf, tvd文件的寫入流? ??????????? tvx.close();? ??????????? tvf.close();? ??????????? tvd.close();? ??????????? tvx = null;? ??????????? //記錄寫入的文件名,為以后生成cfs文件的時候,將這些寫入的文件生成一個統一的cfs文件。? ??????????? state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);? ??????????? state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);? ??????????? state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);? ??????????? //從DocumentsWriter的成員變量openFiles中刪除,未來可能被IndexFileDeleter刪除? ??????????? docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);? ??????????? docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);? ??????????? docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);? ??????????? lastDocID = 0;? ??????? }????? } |
- 存儲域的關閉:StoredFieldsWriter.closeDocStore(SegmentWriteState)
| public void closeDocStore(SegmentWriteState state) throws IOException { ??? //關閉fdx, fdt寫入流 ??? fieldsWriter.close();? ??? //記錄寫入的文件名? |
6.2.2、按照基本索引鏈的結構將索引結果寫入段
代碼為:
| consumer(DocFieldProcessor).flush(threads, flushState); ??? //回收fieldHash,以便用于下一輪的索引,為提高效率,索引鏈中的對象是被復用的。 ??? Map> childThreadsAndFields = new HashMap>();? ??? //寫入存儲域 ??? --> fieldsWriter(StoredFieldsWriter).flush(state); ??? //寫入索引域 ??? --> consumer(DocInverter).flush(childThreadsAndFields, state); ??? //寫入域元數據信息,并記錄寫入的文件名,以便以后生成cfs文件 ??? --> final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION); ??? --> fieldInfos.write(state.directory, fileName); ??? --> state.flushedFiles.add(fileName); |
此過程也是按照基本索引鏈來的:
- consumer(DocFieldProcessor).flush(…);
- consumer(DocInverter).flush(…);
- consumer(TermsHash).flush(…);
- consumer(FreqProxTermsWriter).flush(…);
- if (nextTermsHash != null) nextTermsHash.flush(…);
- consumer(TermVectorsTermsWriter).flush(…);
- endConsumer(NormsWriter).flush(…);
- consumer(TermsHash).flush(…);
- fieldsWriter(StoredFieldsWriter).flush(…);
- consumer(DocInverter).flush(…);
6.2.2.1、寫入存儲域
代碼為:
| StoredFieldsWriter.flush(SegmentWriteState state) {? |
從代碼中可以看出,是寫入fdx, fdt兩個文件,但是在上述的closeDocStore已經寫入了,并且把state.numDocsInStore置零,fieldsWriter設為null,在這里其實什么也不做。
6.2.2.2、寫入索引域
代碼為:
| DocInverter.flush(Map>, SegmentWriteState) ??? //寫入倒排表及詞向量信息 ??? --> consumer(TermsHash).flush(childThreadsAndFields, state); ??? //寫入標準化因子 ??? --> endConsumer(NormsWriter).flush(endChildThreadsAndFields, state); |
6.2.2.2.1、寫入倒排表及詞向量信息
代碼為:
| TermsHash.flush(Map>, SegmentWriteState) ??? //寫入倒排表信息 ??? --> consumer(FreqProxTermsWriter).flush(childThreadsAndFields, state); ?? //回收RawPostingList ??? --> shrinkFreePostings(threadsAndFields, state); ??? //寫入詞向量信息 ??? --> if (nextTermsHash != null) nextTermsHash.flush(nextThreadsAndFields, state); ????????? --> consumer(TermVectorsTermsWriter).flush(childThreadsAndFields, state); |
6.2.2.2.1.1、寫入倒排表信息
代碼為:
| FreqProxTermsWriter.flush(Map?????????????????????????????????????? Collection>, SegmentWriteState) ????(a) 所有域按名稱排序,使得同名域能夠一起處理 ??? Collections.sort(allFields); ??? final int numAllFields = allFields.size(); ????(b) 生成倒排表的寫對象 ??? final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos); ??? int start = 0; ????(c) 對于每一個域 ??? while(start < numAllFields) { ????????(c-1) 找出所有的同名域 ??????? final FieldInfo fieldInfo = allFields.get(start).fieldInfo; ??????? final String fieldName = fieldInfo.name; ??????? int end = start+1; ??????? while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName)) ??????????? end++; ??????? FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start]; ??????? for(int i=start;i ??????????? fields[i-start] = allFields.get(i); ??????????? fieldInfo.storePayloads |= fields[i-start].hasPayloads; ??????? } ????????(c-2) 將同名域的倒排表添加到文件 ??????? appendPostings(fields, consumer); ???????(c-3) 釋放空間 ??????? for(int i=0;i ??????????? TermsHashPerField perField = fields[i].termsHashPerField; ??????????? int numPostings = perField.numPostings; ??????????? perField.reset(); ??????????? perField.shrinkHash(numPostings); ??????????? fields[i].reset(); ??????? } ??????? start = end; ??? } ????(d) 關閉倒排表的寫對象 ??? consumer.finish(); |
(b) 生成倒排表的寫對象
代碼為:
| public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws IOException {? ??? dir = state.directory;? ??? segment = state.segmentName;? ??? totalNumDocs = state.numDocs;? ??? this.fieldInfos = fieldInfos;? ??? //用于寫tii,tis? ??? termsOut = new TermInfosWriter(dir, segment, fieldInfos, state.termIndexInterval);? ??? //用于寫freq, prox的跳表?? ??? skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval, termsOut.maxSkipLevels, totalNumDocs, null, null);? ??? //記錄寫入的文件名,? ??? state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));? ??? state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));?? ??? //用以上兩個寫對象,按照一定的格式寫入段? ??? termsWriter = new FormatPostingsTermsWriter(state, this);? } |
對象結構如下:
| consumer????FormatPostingsFieldsWriter? (id=119)? //用于處理一個域? ??? dir??? SimpleFSDirectory? (id=126)?? //目標索引文件夾? ??? totalNumDocs??? 8?? //文檔總數? ??? fieldInfos??? FieldInfos? (id=70)? //域元數據信息??? ??? segment??? "_0"?? //段名? ??? skipListWriter??? DefaultSkipListWriter? (id=133)? //freq, prox中跳表的寫對象??? ??? termsOut??? TermInfosWriter? (id=125)? //tii, tis文件的寫對象? ??? termsWriter????FormatPostingsTermsWriter? (id=135)? //用于添加詞(Term)? ??????? currentTerm??? null???? ??????? currentTermStart??? 0???? ??????? fieldInfo??? null???? ??????? freqStart??? 0???? ??????? proxStart??? 0???? ??????? termBuffer??? null???? ??????? termsOut??? TermInfosWriter? (id=125)???? ??????? docsWriter????FormatPostingsDocsWriter? (id=139)? //用于寫入此詞的docid, freq信息? ??????????? df??? 0???? ??????????? fieldInfo??? null???? ??????????? freqStart??? 0???? ??????????? lastDocID??? 0???? ??????????? omitTermFreqAndPositions??? false???? ??????????? out??? SimpleFSDirectory$SimpleFSIndexOutput? (id=144)???? ??????????? skipInterval??? 16???? ??????????? skipListWriter??? DefaultSkipListWriter? (id=133)???? ??????????? storePayloads??? false???? ??????????? termInfo??? TermInfo? (id=151)???? ??????????? totalNumDocs??? 8????? ??????????? posWriter????FormatPostingsPositionsWriter? (id=146)? //用于寫入此詞在此文檔中的位置信息??? ??????????????? lastPayloadLength??? -1???? ??????????????? lastPosition??? 0???? ??????????????? omitTermFreqAndPositions??? false???? ??????????????? out??? SimpleFSDirectory$SimpleFSIndexOutput? (id=157)???? ??????????????? parent??? FormatPostingsDocsWriter? (id=139)???? ??????????????? storePayloads??? false??? |
- FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域信息,其返回FormatPostingsTermsConsumer用于添加詞信息
- FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加詞信息,其返回FormatPostingsDocsConsumer用于添加freq信息
- FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq信息,其返回FormatPostingsPositionsConsumer用于添加prox信息
- FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox信息
(c-2) 將同名域的倒排表添加到文件
代碼為:
| ? FreqProxTermsWriter.appendPostings(FreqProxTermsWriterPerField[], FormatPostingsFieldsConsumer) { ??? int numFields = fields.length; ??? final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields]; ??? for(int i=0;i ????? FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]); ????? boolean result = fms.nextTerm(); //對所有的域,取第一個詞(Term) ??? } ????(1) 添加此域,雖然有多個域,但是由于是同名域,只取第一個域的信息即可。返回的是用于添加此域中的詞的對象。 ??? final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo); ??? FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields]; ??? final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions; ????(2) 此while循環是遍歷每一個尚有未處理的詞的域,依次按照詞典順序處理這些域所包含的詞。當一個域中的所有的詞都被處理過后,則numFields減一,并從mergeStates數組中移除此域。直到所有的域的所有的詞都處理完畢,方才退出此循環。 ??? while(numFields > 0) { ???????(2-1) 找出所有域中按字典順序的下一個詞??赡芏鄠€同名域中,都包含同一個term,因而要遍歷所有的numFields,得到所有的域里的下一個詞,numToMerge即為有多少個域包含此詞。 ????? termStates[0] = mergeStates[0]; ????? int numToMerge = 1; ????? for(int i=1;i ??????? final char[] text = mergeStates[i].text; ??????? final int textOffset = mergeStates[i].textOffset; ??????? final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset); ??????? if (cmp < 0) { ????????? termStates[0] = mergeStates[i]; ????????? numToMerge = 1; ??????? } else if (cmp == 0) ????????? termStates[numToMerge++] = mergeStates[i]; ????? } ????? (2-2) 添加此詞,返回FormatPostingsDocsConsumer用于添加文檔號(doc ID)及詞頻信息(freq) ????? final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset); ??????(2-3) 由于共numToMerge個域都包含此詞,每個詞都有一個鏈表的文檔號表示包含這些詞的文檔。此循環遍歷所有的包含此詞的域,依次按照從小到大的循序添加包含此詞的文檔號及詞頻信息。當一個域中對此詞的所有文檔號都處理過了,則numToMerge減一,并從termStates數組中移除此域。當所有包含此詞的域的所有文檔號都處理過了,則結束此循環。 ????? while(numToMerge > 0) { ????????(2-3-1) 找出最小的文檔號 ??????? FreqProxFieldMergeState minState = termStates[0]; ??????? for(int i=1;i ????????? if (termStates[i].docID < minState.docID) ??????????? minState = termStates[i]; ??????? final int termDocFreq = minState.termFreq; ????????(2-3-2) 添加文檔號及詞頻信息,并形成跳表,返回FormatPostingsPositionsConsumer用于添加位置(prox)信息 ??????? final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq); ????????//ByteSliceReader是用于讀取bytepool中的prox信息的。 ??????? final ByteSliceReader prox = minState.prox; ??????? if (!currentFieldOmitTermFreqAndPositions) { ????????? int position = 0; ??????????(2-3-3) 此循環對包含此詞的文檔,添加位置信息 ????????? for(int j=0;j ??????????? final int code = prox.readVInt(); ??????????? position += code >> 1; ??????????? final int payloadLength; ????????????// 如果此位置有payload信息,則從bytepool中讀出,否則設為零。 ??????????? if ((code & 1) != 0) { ????????????? payloadLength = prox.readVInt(); ????????????? if (payloadBuffer == null || payloadBuffer.length < payloadLength) ??????????????? payloadBuffer = new byte[payloadLength]; ????????????? prox.readBytes(payloadBuffer, 0, payloadLength); ??????????? } else ????????????? payloadLength = 0; ??????????????//添加位置(prox)信息 ????????????? posConsumer.addPosition(position, payloadBuffer, 0, payloadLength); ????????? } ????????? posConsumer.finish(); ??????? } ???????(2-3-4) 判斷退出條件,上次選中的域取得下一個文檔號,如果沒有,則說明此域包含此詞的文檔已經處理完畢,則從termStates中刪除此域,并將numToMerge減一。然后此域取得下一個詞,當循環到(2)的時候,表明此域已經開始處理下一個詞。如果沒有下一個詞,說明此域中的所有的詞都處理完畢,則從mergeStates中刪除此域,并將numFields減一,當numFields為0的時候,循環(2)也就結束了。 ??????? if (!minState.nextDoc()) {//獲得下一個docid ????????? //如果此域包含此詞的文檔已經沒有下一篇docid,則從數組termStates中移除,numToMerge減一。 ????????? int upto = 0; ????????? for(int i=0;i ??????????? if (termStates[i] != minState) ????????????? termStates[upto++] = termStates[i]; ????????? numToMerge--; ????????? //此域則取下一個詞(term),在循環(2)處來參與下一個詞的合并 ????????? if (!minState.nextTerm()) { ??????????? //如果此域沒有下一個詞了,則此域從數組mergeStates中移除,numFields減一。 ??????????? upto = 0; ??????????? for(int i=0;i ????????????? if (mergeStates[i] != minState) ??????????????? mergeStates[upto++] = mergeStates[i]; ??????????? numFields--; ????????? } ??????? } ????? } ??????(2-4) 經過上面的過程,docid和freq信息雖已經寫入段文件,而跳表信息并沒有寫到文件中,而是寫入skip buffer里面了,此處真正寫入文件。并且詞典(tii, tis)也應該寫入文件。 ????? docConsumer(FormatPostingsDocsWriter).finish(); ??? } ??? termsConsumer.finish(); ? } |
(2-3-4) 獲得下一篇文檔號代碼如下:
| ? public boolean nextDoc() {//如何獲取下一個docid ? if (freq.eof()) {//如果bytepool中的freq信息已經讀完 ??? if (p.lastDocCode != -1) {//由上述緩存管理,PostingList里面還存著最后一篇文檔的文檔號及詞頻信息,則將最后一篇文檔返回 ????? docID = p.lastDocID; ????? if (!field.omitTermFreqAndPositions) ??????? termFreq = p.docFreq; ????? p.lastDocCode = -1; ????? return true; ??? } else ????? return false;//沒有下一篇文檔 ? } ? final int code = freq.readVInt();//如果bytepool中的freq信息尚未讀完 ? if (field.omitTermFreqAndPositions) ??? docID += code; ? else { ??? //讀出文檔號及詞頻信息。 ??? docID += code >>> 1; ??? if ((code & 1) != 0) ????? termFreq = 1; ??? else ????? termFreq = freq.readVInt(); ? } ? return true; } |
(2-3-2) 添加文檔號及詞頻信息代碼如下:
| ? FormatPostingsPositionsConsumer FormatPostingsDocsWriter.addDoc(int docID, int termDocFreq) { ??? final int delta = docID - lastDocID; ??? //當文檔數量達到skipInterval倍數的時候,添加跳表項。 ??? if ((++df % skipInterval) == 0) { ????? skipListWriter.setSkipData(lastDocID, storePayloads, posWriter.lastPayloadLength); ????? skipListWriter.bufferSkip(df); ?? } ?? lastDocID = docID; ?? if (omitTermFreqAndPositions) ???? out.writeVInt(delta); ?? else if (1 == termDocFreq) ???? out.writeVInt((delta<<1) | 1); ?? else { ???? //寫入文檔號及詞頻信息。 ???? out.writeVInt(delta<<1); ???? out.writeVInt(termDocFreq); ?? } ?? return posWriter; } |
(2-3-3) 添加位置信息:
| ? FormatPostingsPositionsWriter.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) { ??? final int delta = position - lastPosition; ??? lastPosition = position; ??? if (storePayloads) { ??????? //保存位置及payload信息 ??????? if (payloadLength != lastPayloadLength) { ??????????? lastPayloadLength = payloadLength; ??????????? out.writeVInt((delta<<1)|1); ??????????? out.writeVInt(payloadLength); ??????? } else ??????????? out.writeVInt(delta << 1); ??????????? if (payloadLength > 0) ??????????????? out.writeBytes(payload, payloadLength); ??? } else ??????? out.writeVInt(delta); } |
(2-4) 將跳表和詞典(tii, tis)寫入文件
| FormatPostingsDocsWriter.finish() { ??? //將跳表緩存寫入文件 ??? long skipPointer = skipListWriter.writeSkip(out); ??? if (df > 0) { ????? //將詞典(terminfo)寫入tii,tis文件 ????? parent.termsOut(TermInfosWriter).add(fieldInfo.number, utf8.result, utf8.length, termInfo); ??? } ? } |
將跳表緩存寫入文件:
| DefaultSkipListWriter(MultiLevelSkipListWriter).writeSkip(IndexOutput)? { ??? long skipPointer = output.getFilePointer(); ??? if (skipBuffer == null || skipBuffer.length == 0) return skipPointer; ??? //正如我們在索引文件格式中分析的那樣, 高層在前,低層在后,除最低層外,其他的層都有長度保存。 ??? for (int level = numberOfSkipLevels - 1; level > 0; level--) { ????? long length = skipBuffer[level].getFilePointer(); ????? if (length > 0) { ??????? output.writeVLong(length); ??????? skipBuffer[level].writeTo(output); ????? } ??? } ??? //寫入最低層 ??? skipBuffer[0].writeTo(output); ??? return skipPointer; ? } |
將詞典(terminfo)寫入tii,tis文件:
- tii文件是tis文件的類似跳表的東西,是在tis文件中每隔indexInterval個詞提取出一個詞放在tii文件中,以便很快的查找到詞。
- 因而TermInfosWriter類型中有一個成員變量other也是TermInfosWriter類型的,還有一個成員變量isIndex來表示此對象是用來寫tii文件的還是用來寫tis文件的。
- 如果一個TermInfosWriter對象的isIndex=false則,它是用來寫tis文件的,它的other指向的是用來寫tii文件的TermInfosWriter對象
- 如果一個TermInfosWriter對象的isIndex=true則,它是用來寫tii文件的,它的other指向的是用來寫tis文件的TermInfosWriter對象
| TermInfosWriter.add (int fieldNumber, byte[] termBytes, int termBytesLength, TermInfo ti) { ??? //如果詞的總數是indexInterval的倍數,則應該寫入tii文件 ??? if (!isIndex && size % indexInterval == 0) ????? other.add(lastFieldNumber, lastTermBytes, lastTermBytesLength, lastTi); ??? //將詞寫入tis文件 ??? writeTerm(fieldNumber, termBytes, termBytesLength); ??? output.writeVInt(ti.docFreq);?????????????????????? // write doc freq ??? output.writeVLong(ti.freqPointer - lastTi.freqPointer); // write pointers ??? output.writeVLong(ti.proxPointer - lastTi.proxPointer); ??? if (ti.docFreq >= skipInterval) { ????? output.writeVInt(ti.skipOffset); ??? } ??? if (isIndex) { ????? output.writeVLong(other.output.getFilePointer() - lastIndexPointer); ????? lastIndexPointer = other.output.getFilePointer(); // write pointer ??? } ??? lastFieldNumber = fieldNumber; ??? lastTi.set(ti); ??? size++; ? } |
6.2.2.2.1.2、寫入詞向量信息
代碼為:
| TermVectorsTermsWriter.flush (Map>? ??? if (tvx != null) { ????? if (state.numDocsInStore > 0) ??????? fill(state.numDocsInStore - docWriter.getDocStoreOffset()); ????? tvx.flush(); ????? tvd.flush(); ????? tvf.flush(); ??? } ??? for (Map.Entry> entry :? ????? for (final TermsHashConsumerPerField field : entry.getValue() ) { ??????? TermVectorsTermsWriterPerField perField = (TermVectorsTermsWriterPerField) field; ??????? perField.termsHashPerField.reset(); ??????? perField.shrinkHash(); ????? } ????? TermVectorsTermsWriterPerThread perThread = (TermVectorsTermsWriterPerThread) entry.getKey(); ????? perThread.termsHashPerThread.reset(true); ??? } ? } |
從代碼中可以看出,是寫入tvx, tvd, tvf三個文件,但是在上述的closeDocStore已經寫入了,并且把tvx設為null,在這里其實什么也不做,僅僅是清空postingsHash,以便進行下一輪索引時重用此對象。
6.2.2.2.2、寫入標準化因子
代碼為:
| NormsWriter.flush(Map> threadsAndFields, ?????????????????????????? SegmentWriteState state) { ??? final Map> byField = new HashMap>(); ??? for (final Map.Entry> entry :? ???? //遍歷所有的域,將同名域對應的NormsWriterPerField放到同一個鏈表中。 ????? final Collection fields = entry.getValue(); ????? final Iterator fieldsIt = fields.iterator(); ????? while (fieldsIt.hasNext()) { ??????? final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next(); ??????? List l = byField.get(perField.fieldInfo); ??????? if (l == null) { ??????????? l = new ArrayList(); ??????????? byField.put(perField.fieldInfo, l); ??????? } ??????? l.add(perField); ??? } ??? //記錄寫入的文件名,方便以后生成cfs文件。 ??? final String normsFileName = state.segmentName + "." + IndexFileNames.NORMS_EXTENSION; ??? state.flushedFiles.add(normsFileName); ??? IndexOutput normsOut = state.directory.createOutput(normsFileName); ??? try { ????? //寫入nrm文件頭 ????? normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length); ????? final int numField = fieldInfos.size(); ????? int normCount = 0; ????? //對每一個域進行處理 ????? for(int fieldNumber=0;fieldNumber ??????? final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber); ??????? //得到同名域的鏈表 ??????? List toMerge = byField.get(fieldInfo); ??????? int upto = 0; ??????? if (toMerge != null) { ????????? final int numFields = toMerge.size(); ????????? normCount++; ????????? final NormsWriterPerField[] fields = new NormsWriterPerField[numFields]; ????????? int[] uptos = new int[numFields]; ????????? for(int j=0;j ??????????? fields[j] = toMerge.get(j); ????????? int numLeft = numFields; ????????? //處理同名的多個域 ????????? while(numLeft > 0) { ??????????? //得到所有的同名域中最小的文檔號 ??????????? int minLoc = 0; ??????????? int minDocID = fields[0].docIDs[uptos[0]]; ??????????? for(int j=1;j ????????????? final int docID = fields[j].docIDs[uptos[j]]; ????????????? if (docID < minDocID) { ??????????????? minDocID = docID; ??????????????? minLoc = j; ????????????? } ??????????? } ??????????? // 在nrm文件中,每一個文件都有一個位置,沒有設定的,放入默認值 ??????????? for (;upto<minDocID;upto++) ????????????? normsOut.writeByte(defaultNorm); ??????????? //寫入當前的nrm值 ??????????? normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]); ??????????? (uptos[minLoc])++; ??????????? upto++; ??????????? //如果當前域的文檔已經處理完畢,則numLeft減一,歸零時推出循環。 ??????????? if (uptos[minLoc] == fields[minLoc].upto) { ????????????? fields[minLoc].reset(); ????????????? if (minLoc != numLeft-1) { ??????????????? fields[minLoc] = fields[numLeft-1]; ??????????????? uptos[minLoc] = uptos[numLeft-1]; ????????????? } ????????????? numLeft--; ??????????? } ????????? } ????????? // 對所有的未設定nrm值的文檔寫入默認值。 ????????? for(;upto ??????????? normsOut.writeByte(defaultNorm); ??????? } else if (fieldInfo.isIndexed && !fieldInfo.omitNorms) { ????????? normCount++; ????????? // Fill entire field with default norm: ????????? for(;upto ??????????? normsOut.writeByte(defaultNorm); ??????? } ????? } ??? } finally { ????? normsOut.close(); ??? } ? } |
6.2.2.3、寫入域元數據
代碼為:
| FieldInfos.write(IndexOutput) { ??? output.writeVInt(CURRENT_FORMAT); ??? output.writeVInt(size()); ??? for (int i = 0; i < size(); i++) { ????? FieldInfo fi = fieldInfo(i); ????? byte bits = 0x0; ????? if (fi.isIndexed) bits |= IS_INDEXED; ????? if (fi.storeTermVector) bits |= STORE_TERMVECTOR; ????? if (fi.storePositionWithTermVector) bits |= STORE_POSITIONS_WITH_TERMVECTOR; ????? if (fi.storeOffsetWithTermVector) bits |= STORE_OFFSET_WITH_TERMVECTOR; ????? if (fi.omitNorms) bits |= OMIT_NORMS; ????? if (fi.storePayloads) bits |= STORE_PAYLOADS; ????? if (fi.omitTermFreqAndPositions) bits |= OMIT_TERM_FREQ_AND_POSITIONS; ????? output.writeString(fi.name); ????? output.writeByte(bits); ??? } } |
此處基本就是按照fnm文件的格式寫入的。
6.3、生成新的段信息對象
代碼:
| newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx()); segmentInfos.add(newSegment); |
?
6.4、準備刪除文檔
代碼:
| docWriter.pushDeletes(); ??? --> deletesFlushed.update(deletesInRAM); |
此處將deletesInRAM全部加到deletesFlushed中,并把deletesInRAM清空。原因上面已經闡明。
6.5、生成cfs段
代碼:
| docWriter.createCompoundFile(segment); newSegment.setUseCompoundFile(true); |
代碼為:
| DocumentsWriter.createCompoundFile(String segment) { ??? CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION); ??? //將上述中記錄的文檔名全部加入cfs段的寫對象。 ??? for (final String flushedFile : flushState.flushedFiles) ????? cfsWriter.addFile(flushedFile); ??? cfsWriter.close(); ? } |
6.6、刪除文檔
代碼:
| applyDeletes(); |
代碼為:
| boolean applyDeletes(SegmentInfos infos) { ? if (!hasDeletes()) ??? return false; ? final int infosEnd = infos.size(); ? int docStart = 0; ? boolean any = false; ? for (int i = 0; i < infosEnd; i++) { ??? assert infos.info(i).dir == directory; ??? SegmentReader reader = writer.readerPool.get(infos.info(i), false); ??? try { ????? any |= applyDeletes(reader, docStart); ????? docStart += reader.maxDoc(); ??? } finally { ????? writer.readerPool.release(reader); ??? } ? } ? deletesFlushed.clear(); ? return any; } |
- Lucene刪除文檔可以用reader,也可以用writer,但是歸根結底還是用reader來刪除的。
- reader的刪除有以下三種方式:
- 按照詞刪除,刪除所有包含此詞的文檔。
- 按照文檔號刪除。
- 按照查詢對象刪除,刪除所有滿足此查詢的文檔。
- 但是這三種方式歸根結底還是按照文檔號刪除,也就是寫.del文件的過程。
| ? private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) ? throws CorruptIndexException, IOException { ? final int docEnd = docIDStart + reader.maxDoc(); ? boolean any = false; ? //按照詞刪除,刪除所有包含此詞的文檔。 ? TermDocs docs = reader.termDocs(); ? try { ??? for (Entry entry: deletesFlushed.terms.entrySet()) { ????? Term term = entry.getKey(); ????? docs.seek(term); ????? int limit = entry.getValue().getNum(); ????? while (docs.next()) { ??????? int docID = docs.doc(); ??????? if (docIDStart+docID >= limit) ????????? break; ??????? reader.deleteDocument(docID); ??????? any = true; ????? } ??? } ? } finally { ??? docs.close(); ? } ? //按照文檔號刪除。 ? for (Integer docIdInt : deletesFlushed.docIDs) { ??? int docID = docIdInt.intValue(); ??? if (docID >= docIDStart && docID < docEnd) { ????? reader.deleteDocument(docID-docIDStart); ????? any = true; ??? } ? } ? //按照查詢對象刪除,刪除所有滿足此查詢的文檔。 ? IndexSearcher searcher = new IndexSearcher(reader); ? for (Entry entry : deletesFlushed.queries.entrySet()) { ??? Query query = entry.getKey(); ??? int limit = entry.getValue().intValue(); ??? Weight weight = query.weight(searcher); ??? Scorer scorer = weight.scorer(reader, true, false); ??? if (scorer != null) { ????? while(true)? { ??????? int doc = scorer.nextDoc(); ??????? if (((long) docIDStart) + doc >= limit) ????????? break; ??????? reader.deleteDocument(doc); ??????? any = true; ????? } ??? } ? } ? searcher.close(); ? return any; } |
更多0
總結
以上是生活随笔為你收集整理的Lucene学习总结之四:Lucene索引过程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Lucene学习总结之三:Lucene的
- 下一篇: Lucene学习总结之五:Lucene段