An In-Depth Analysis of Injector Job
The main task of Injector Job is to create a table in HBase named after the crawlId and to inject the seed URLs from a text file into that table.
(I) Running the command
1. Run the command
[jediael@master local]$ bin/nutch inject seeds/ -crawlId sourcetest
InjectorJob: starting at 2015-03-10 14:59:19
InjectorJob: Injecting urlDir: seeds
InjectorJob: Using class org.apache.gora.hbase.store.HBaseStore as the Gora storage class.
InjectorJob: total number of urls rejected by filters: 0
InjectorJob: total number of urls injected after normalization and filtering: 1
Injector: finished at 2015-03-10 14:59:26, elapsed: 00:00:06
2. Inspect the table contents
hbase(main):004:0> scan 'sourcetest_webpage'
ROW                     COLUMN+CELL
 com.163.money:http/    column=f:fi, timestamp=1425970761871, value=\x00'\x8D\x00
 com.163.money:http/    column=f:ts, timestamp=1425970761871, value=\x00\x00\x01L\x02{\x08_
 com.163.money:http/    column=mk:_injmrk_, timestamp=1425970761871, value=y
 com.163.money:http/    column=mk:dist, timestamp=1425970761871, value=0
 com.163.money:http/    column=mtdt:_csh_, timestamp=1425970761871, value=?\x80\x00\x00
 com.163.money:http/    column=s:s, timestamp=1425970761871, value=?\x80\x00\x00
1 row(s) in 0.0430 seconds
3. Read the database contents
Because the HBase table stores its values as raw bytes, use the following command to see them in readable form.
[jediael@master local]$ bin/nutch readdb -dump ./test -crawlId sourcetest -content
WebTable dump: starting
WebTable dump: done
[jediael@master local]$ cat test/part-r-00000
http://money.163.com/ key: com.163.money:http/
baseUrl: null
status: 0 (null)
fetchTime: 1425970759775
prevFetchTime: 0
fetchInterval: 2592000
retriesSinceFetch: 0
modifiedTime: 0
prevModifiedTime: 0
protocolStatus: (null)
parseStatus: (null)
title: null
score: 1.0
marker _injmrk_ : y
marker dist : 0
reprUrl: null
metadata _csh_ : ?\x80\x00\x00
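The escaped values in the scan output and the readable values in the dump can be matched by hand. The sketch below is not part of Nutch; `ScanValueDecoder` is a hypothetical helper that assumes the cells are stored big-endian, an assumption that the numbers in the dump bear out. It decodes `f:fi`, `f:ts` and `s:s` from the scan into the `fetchInterval`, `fetchTime` and `score` shown by `readdb`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Nutch: decodes the escaped byte values
// printed by the HBase shell scan, assuming big-endian encoding.
class ScanValueDecoder {

    // f:fi is a 4-byte int (fetch interval in seconds).
    static int decodeInt(byte[] raw) {
        return ByteBuffer.wrap(raw).getInt();
    }

    // f:ts is an 8-byte long (fetch time in epoch milliseconds).
    static long decodeLong(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    // s:s is a 4-byte IEEE 754 float (the score).
    static float decodeFloat(byte[] raw) {
        return ByteBuffer.wrap(raw).getFloat();
    }

    public static void main(String[] args) {
        // \x00'\x8D\x00            ->  00 27 8D 00
        byte[] fi = {0x00, 0x27, (byte) 0x8D, 0x00};
        // \x00\x00\x01L\x02{\x08_  ->  00 00 01 4C 02 7B 08 5F
        byte[] ts = {0x00, 0x00, 0x01, 0x4C, 0x02, 0x7B, 0x08, 0x5F};
        // ?\x80\x00\x00            ->  3F 80 00 00
        byte[] score = {0x3F, (byte) 0x80, 0x00, 0x00};

        System.out.println(decodeInt(fi));      // 2592000
        System.out.println(decodeLong(ts));     // 1425970759775
        System.out.println(decodeFloat(score)); // 1.0
    }
}
```

The printable characters in the shell output (`'`, `L`, `{`, `_`, `?`) are simply bytes that happen to fall in the ASCII range, which is why they appear unescaped.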
(II) Source code walkthrough
Class: org.apache.nutch.crawl.InjectorJob
1. Program entry point
public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(), args);
  System.exit(res);
}
2. run(String[] args), invoked through ToolRunner.run
This step mainly calls the inject method; everything else in it is argument validation.
public int run(String[] args) throws Exception {
  …………
  inject(new Path(args[0]));
  …………
}
3. The inject() method
Nutch runs every concrete job through Map<String, Object> run(Map<String, Object> args): the job takes its arguments as a Map and returns its results as a Map.
public void inject(Path urlDir) throws Exception {
  run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
}
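The Map-in, Map-out convention can be sketched without any Nutch dependency. Everything below is illustrative: `ArgMapSketch`, its `toArgMap`, the `"seedDir"` key and the `"injectedFrom"` result key are hypothetical stand-ins, not Nutch's `ToolUtil` or `Nutch.ARG_SEEDDIR`. The sketch only shows the shape of the pattern: a typed wrapper packs its parameter into a map, and the generic `run` reads from that map and returns a result map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Map-in / Map-out job convention.
// None of these names are Nutch's own classes or constants.
class ArgMapSketch {

    // Assumed key name, standing in for Nutch.ARG_SEEDDIR.
    static final String ARG_SEEDDIR = "seedDir";

    // Packs alternating key/value arguments into a map.
    static Map<String, Object> toArgMap(Object... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Map<String, Object> args = new HashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            args.put(String.valueOf(keyValues[i]), keyValues[i + 1]);
        }
        return args;
    }

    // Generic entry point: reads what it needs from the argument map
    // and reports back through a result map.
    static Map<String, Object> run(Map<String, Object> args) {
        Map<String, Object> results = new HashMap<>();
        results.put("injectedFrom", args.get(ARG_SEEDDIR));
        return results;
    }

    // Thin typed wrapper, analogous in shape to inject(Path urlDir).
    static Map<String, Object> inject(String urlDir) {
        return run(toArgMap(ARG_SEEDDIR, urlDir));
    }
}
```

One uniform signature lets a caller drive every job the same way, at the cost of compile-time checking of argument names and types.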
4. Configuring the job and creating the HBase table
public Map<String, Object> run(Map<String, Object> args) throws Exception {
  // Abridged excerpt: `input` and `results` are set up in code not shown here.
  numJobs = 1;
  currentJobNum = 0;
  currentJob = new NutchJob(getConf(), "inject " + input);
  // The seed directory is the job input; UrlMapper turns each line into a WebPage.
  FileInputFormat.addInputPath(currentJob, input);
  currentJob.setMapperClass(UrlMapper.class);
  currentJob.setMapOutputKeyClass(String.class);
  currentJob.setMapOutputValueClass(WebPage.class);
  currentJob.setOutputFormatClass(GoraOutputFormat.class);
  // Creating the web store is the step that creates the table in HBase.
  DataStore<String, WebPage> store = StorageUtils.createWebStore(
      currentJob.getConfiguration(), String.class, WebPage.class);
  GoraOutputFormat.setOutput(currentJob, store, true);
  // Map-only job: zero reduce tasks, so the mapper output goes straight to the store.
  currentJob.setReducerClass(Reducer.class);
  currentJob.setNumReduceTasks(0);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
}
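5. The mapper
Injector Job has no reducer (the number of reduce tasks is set to 0), so only the mapper needs attention.
The mapper does the following:
(1) Parses each line of the seed file and extracts the parameters it carries
(2) Normalizes the URL and runs it through the URL filters
(3) Uses the reversed URL as the key, builds a WebPage object as the value, and writes the pair to the table.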
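Before reading the full mapper source, the seed-line format handled in step (1) can be illustrated in isolation. The sketch below is a simplified, standalone rewrite and not the Nutch code: `SeedLineSketch` and `Seed` are hypothetical names, and the metadata keys `nutch.score` and `nutch.fetchInterval` are assumed values for the `nutchScoreMDName` and `nutchFetchIntervalMDName` fields used in the source. A seed line is a URL optionally followed by tab-separated `name=value` pairs.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified parser for one seed-file line; not the Nutch code.
class SeedLineSketch {

    // Assumed values of nutchScoreMDName / nutchFetchIntervalMDName.
    static final String SCORE_KEY = "nutch.score";
    static final String INTERVAL_KEY = "nutch.fetchInterval";

    static final class Seed {
        final String url;
        final float score;   // -1 means no custom score was given
        final int interval;  // fetch interval in seconds
        final Map<String, String> metadata;

        Seed(String url, float score, int interval, Map<String, String> metadata) {
            this.url = url;
            this.score = score;
            this.interval = interval;
            this.metadata = metadata;
        }
    }

    // Returns null for blank lines and lines starting with '#'.
    static Seed parse(String line, int defaultInterval) {
        String text = line.trim();
        if (text.isEmpty() || text.startsWith("#")) {
            return null;
        }
        String[] parts = text.split("\t");
        float score = -1f;
        int interval = defaultInterval;
        Map<String, String> metadata = new TreeMap<>();
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq == -1) {
                continue; // skip anything without a '='
            }
            String name = parts[i].substring(0, eq);
            String value = parts[i].substring(eq + 1);
            try {
                if (name.equals(SCORE_KEY)) {
                    score = Float.parseFloat(value);
                } else if (name.equals(INTERVAL_KEY)) {
                    interval = Integer.parseInt(value);
                } else {
                    metadata.put(name, value);
                }
            } catch (NumberFormatException e) {
                // unparsable number: keep the default, as the mapper does
            }
        }
        return new Seed(parts[0], score, interval, metadata);
    }
}
```

For example, `parse("http://money.163.com/\tnutch.score=2.5\tcategory=finance", 2592000)` yields a seed with a custom score of 2.5, the default interval, and one metadata entry.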
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  String url = value.toString().trim(); // value is line of text

  if (url != null && (url.length() == 0 || url.startsWith("#"))) {
    /* Ignore line that start with # */
    return;
  }

  // if tabs : metadata that could be stored
  // must be name=value and separated by \t
  float customScore = -1f;
  int customInterval = interval;
  Map<String, String> metadata = new TreeMap<String, String>();
  if (url.indexOf("\t") != -1) {
    String[] splits = url.split("\t");
    url = splits[0];
    for (int s = 1; s < splits.length; s++) {
      // find separation between name and value
      int indexEquals = splits[s].indexOf("=");
      if (indexEquals == -1) {
        // skip anything without a =
        continue;
      }
      String metaname = splits[s].substring(0, indexEquals);
      String metavalue = splits[s].substring(indexEquals + 1);
      if (metaname.equals(nutchScoreMDName)) {
        try {
          customScore = Float.parseFloat(metavalue);
        } catch (NumberFormatException nfe) {
        }
      } else if (metaname.equals(nutchFetchIntervalMDName)) {
        try {
          customInterval = Integer.parseInt(metavalue);
        } catch (NumberFormatException nfe) {
        }
      } else
        metadata.put(metaname, metavalue);
    }
  }
  try {
    url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
    url = filters.filter(url); // filter the url
  } catch (Exception e) {
    LOG.warn("Skipping " + url + ":" + e);
    url = null;
  }
  if (url == null) {
    context.getCounter("injector", "urls_filtered").increment(1);
    return;
  } else { // if it passes
    String reversedUrl = TableUtil.reverseUrl(url); // collect it
    WebPage row = WebPage.newBuilder().build();
    row.setFetchTime(curTime);
    row.setFetchInterval(customInterval);

    // now add the metadata
    Iterator<String> keysIter = metadata.keySet().iterator();
    while (keysIter.hasNext()) {
      String keymd = keysIter.next();
      String valuemd = metadata.get(keymd);
      row.getMetadata().put(new Utf8(keymd),
          ByteBuffer.wrap(valuemd.getBytes()));
    }

    if (customScore != -1)
      row.setScore(customScore);
    else
      row.setScore(scoreInjected);

    try {
      scfilters.injectedScore(url, row);
    } catch (ScoringFilterException e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Cannot filter injected score for url " + url
            + ", using default (" + e.getMessage() + ")");
      }
    }
    context.getCounter("injector", "urls_injected").increment(1);
    row.getMarkers().put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
    Mark.INJECT_MARK.putMark(row, YES_STRING);
    context.write(reversedUrl, row);
  }
}
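The row key written by the mapper is the reversed URL, which is why http://money.163.com/ appears in the table as com.163.money:http/. The sketch below shows one way to produce that form. It is a simplified illustration under my own assumptions, not the body of `TableUtil.reverseUrl`; the class name `ReverseUrlSketch` is hypothetical. The host labels are reversed, then the scheme, the port if one is given, and the path and query are appended.

```java
import java.net.URI;

// Hypothetical sketch of URL reversal; not Nutch's TableUtil implementation.
class ReverseUrlSketch {

    static String reverseUrl(String url) {
        URI uri = URI.create(url);
        String host = uri.getHost();
        if (host == null) {
            throw new IllegalArgumentException("URL has no host: " + url);
        }

        StringBuilder key = new StringBuilder();

        // Reverse the host labels: money.163.com -> com.163.money
        String[] labels = host.split("\\.");
        for (int i = labels.length - 1; i >= 0; i--) {
            key.append(labels[i]);
            if (i > 0) {
                key.append('.');
            }
        }

        // Append the scheme, then the port only when it is explicit.
        key.append(':').append(uri.getScheme());
        if (uri.getPort() != -1) {
            key.append(':').append(uri.getPort());
        }

        // Append the path and query unchanged.
        if (uri.getRawPath() != null) {
            key.append(uri.getRawPath());
        }
        if (uri.getRawQuery() != null) {
            key.append('?').append(uri.getRawQuery());
        }
        return key.toString();
    }

    public static void main(String[] args) {
        System.out.println(reverseUrl("http://money.163.com/")); // com.163.money:http/
    }
}
```

Reversing the host puts pages from the same domain next to each other in key order, so a scan over one site touches a contiguous range of rows.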
(III) Key source code to study