(转载)Nutch 2.0 之 抓取流程简单分析
生活随笔
收集整理的這篇文章主要介紹了
(转载)Nutch 2.0 之 抓取流程简单分析
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
Nutch 2.0 抓取流程介紹
---------------------
InjectorJob => GeneratorJob => FetcherJob => ParserJob => DbUpdaterJob => SolrIndexerJob
InjectorJob : 從文件中得到一批種子網(wǎng)頁,把它們放到抓取數(shù)據(jù)庫中去
GeneratorJob: 從抓取數(shù)據(jù)庫中產(chǎn)生要抓取的頁面放到抓取隊列中去
FetcherJob: ? 對抓取隊列中的網(wǎng)頁進行抓取,在reducer中使用了生產(chǎn)/消費者模型
ParserJob: ? ?對抓取完成的網(wǎng)頁進行解析,產(chǎn)生一些新的鏈接與網(wǎng)頁內(nèi)容的解析結(jié)果
DbUpdaterJob: 把新產(chǎn)生的鏈接更新到抓取數(shù)據(jù)庫中去
SolrIndexerJob: 對解析后的內(nèi)容進行索引建立
? 下面是InjectorJob的啟動函數(shù),代碼如下
[java]?view plain?copy ?public?Map<String,Object>?run(Map<String,Object>?args)?throws?Exception?{?? ???getConf().setLong("injector.current.time",?System.currentTimeMillis());?? ???Path?input;?? ???Object?path?=?args.get(Nutch.ARG_SEEDDIR);?? ???if?(path?instanceof?Path)?{?? ?????input?=?(Path)path;?? ???}?else?{?? ?????input?=?new?Path(path.toString());?? ???}?? ???numJobs?=?2;?? ???currentJobNum?=?0;?? ???status.put(Nutch.STAT_PHASE,?"convert?input");?? ???currentJob?=?new?NutchJob(getConf(),?"inject-p1?"?+?input);?? ???FileInputFormat.addInputPath(currentJob,?input);?? //?mapper方法,從文件中解析出url,寫入數(shù)據(jù)庫?? ???currentJob.setMapperClass(UrlMapper.class);?? ???currentJob.setMapOutputKeyClass(String.class);?? //?map?的輸出為WebPage,它是用Gora?compile生成的,可以通過Gora把它映射到不同的數(shù)據(jù)庫中,?? ???currentJob.setMapOutputValueClass(WebPage.class);?? //?輸出到GoraOutputFormat?? ???currentJob.setOutputFormatClass(GoraOutputFormat.class);?? ???DataStore<String,?WebPage>?store?=?StorageUtils.createWebStore(currentJob.getConfiguration(),?? ???????String.class,?WebPage.class);?? ???GoraOutputFormat.setOutput(currentJob,?store,?true);?? ???currentJob.setReducerClass(Reducer.class);?? ???currentJob.setNumReduceTasks(0);?? ???currentJob.waitForCompletion(true);?? ???ToolUtil.recordJobStatus(null,?currentJob,?results);?? ???currentJob?=?null;?? ?? ?? ???status.put(Nutch.STAT_PHASE,?"merge?input?with?db");?? ???status.put(Nutch.STAT_PROGRESS,?0.5f);?? ???currentJobNum?=?1;?? ???currentJob?=?new?NutchJob(getConf(),?"inject-p2?"?+?input);?? ???StorageUtils.initMapperJob(currentJob,?FIELDS,?String.class,?? ???????WebPage.class,?InjectorMapper.class);?? ???currentJob.setNumReduceTasks(0);?? ???ToolUtil.recordJobStatus(null,?currentJob,?results);?? ???status.put(Nutch.STAT_PROGRESS,?1.0f);?? ???return?results;?? ?}??
? ?
? ?因為InjectorJob擴展自NutchTool,實現(xiàn)了它的run方法。
? ?我們可以看到,這里有兩個MR任務,第一個主要是從文件中讀入種子網(wǎng)頁,寫到DataStore數(shù)據(jù)庫中,第二個MR任務主要是對數(shù)據(jù)庫中的WebPage對象做一個分數(shù)與抓取間隔的設置。它使用到一個initMapperJob方法,代碼如下
[java]?view plain?copy public?static?<K,?V>?void?initMapperJob(Job?job,?? ????Collection<WebPage.Field>?fields,?? ????Class<K>?outKeyClass,?Class<V>?outValueClass,?? ????Class<??extends?GoraMapper<String,?WebPage,?K,?V>>?mapperClass,?? ????Class<??extends?Partitioner<K,?V>>?partitionerClass,?boolean?reuseObjects)?? throws?ClassNotFoundException,?IOException?{?? ?//?這里是生成一個DataStore的抽象,這里的DataStore用戶可以不同的模塊,如Hbase,MySql等?? ??DataStore<String,?WebPage>?store?=?createWebStore(job.getConfiguration(),?? ??????String.class,?WebPage.class);?? ??if?(store==null)?throw?new?RuntimeException("Could?not?create?datastore");?? ??Query<String,?WebPage>?query?=?store.newQuery();?? ??query.setFields(toStringArray(fields));?? ??GoraMapper.initMapperJob(job,?query,?store,?? ??????outKeyClass,?outValueClass,?mapperClass,?partitionerClass,?reuseObjects);?? ??GoraOutputFormat.setOutput(job,?store,?true);?? }??
? ?
? ?下面是GeneratorJob的run方法代碼
[java]?view plain?copy ?public?Map<String,Object>?run(Map<String,Object>?args)?throws?Exception?{?? ???//?map?to?inverted?subset?due?for?fetch,?sort?by?score?? ???Long?topN?=?(Long)args.get(Nutch.ARG_TOPN);?? ???Long?curTime?=?(Long)args.get(Nutch.ARG_CURTIME);?? ???if?(curTime?==?null)?{?? ?????curTime?=?System.currentTimeMillis();?? ???}?? ???Boolean?filter?=?(Boolean)args.get(Nutch.ARG_FILTER);?? ???Boolean?norm?=?(Boolean)args.get(Nutch.ARG_NORMALIZE);?? ???//?map?to?inverted?subset?due?for?fetch,?sort?by?score?? ???getConf().setLong(GENERATOR_CUR_TIME,?curTime);?? ???if?(topN?!=?null)?? ?????getConf().setLong(GENERATOR_TOP_N,?topN);?? ???if?(filter?!=?null)?? ?????getConf().setBoolean(GENERATOR_FILTER,?filter);?? ???int?randomSeed?=?Math.abs(new?Random().nextInt());?? ???batchId?=?(curTime?/?1000)?+?"-"?+?randomSeed;?? ???getConf().setInt(GENERATOR_RANDOM_SEED,?randomSeed);?? ???getConf().set(BATCH_ID,?batchId);?? ???getConf().setLong(Nutch.GENERATE_TIME_KEY,?System.currentTimeMillis());?? ???if?(norm?!=?null)?? ?????getConf().setBoolean(GENERATOR_NORMALISE,?norm);?? ???String?mode?=?getConf().get(GENERATOR_COUNT_MODE,?GENERATOR_COUNT_VALUE_HOST);?? ???if?(GENERATOR_COUNT_VALUE_HOST.equalsIgnoreCase(mode))?{?? ?????getConf().set(URLPartitioner.PARTITION_MODE_KEY,?URLPartitioner.PARTITION_MODE_HOST);?? ???}?else?if?(GENERATOR_COUNT_VALUE_DOMAIN.equalsIgnoreCase(mode))?{?? ???????getConf().set(URLPartitioner.PARTITION_MODE_KEY,?URLPartitioner.PARTITION_MODE_DOMAIN);?? ???}?else?{?? ?????LOG.warn("Unknown?generator.max.count?mode?'"?+?mode?+?"',?using?mode="?+?GENERATOR_COUNT_VALUE_HOST);?? ?????getConf().set(GENERATOR_COUNT_MODE,?GENERATOR_COUNT_VALUE_HOST);?? ?????getConf().set(URLPartitioner.PARTITION_MODE_KEY,?URLPartitioner.PARTITION_MODE_HOST);?? ???}?? ?? ?? //?上面是設置一些要使用要的常量?? ???numJobs?=?1;?? ???currentJobNum?=?0;?? //?生成一個job?? ???currentJob?=?new?NutchJob(getConf(),?"generate:?"?+?batchId);?? //?初始化Map,這里的Map的輸出類型為<SelectorEntry,WebPage>,?使用?SelectorEntryPartitioner來進行切分?? ???StorageUtils.initMapperJob(currentJob,?FIELDS,?SelectorEntry.class,?? ???????WebPage.class,?GeneratorMapper.class,?SelectorEntryPartitioner.class,?true);?? //?初始化Reducer,?使用了generatorReducer來進行聚合處理?? ???StorageUtils.initReducerJob(currentJob,?GeneratorReducer.class);?? ???currentJob.waitForCompletion(true);?? ???ToolUtil.recordJobStatus(null,?currentJob,?results);?? ???results.put(BATCH_ID,?batchId);?? ???return?results;?? ?}?? ????
? 好像比原來的Generate簡單很多,這里的GeneratorMapper完成的工作與之前的版本是一樣的,如url的正規(guī)化,過濾,分數(shù)的設置,而GeneratorReducer完成的工作也和之前差不多,只是輸出變成了DataStore,如HBase,完成以后會每個WebPage進行打標記,表示當前WebPage所完成的一個狀態(tài)。
? ?使用了Gora的 fetcher比原來簡單了很多,下面是其run的源代碼
[java]?view plain?copy ?public?Map<String,Object>?run(Map<String,Object>?args)?throws?Exception?{?? ???checkConfiguration();?? ???String?batchId?=?(String)args.get(Nutch.ARG_BATCH);?? ???Integer?threads?=?(Integer)args.get(Nutch.ARG_THREADS);?? ???Boolean?shouldResume?=?(Boolean)args.get(Nutch.ARG_RESUME);?? ???Integer?numTasks?=?(Integer)args.get(Nutch.ARG_NUMTASKS);?? ?? ???if?(threads?!=?null?&&?threads?>?0)?{?? ?????getConf().setInt(THREADS_KEY,?threads);?? ???}?? ???if?(batchId?==?null)?{?? ?????batchId?=?Nutch.ALL_BATCH_ID_STR;?? ???}?? ???getConf().set(GeneratorJob.BATCH_ID,?batchId);?? ???if?(shouldResume?!=?null)?{?? ?????getConf().setBoolean(RESUME_KEY,?shouldResume);?? ???}?? ????? ???LOG.info("FetcherJob?:?timelimit?set?for?:?"?+?getConf().getLong("fetcher.timelimit",?-1));?? ???LOG.info("FetcherJob:?threads:?"?+?getConf().getInt(THREADS_KEY,?10));?? ???LOG.info("FetcherJob:?parsing:?"?+?getConf().getBoolean(PARSE_KEY,?false));?? ???LOG.info("FetcherJob:?resuming:?"?+?getConf().getBoolean(RESUME_KEY,?false));?? ?? ?? ???//?set?the?actual?time?for?the?timelimit?relative?? ???//?to?the?beginning?of?the?whole?job?and?not?of?a?specific?task?? ???//?otherwise?it?keeps?trying?again?if?a?task?fails?? ???long?timelimit?=?getConf().getLong("fetcher.timelimit.mins",?-1);?? ???if?(timelimit?!=?-1)?{?? ?????timelimit?=?System.currentTimeMillis()?+?(timelimit?*?60?*?1000);?? ?????getConf().setLong("fetcher.timelimit",?timelimit);?? ???}?? ???numJobs?=?1;?? ???currentJob?=?new?NutchJob(getConf(),?"fetch");?? //?得到它過濾的字段?? ???Collection<WebPage.Field>?fields?=?getFields(currentJob);?? //?初始化mapper,?其輸出為<IntWritable,FetchEntry>?? //?在mapper中輸入數(shù)據(jù)進行過濾,主要是對不是同一個batch與已經(jīng)fetch的數(shù)據(jù)進行過濾?? ???StorageUtils.initMapperJob(currentJob,?fields,?IntWritable.class,?? ???????FetchEntry.class,?FetcherMapper.class,?FetchEntryPartitioner.class,?false);?? //?初始化reducer?? ???StorageUtils.initReducerJob(currentJob,?FetcherReducer.class);?? ???if?(numTasks?==?null?||?numTasks?<?1)?{?? ?????currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",?? ?????????currentJob.getNumReduceTasks()));?? ???}?else?{?? ?????currentJob.setNumReduceTasks(numTasks);?? ???}?? ???currentJob.waitForCompletion(true);?? ???ToolUtil.recordJobStatus(null,?currentJob,?results);?? ???return?results;?? ?}??
? 這里把原來在Mapper中使用到的生產(chǎn)者與消費者模型用到了reducer中,重寫了reducer的run方法,在其中打開多個抓取線程,對url進行多線程抓取,有興趣可以看一下FetcherReducer這個類。
? ?下面是ParserJob.java中的run代碼
[java]?view plain?copy ?@Override?? ?public?Map<String,Object>?run(Map<String,Object>?args)?throws?Exception?{?? ???String?batchId?=?(String)args.get(Nutch.ARG_BATCH);?? ???Boolean?shouldResume?=?(Boolean)args.get(Nutch.ARG_RESUME);?? ???Boolean?force?=?(Boolean)args.get(Nutch.ARG_FORCE);?? ????? ???if?(batchId?!=?null)?{?? ?????getConf().set(GeneratorJob.BATCH_ID,?batchId);?? ???}?? ???if?(shouldResume?!=?null)?{?? ?????getConf().setBoolean(RESUME_KEY,?shouldResume);?? ???}?? ???if?(force?!=?null)?{?? ?????getConf().setBoolean(FORCE_KEY,?force);?? ???}?? ???LOG.info("ParserJob:?resuming:\t"?+?getConf().getBoolean(RESUME_KEY,?false));?? ???LOG.info("ParserJob:?forced?reparse:\t"?+?getConf().getBoolean(FORCE_KEY,?false));?? ???if?(batchId?==?null?||?batchId.equals(Nutch.ALL_BATCH_ID_STR))?{?? ?????LOG.info("ParserJob:?parsing?all");?? ???}?else?{?? ?????LOG.info("ParserJob:?batchId:\t"?+?batchId);?? ???}?? ???currentJob?=?new?NutchJob(getConf(),?"parse");?? ????? ???Collection<WebPage.Field>?fields?=?getFields(currentJob);?? //?初始化mapper,輸出類型為<String,WebPage>,?解析全部在maper完成?? ???StorageUtils.initMapperJob(currentJob,?fields,?String.class,?WebPage.class,?? ???????ParserMapper.class);?? //?初始化reducer,這里是支持把<key,values>寫到數(shù)據(jù)庫中?? ???StorageUtils.initReducerJob(currentJob,?IdentityPageReducer.class);?? ???currentJob.setNumReduceTasks(0);?? ?? ?? ???currentJob.waitForCompletion(true);?? ???ToolUtil.recordJobStatus(null,?currentJob,?results);?? ???return?results;?? ?}??
? ?
下面是DbUpdaterjob的run方法代碼
[java]?view plain?copy ?public?Map<String,Object>?run(Map<String,Object>?args)?throws?Exception?{?? ???String?crawlId?=?(String)args.get(Nutch.ARG_CRAWL);?? ???numJobs?=?1;?? ???currentJobNum?=?0;?? ???currentJob?=?new?NutchJob(getConf(),?"update-table");?? ???if?(crawlId?!=?null)?{?? ?????currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY,?crawlId);?? ???}?? ???//job.setBoolean(ALL,?updateAll);?? ???ScoringFilters?scoringFilters?=?new?ScoringFilters(getConf());?? ???HashSet<WebPage.Field>?fields?=?new?HashSet<WebPage.Field>(FIELDS);?? ???fields.addAll(scoringFilters.getFields());?? ????? ???//?Partition?by?{url},?sort?by?{url,score}?and?group?by?{url}.?? ???//?This?ensures?that?the?inlinks?are?sorted?by?score?when?they?enter?? ???//?the?reducer.?? ????? ???currentJob.setPartitionerClass(UrlOnlyPartitioner.class);?? ???currentJob.setSortComparatorClass(UrlScoreComparator.class);?? ???currentJob.setGroupingComparatorClass(UrlOnlyComparator.class);?? ????? //?這里的maper讀取webpage中的outlinks字段值,對每個外鏈接計算分數(shù)?? ???StorageUtils.initMapperJob(currentJob,?fields,?UrlWithScore.class,?? ???????NutchWritable.class,?DbUpdateMapper.class);?? //?對新生成的外鏈接設置一些分數(shù),狀態(tài)等信息,再把新的WebPage寫回數(shù)據(jù)庫?? ???StorageUtils.initReducerJob(currentJob,?DbUpdateReducer.class);?? ???currentJob.waitForCompletion(true);?? ???ToolUtil.recordJobStatus(null,?currentJob,?results);?? ???return?results;?? ?}??
下面是其run方法的源代碼
[java]?view plain?copy ??@Override?? ??public?Map<String,Object>?run(Map<String,Object>?args)?throws?Exception?{?? ????String?solrUrl?=?(String)args.get(Nutch.ARG_SOLR);?? ????String?batchId?=?(String)args.get(Nutch.ARG_BATCH);?? ????NutchIndexWriterFactory.addClassToConf(getConf(),?SolrWriter.class);?? ????getConf().set(SolrConstants.SERVER_URL,?solrUrl);?? ?? ?? //?初始化?job?? ????currentJob?=?createIndexJob(getConf(),?"solr-index",?batchId);?? ????Path?tmp?=?new?Path("tmp_"?+?System.currentTimeMillis()?+?"-"?? ????????????????+?new?Random().nextInt());?? //?設置輸出索引到文件,輸出格式使用IndexeroutputFormat,?其默認調(diào)用Solr的API把數(shù)據(jù)傳給Solr建立索引?? ????FileOutputFormat.setOutputPath(currentJob,?tmp);?? ????currentJob.waitForCompletion(true);?? ????ToolUtil.recordJobStatus(null,?currentJob,?results);?? ????return?results;?? ??}??
有興趣可以看一下SolrWriter,它實現(xiàn)了NutchIndexerWriter這個接口,來把數(shù)據(jù)寫到不同的后臺搜索引擎中,這里默認使用了Solr,當然你也可以通過實現(xiàn)它來擴展你自己的搜索引擎,當然nutch還提供了插件來自定義索引的字段值,也就是IndexingFilter.java這個接口。
---------------------
1. 整體流程
InjectorJob => GeneratorJob => FetcherJob => ParserJob => DbUpdaterJob => SolrIndexerJob
InjectorJob : 從文件中得到一批種子網(wǎng)頁,把它們放到抓取數(shù)據(jù)庫中去
GeneratorJob: 從抓取數(shù)據(jù)庫中產(chǎn)生要抓取的頁面放到抓取隊列中去
FetcherJob: ? 對抓取隊列中的網(wǎng)頁進行抓取,在reducer中使用了生產(chǎn)/消費者模型
ParserJob: ? ?對抓取完成的網(wǎng)頁進行解析,產(chǎn)生一些新的鏈接與網(wǎng)頁內(nèi)容的解析結(jié)果
DbUpdaterJob: 把新產(chǎn)生的鏈接更新到抓取數(shù)據(jù)庫中去
SolrIndexerJob: 對解析后的內(nèi)容進行索引建立
2. InjectorJob分析
? 下面是InjectorJob的啟動函數(shù),代碼如下
[java]?view plain?copy
? ?
? ?因為InjectorJob擴展自NutchTool,實現(xiàn)了它的run方法。
? ?我們可以看到,這里有兩個MR任務,第一個主要是從文件中讀入種子網(wǎng)頁,寫到DataStore數(shù)據(jù)庫中,第二個MR任務主要是對數(shù)據(jù)庫中的WebPage對象做一個分數(shù)與抓取間隔的設置。它使用到一個initMapperJob方法,代碼如下
[java]?view plain?copy
? ?
3. GeneratorJob 源代碼分析
? ?下面是GeneratorJob的run方法代碼
[java]?view plain?copy
? 好像比原來的Generate簡單很多,這里的GeneratorMapper完成的工作與之前的版本是一樣的,如url的正規(guī)化,過濾,分數(shù)的設置,而GeneratorReducer完成的工作也和之前差不多,只是輸出變成了DataStore,如HBase,完成以后會每個WebPage進行打標記,表示當前WebPage所完成的一個狀態(tài)。
4. FetcherJob 源代碼分析
? ?使用了Gora的 fetcher比原來簡單了很多,下面是其run的源代碼
[java]?view plain?copy
? 這里把原來在Mapper中使用到的生產(chǎn)者與消費者模型用到了reducer中,重寫了reducer的run方法,在其中打開多個抓取線程,對url進行多線程抓取,有興趣可以看一下FetcherReducer這個類。
5. ParserJob 代碼分析
? ?下面是ParserJob.java中的run代碼
[java]?view plain?copy
? ?
6. DbUpdaterJob 代碼分析
下面是DbUpdaterjob的run方法代碼
[java]?view plain?copy
7. SolrIndexerJob 代碼分析
下面是其run方法的源代碼
[java]?view plain?copy
有興趣可以看一下SolrWriter,它實現(xiàn)了NutchIndexerWriter這個接口,來把數(shù)據(jù)寫到不同的后臺搜索引擎中,這里默認使用了Solr,當然你也可以通過實現(xiàn)它來擴展你自己的搜索引擎,當然nutch還提供了插件來自定義索引的字段值,也就是IndexingFilter.java這個接口。
8. 總結(jié)
? ? Nutch 2.0個人感覺現(xiàn)在還是不成熟的,有很多功能還沒有完成,主要的改變還是在它的數(shù)據(jù)存儲層,把原來的數(shù)據(jù)存儲進行了抽象,使其可以更好的運行在大規(guī)模數(shù)據(jù)抓取中,而且可以讓用戶來擴展具體的數(shù)據(jù)存儲。當然數(shù)據(jù)存儲層的變化帶來了一些流程上的變化,有一些操作可以支持使用數(shù)據(jù)庫操作來完成,這也大大減少了一些原來要MR任務來完成的代碼??傊畁utch 2.0 ?還是讓我們看到了nutch的一個發(fā)展方向。希望它發(fā)現(xiàn)的越來越好吧。
轉(zhuǎn)載地址:http://blog.csdn.net/amuseme_lu/article/details/7777426
總結(jié)
以上是生活随笔為你收集整理的(转载)Nutch 2.0 之 抓取流程简单分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (转载)Nutch2 WebPage 字
- 下一篇: Nutch爬虫引擎使用分析