企业搜索引擎开发之连接器connector(二十九)
在哪里調用監控器管理對象snapshotRepositoryMonitorManager的start方法及stop方法,然后又在哪里調用CheckpointAndChangeQueue對象的resume方法獲取List<CheckpointAndChange> guaranteedChanges集合
下面跟蹤到DiffingConnectorTraversalManager類的相關方法,在該類實現的方法中,調用了監控器管理對象snapshotRepositoryMonitorManager的相關方法實現對其操作
private final DocumentSnapshotRepositoryMonitorManagersnapshotRepositoryMonitorManager;private final TraversalContextManager traversalContextManager;/*** Boolean to mark TraversalManager as invalid.* It's possible for Connector Manager to keep a reference to* an outdated TraversalManager (after a new one has been given* previous TraversalManagers are invalid to use).*/private boolean isActive = true;/*** Creates a {@link DiffingConnectorTraversalManager}.** @param snapshotRepositoryMonitorManager the* {@link DocumentSnapshotRepositoryMonitorManager}* for use accessing a {@link ChangeSource}* @param traversalContextManager {@link TraversalContextManager}* that holds the current {@link TraversalContext}*/public DiffingConnectorTraversalManager(DocumentSnapshotRepositoryMonitorManager snapshotRepositoryMonitorManager,TraversalContextManager traversalContextManager) {this.snapshotRepositoryMonitorManager = snapshotRepositoryMonitorManager;this.traversalContextManager = traversalContextManager;}resumeTraversal方法啟動監視器管理對象snapshotRepositoryMonitorManager,并返回DocumentList集合
/* @Override */public synchronized DocumentList resumeTraversal(String checkpoint)throws RepositoryException {/* Exhaustive list of method's use:resumeTraversal(null) from startTraversal:monitors get started from nullresumeTraversal(null) from Connector Manager sometime after startTraversal:monitors already started from previous resumeTraversal callresumeTraversal(cp) from Connector Manager without a startTraversal:means there was a shutdown or turn offmonitors get started from cp; should use stateresumeTraversal(cp) from Connector Manager sometime after some uses:is most common case; roll*/if (isActive()) {//啟動snapshotRepositoryMonitorManagerif (!snapshotRepositoryMonitorManager.isRunning()) {snapshotRepositoryMonitorManager.start(checkpoint);}return newDocumentList(checkpoint);} else {throw new RepositoryException("Inactive FileTraversalManager referanced.");}}進一步調用newDocumentList方法返回DocumentList集合
private DocumentList newDocumentList(String checkpoint)throws RepositoryException {//獲取隊列 CheckpointAndChangeQueue(隊列 CheckpointAndChangeQueue只由snapshotRepositoryMonitorManager引用)CheckpointAndChangeQueue checkpointAndChangeQueue =snapshotRepositoryMonitorManager.getCheckpointAndChangeQueue();try {DiffingConnectorDocumentList documentList = new DiffingConnectorDocumentList(checkpointAndChangeQueue,CheckpointAndChangeQueue.initializeCheckpointStringIfNull(checkpoint));//Map<String, MonitorCheckpoint>Map<String, MonitorCheckpoint> guaranteesMade =checkpointAndChangeQueue.getMonitorRestartPoints();snapshotRepositoryMonitorManager.acceptGuarantees(guaranteesMade);return new ConfirmActiveDocumentList(documentList);} catch (IOException e) {throw new RepositoryException("Failure when making DocumentList.", e);}}DiffingConnectorDocumentList documentList對象的構造函數里面封裝了CheckpointAndChangeQueue checkpointAndChangeQueue隊列集合
DiffingConnectorDocumentList 類完整實現如下:
/*** An implementation of {@link DocumentList} for the {@link DiffingConnector}.** @since 2.8*/ public class DiffingConnectorDocumentList implements DocumentList {private final Iterator<CheckpointAndChange> checkpointAndChangeIterator;private String checkpoint;/*** Creates a document list that returns a batch of documents from the provided* {@link CheckpointAndChangeQueue}.** @param queue a CheckpointAndChangeQueue containing document changes* @param checkpoint point into the change queue after which to start* returning documents* @throws IOException if persisting fails*/public DiffingConnectorDocumentList(CheckpointAndChangeQueue queue,String checkpoint) throws IOException {//CheckpointAndChangeQueue queued的resume方法獲取List<CheckpointAndChange>//本DocumentList批次數據已經加載于內存List<CheckpointAndChange> guaranteedChanges = queue.resume(checkpoint);checkpointAndChangeIterator = guaranteedChanges.iterator();this.checkpoint = checkpoint;}/*** 調用方獲取該狀態并持久化,迭代完畢即為最后的checkpoint*//* @Override */public String checkpoint() {return checkpoint;}/* @Override */public Document nextDocument() throws RepositoryException {if (checkpointAndChangeIterator.hasNext()) {CheckpointAndChange checkpointAndChange =checkpointAndChangeIterator.next();//更新checkpointcheckpoint = checkpointAndChange.getCheckpoint().toString();return checkpointAndChange.getChange().getDocumentHandle().getDocument();} else {return null;}} }在其構造方法中調用參數CheckpointAndChangeQueue queue的resume方法獲取List<CheckpointAndChange> guaranteedChanges,在其nextDocument()方法中通過迭代獲取CheckpointAndChange checkpointAndChange對象,同時更新checkpoint狀態標識
最后獲取與監視器關聯的MonitorCheckpoint對象映射
//Map<String, MonitorCheckpoint>Map<String, MonitorCheckpoint> guaranteesMade =checkpointAndChangeQueue.getMonitorRestartPoints();然后調用監控器管理對象snapshotRepositoryMonitorManager的acceptGuarantees方法,相應的監視器對象接收并確認MonitorCheckpoint對象
/*** 監視器管理對象收到CheckpointAndChangeQueue對象反饋,分發給對應的監視器處理MonitorCheckpoint*//* @Override */public void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees) {for (Map.Entry<String, MonitorCheckpoint> entry : guarantees.entrySet()) {String monitorName = entry.getKey();MonitorCheckpoint checkpoint = entry.getValue();DocumentSnapshotRepositoryMonitor monitor = fileSystemMonitorsByName.get(monitorName);if (monitor != null) {// Signal is asynch. Let monitor figure out how to use.//回調 monitor.acceptGuarantee(checkpoint);}}}與倉庫對象相對應的具體監視器接收確認
/*** 監視器收到反饋 [MonitorCheckpoint接收確認]* @param cp*/// Public for DocumentSnapshotRepositoryMonitorTest @VisibleForTestingpublic void acceptGuarantee(MonitorCheckpoint cp) {snapshotStore.acceptGuarantee(cp);guaranteeCheckpoint = cp;}倉庫對應的存儲對象處于處理鏈的末端
/*** 反饋MonitorCheckpoint處理* @param cp*/void acceptGuarantee(MonitorCheckpoint cp) {long readSnapshotNumber = cp.getSnapshotNumber();if (readSnapshotNumber < 0) {throw new IllegalArgumentException("Received invalid snapshot in: " + cp);}if (oldestSnapshotToKeep > readSnapshotNumber) {LOG.warning("Received an older snapshot than " + oldestSnapshotToKeep + ": " + cp);} else {oldestSnapshotToKeep = readSnapshotNumber;}}---------------------------------------------------------------------------
本系列企業搜索引擎開發之連接器connector系本人原創
轉載請注明出處 博客園 刺猬的溫馴
本人郵箱:?chenying998179@163#com (#改為.)
本文鏈接?http://www.cnblogs.com/chenying99/p/3789650.html?
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的企业搜索引擎开发之连接器connector(二十九)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: poj1753Flip Game(dfs
- 下一篇: 刘强东写在上市之际:京东要成为一家世界级