
An Introduction to the Nutch 2.0 Crawl Workflow

Posted by pig2 on 2013-12-18 14:40:47

1. Overall Workflow
InjectorJob => GeneratorJob => FetcherJob => ParserJob => DbUpdaterJob => SolrIndexerJob
InjectorJob:    reads a batch of seed URLs from a file and injects them into the crawl database
GeneratorJob:   selects pages due for fetching from the crawl database and puts them into a fetch list
FetcherJob:     fetches the pages in the fetch list; the reducer uses a producer/consumer model
ParserJob:      parses the fetched pages, producing the parsed content and newly discovered links
DbUpdaterJob:   updates the crawl database with the newly discovered links
SolrIndexerJob: indexes the parsed content
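
For orientation, here is a minimal sketch of one whole crawl cycle driven from Java. Each of these jobs is a NutchTool, so it is invoked through the run(Map) entry point analyzed in the sections below, and the argument keys (Nutch.ARG_SEEDDIR and so on) are the ones those run methods read. The seed directory, the topN value, and the Solr URL are placeholder values, and the package imports reflect the Nutch 2.0 layout as far as I know it, so treat this as a sketch rather than the actual Crawler driver class.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.DbUpdaterJob;
import org.apache.nutch.crawl.GeneratorJob;
import org.apache.nutch.crawl.InjectorJob;
import org.apache.nutch.fetcher.FetcherJob;
import org.apache.nutch.indexer.solr.SolrIndexerJob;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.parse.ParserJob;
import org.apache.nutch.util.NutchConfiguration;

// A sketch of one crawl cycle: inject, generate, fetch, parse, update, index.
public class CrawlCycleSketch {
  public static void main(String[] argv) throws Exception {
    Configuration conf = NutchConfiguration.create();

    InjectorJob injector = new InjectorJob();
    injector.setConf(conf);
    Map<String,Object> injectArgs = new HashMap<String,Object>();
    injectArgs.put(Nutch.ARG_SEEDDIR, "urls");   // placeholder seed directory
    injector.run(injectArgs);

    GeneratorJob generator = new GeneratorJob();
    generator.setConf(conf);
    Map<String,Object> generateArgs = new HashMap<String,Object>();
    generateArgs.put(Nutch.ARG_TOPN, 1000L);     // select at most 1000 URLs
    // GeneratorJob.run() puts the new batch id into its results map
    String batchId = (String)generator.run(generateArgs).get(GeneratorJob.BATCH_ID);

    Map<String,Object> batchArgs = new HashMap<String,Object>();
    batchArgs.put(Nutch.ARG_BATCH, batchId);     // fetch and parse only this batch

    FetcherJob fetcher = new FetcherJob();
    fetcher.setConf(conf);
    fetcher.run(batchArgs);

    ParserJob parser = new ParserJob();
    parser.setConf(conf);
    parser.run(batchArgs);

    DbUpdaterJob updater = new DbUpdaterJob();
    updater.setConf(conf);
    updater.run(new HashMap<String,Object>());

    SolrIndexerJob indexer = new SolrIndexerJob();
    indexer.setConf(conf);
    Map<String,Object> indexArgs = new HashMap<String,Object>();
    indexArgs.put(Nutch.ARG_SOLR, "http://localhost:8983/solr"); // placeholder
    indexArgs.put(Nutch.ARG_BATCH, batchId);
    indexer.run(indexArgs);
  }
}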

2. InjectorJob Analysis
Below is InjectorJob's entry point, its run method:
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  getConf().setLong("injector.current.time", System.currentTimeMillis());
  Path input;
  Object path = args.get(Nutch.ARG_SEEDDIR);
  if (path instanceof Path) {
    input = (Path)path;
  } else {
    input = new Path(path.toString());
  }
  numJobs = 2;
  currentJobNum = 0;
  status.put(Nutch.STAT_PHASE, "convert input");
  currentJob = new NutchJob(getConf(), "inject-p1 " + input);
  FileInputFormat.addInputPath(currentJob, input);
  // the mapper parses URLs out of the seed file and writes them to the database
  currentJob.setMapperClass(UrlMapper.class);
  currentJob.setMapOutputKeyClass(String.class);
  // the map output value is WebPage, a class generated by the Gora compiler;
  // Gora can map it onto different storage backends
  currentJob.setMapOutputValueClass(WebPage.class);
  // output goes through GoraOutputFormat
  currentJob.setOutputFormatClass(GoraOutputFormat.class);
  DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
      String.class, WebPage.class);
  GoraOutputFormat.setOutput(currentJob, store, true);
  currentJob.setReducerClass(Reducer.class);
  currentJob.setNumReduceTasks(0);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  currentJob = null;
  status.put(Nutch.STAT_PHASE, "merge input with db");
  status.put(Nutch.STAT_PROGRESS, 0.5f);
  currentJobNum = 1;
  currentJob = new NutchJob(getConf(), "inject-p2 " + input);
  StorageUtils.initMapperJob(currentJob, FIELDS, String.class,
      WebPage.class, InjectorMapper.class);
  currentJob.setNumReduceTasks(0);
  currentJob.waitForCompletion(true); // run the second job before recording its status
  ToolUtil.recordJobStatus(null, currentJob, results);
  status.put(Nutch.STAT_PROGRESS, 1.0f);
  return results;
}
Because InjectorJob extends NutchTool, it implements NutchTool's run method.
As you can see, there are two MapReduce jobs here. The first reads the seed URLs from a file and writes them into the DataStore; the second sets an initial score and fetch interval on each WebPage object in the database. The second job is set up through the initMapperJob helper, shown below; a sketch of what its mapper does follows the helper.
public static <K, V> void initMapperJob(Job job,
    Collection<WebPage.Field> fields,
    Class<K> outKeyClass, Class<V> outValueClass,
    Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
    Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
    throws ClassNotFoundException, IOException {
  // create the DataStore abstraction; the concrete backend is pluggable,
  // e.g. HBase or MySQL
  DataStore<String, WebPage> store = createWebStore(job.getConfiguration(),
      String.class, WebPage.class);
  if (store == null) throw new RuntimeException("Could not create datastore");
  Query<String, WebPage> query = store.newQuery();
  query.setFields(toStringArray(fields));
  GoraMapper.initMapperJob(job, query, store,
      outKeyClass, outValueClass, mapperClass, partitionerClass, reuseObjects);
  GoraOutputFormat.setOutput(job, store, true);
}
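
To illustrate what that second phase does, here is a rough sketch of a GoraMapper that stamps each injected WebPage with a default score and fetch interval and writes it back. This is a simplified assumption of what InjectorMapper looks like, not its actual source; "db.score.injected" and "db.fetch.interval.default" are the standard Nutch configuration properties for these defaults.

import java.io.IOException;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.nutch.storage.WebPage;

// A simplified stand-in for the second-phase InjectorMapper: mark each
// injected row with its default score and fetch interval. (Assumption:
// the real class also handles per-seed metadata and markers.)
public class InjectScoreMapperSketch
    extends GoraMapper<String, WebPage, String, WebPage> {

  private float scoreInjected;
  private int interval;

  @Override
  protected void setup(Context context) {
    scoreInjected = context.getConfiguration().getFloat("db.score.injected", 1.0f);
    interval = context.getConfiguration().getInt("db.fetch.interval.default", 2592000);
  }

  @Override
  protected void map(String key, WebPage page, Context context)
      throws IOException, InterruptedException {
    page.setScore(scoreInjected);                   // initial page score
    page.setFetchInterval(interval);                // seconds between re-fetches
    page.setFetchTime(System.currentTimeMillis());  // eligible for fetch now
    context.write(key, page);                       // GoraOutputFormat persists the row
  }
}
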
3. GeneratorJob Source Code Analysis
Below is GeneratorJob's run method:
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  // map to inverted subset due for fetch, sort by score
  Long topN = (Long)args.get(Nutch.ARG_TOPN);
  Long curTime = (Long)args.get(Nutch.ARG_CURTIME);
  if (curTime == null) {
    curTime = System.currentTimeMillis();
  }
  Boolean filter = (Boolean)args.get(Nutch.ARG_FILTER);
  Boolean norm = (Boolean)args.get(Nutch.ARG_NORMALIZE);
  getConf().setLong(GENERATOR_CUR_TIME, curTime);
  if (topN != null)
    getConf().setLong(GENERATOR_TOP_N, topN);
  if (filter != null)
    getConf().setBoolean(GENERATOR_FILTER, filter);
  int randomSeed = Math.abs(new Random().nextInt());
  batchId = (curTime / 1000) + "-" + randomSeed;
  getConf().setInt(GENERATOR_RANDOM_SEED, randomSeed);
  getConf().set(BATCH_ID, batchId);
  getConf().setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis());
  if (norm != null)
    getConf().setBoolean(GENERATOR_NORMALISE, norm);
  String mode = getConf().get(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
  if (GENERATOR_COUNT_VALUE_HOST.equalsIgnoreCase(mode)) {
    getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
  } else if (GENERATOR_COUNT_VALUE_DOMAIN.equalsIgnoreCase(mode)) {
    getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_DOMAIN);
  } else {
    LOG.warn("Unknown generator.max.count mode '" + mode + "', using mode=" + GENERATOR_COUNT_VALUE_HOST);
    getConf().set(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
    getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
  }
  // everything above sets up the configuration constants the job will use
  numJobs = 1;
  currentJobNum = 0;
  // create the job
  currentJob = new NutchJob(getConf(), "generate: " + batchId);
  // initialize the mapper; its output type is <SelectorEntry, WebPage>,
  // partitioned by SelectorEntryPartitioner
  StorageUtils.initMapperJob(currentJob, FIELDS, SelectorEntry.class,
      WebPage.class, GeneratorMapper.class, SelectorEntryPartitioner.class, true);
  // initialize the reducer; GeneratorReducer performs the aggregation
  StorageUtils.initReducerJob(currentJob, GeneratorReducer.class);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  results.put(BATCH_ID, batchId);
  return results;
}
This looks much simpler than the old Generate. GeneratorMapper does the same work as in previous versions, namely URL normalization, filtering, and score assignment. GeneratorReducer is also much the same, except that its output now goes to a DataStore such as HBase; when it finishes, each WebPage is marked with the batch id, recording the state the page has reached. A rough sketch of the mapper-side flow is shown below.
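
In the sketch, shouldFetch(), normalize(), accept(), and sortScore() are hypothetical stand-ins for the real fetch-schedule, URLNormalizers, URLFilters, and ScoringFilters calls, so treat this as an outline of GeneratorMapper rather than its actual source.

import java.io.IOException;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;

// A simplified sketch of the GeneratorMapper flow: drop rows that are not
// due for fetching, normalize and filter the URL, compute a sort score, and
// emit a <SelectorEntry, WebPage> pair for the reducer's top-N selection.
public class GeneratorMapperSketch
    extends GoraMapper<String, WebPage, SelectorEntry, WebPage> {

  private final SelectorEntry entry = new SelectorEntry();
  private long curTime;               // initialized from the configuration
  private boolean filter, normalize;  // in setup(), omitted here

  @Override
  protected void map(String reversedUrl, WebPage page, Context context)
      throws IOException, InterruptedException {
    String url = TableUtil.unreverseUrl(reversedUrl);
    if (!shouldFetch(url, page, curTime)) return;  // not due for fetching yet
    if (normalize) {
      url = normalize(url);                        // URL normalization
      if (url == null) return;
    }
    if (filter && !accept(url)) return;            // URL filtering
    float score = sortScore(url, page);            // sort key for top-N selection
    entry.set(url, score);
    context.write(entry, page); // partitioned by SelectorEntryPartitioner
  }

  // hypothetical helpers, standing in for the real plugin calls
  private boolean shouldFetch(String url, WebPage page, long curTime) { return true; }
  private String normalize(String url) { return url; }
  private boolean accept(String url) { return true; }
  private float sortScore(String url, WebPage page) { return 0f; }
}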

4. FetcherJob Source Code Analysis
With Gora, the fetcher is much simpler than it used to be. Here is the source of its run method:
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  checkConfiguration();
  String batchId = (String)args.get(Nutch.ARG_BATCH);
  Integer threads = (Integer)args.get(Nutch.ARG_THREADS);
  Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
  Integer numTasks = (Integer)args.get(Nutch.ARG_NUMTASKS);
  if (threads != null && threads > 0) {
    getConf().setInt(THREADS_KEY, threads);
  }
  if (batchId == null) {
    batchId = Nutch.ALL_BATCH_ID_STR;
  }
  getConf().set(GeneratorJob.BATCH_ID, batchId);
  if (shouldResume != null) {
    getConf().setBoolean(RESUME_KEY, shouldResume);
  }
  LOG.info("FetcherJob : timelimit set for : " + getConf().getLong("fetcher.timelimit", -1));
  LOG.info("FetcherJob: threads: " + getConf().getInt(THREADS_KEY, 10));
  LOG.info("FetcherJob: parsing: " + getConf().getBoolean(PARSE_KEY, false));
  LOG.info("FetcherJob: resuming: " + getConf().getBoolean(RESUME_KEY, false));
  // set the actual time for the timelimit relative
  // to the beginning of the whole job and not of a specific task
  // otherwise it keeps trying again if a task fails
  long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
  if (timelimit != -1) {
    timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
    getConf().setLong("fetcher.timelimit", timelimit);
  }
  numJobs = 1;
  currentJob = new NutchJob(getConf(), "fetch");
  // the WebPage fields the job needs to read
  Collection<WebPage.Field> fields = getFields(currentJob);
  // initialize the mapper; its output type is <IntWritable, FetchEntry>.
  // The mapper filters its input, skipping rows that belong to a different
  // batch and rows that have already been fetched
  StorageUtils.initMapperJob(currentJob, fields, IntWritable.class,
      FetchEntry.class, FetcherMapper.class, FetchEntryPartitioner.class, false);
  // initialize the reducer
  StorageUtils.initReducerJob(currentJob, FetcherReducer.class);
  if (numTasks == null || numTasks < 1) {
    currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",
        currentJob.getNumReduceTasks()));
  } else {
    currentJob.setNumReduceTasks(numTasks);
  }
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}
The producer/consumer model that previous versions ran in the mapper has moved into the reducer: FetcherReducer overrides the reducer's run method and starts a number of fetch threads that crawl the URLs concurrently. Have a look at the FetcherReducer class if you are interested; a simplified sketch of the pattern follows.
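
The pattern itself is plain Java: a queue of fetch items, a producer that feeds it, and a pool of consumer threads that drain it. Below is a minimal, self-contained sketch of that model; the real FetcherReducer additionally maintains per-host queues for politeness, honors the timelimit, and reports progress.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal producer/consumer sketch of the fetching model used inside
// FetcherReducer.
public class FetchLoop {
  private static final String POISON = "\u0000EOQ"; // sentinel: no more work

  public static void fetchAll(List<String> urls, int threads) throws InterruptedException {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100);

    // consumers: each thread repeatedly takes a URL and "fetches" it
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(new Runnable() {
        public void run() {
          try {
            for (String url = queue.take(); !POISON.equals(url); url = queue.take()) {
              fetch(url);
            }
            queue.put(POISON); // pass the sentinel on to the next worker
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      });
      workers[i].start();
    }

    // producer: in the real reducer this loop is fed by the reduce input
    for (String url : urls) {
      queue.put(url); // blocks when the queue is full (backpressure)
    }
    queue.put(POISON);

    for (Thread w : workers) w.join();
  }

  private static void fetch(String url) {
    // placeholder for the actual protocol call (e.g. an HTTP GET)
    System.out.println(Thread.currentThread().getName() + " fetched " + url);
  }
}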

5. ParserJob Code Analysis
Below is the run method from ParserJob.java:
@Override
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String batchId = (String)args.get(Nutch.ARG_BATCH);
  Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
  Boolean force = (Boolean)args.get(Nutch.ARG_FORCE);
  if (batchId != null) {
    getConf().set(GeneratorJob.BATCH_ID, batchId);
  }
  if (shouldResume != null) {
    getConf().setBoolean(RESUME_KEY, shouldResume);
  }
  if (force != null) {
    getConf().setBoolean(FORCE_KEY, force);
  }
  LOG.info("ParserJob: resuming:\t" + getConf().getBoolean(RESUME_KEY, false));
  LOG.info("ParserJob: forced reparse:\t" + getConf().getBoolean(FORCE_KEY, false));
  if (batchId == null || batchId.equals(Nutch.ALL_BATCH_ID_STR)) {
    LOG.info("ParserJob: parsing all");
  } else {
    LOG.info("ParserJob: batchId:\t" + batchId);
  }
  currentJob = new NutchJob(getConf(), "parse");
  Collection<WebPage.Field> fields = getFields(currentJob);
  // initialize the mapper; its output type is <String, WebPage>, and all of
  // the parsing happens in the mapper
  StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
      ParserMapper.class);
  // initialize the reducer, which writes the <key, value> pairs back to the datastore
  StorageUtils.initReducerJob(currentJob, IdentityPageReducer.class);
  currentJob.setNumReduceTasks(0);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}
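
A rough sketch of that mapper follows. shouldParse() and parse() are hypothetical stand-ins for the batch/resume checks and the ParseUtil machinery, and the constants are the BATCH_ID and FORCE_KEY keys from the listings above, so this is an outline of ParserMapper rather than its actual source.

import java.io.IOException;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.nutch.crawl.GeneratorJob;
import org.apache.nutch.parse.ParserJob;
import org.apache.nutch.storage.WebPage;

// Simplified sketch of ParserMapper: skip rows that do not belong to the
// current batch (unless a forced reparse is requested), run the parsers,
// and write the enriched WebPage back to the datastore.
public class ParserMapperSketch
    extends GoraMapper<String, WebPage, String, WebPage> {

  private String batchId;
  private boolean force;

  @Override
  protected void setup(Context context) {
    batchId = context.getConfiguration().get(GeneratorJob.BATCH_ID);
    force = context.getConfiguration().getBoolean(ParserJob.FORCE_KEY, false);
  }

  @Override
  protected void map(String key, WebPage page, Context context)
      throws IOException, InterruptedException {
    if (!force && !shouldParse(page, batchId)) {
      return; // fetched in another batch, or already parsed
    }
    parse(key, page); // fills in parsed text, title, outlinks, parse status
    context.write(key, page);
  }

  // hypothetical helpers, standing in for the batch check and ParseUtil
  private boolean shouldParse(WebPage page, String batchId) { return true; }
  private void parse(String key, WebPage page) { }
}
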
6. DbUpdaterJob Code Analysis
Below is DbUpdaterJob's run method:
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String crawlId = (String)args.get(Nutch.ARG_CRAWL);
  numJobs = 1;
  currentJobNum = 0;
  currentJob = new NutchJob(getConf(), "update-table");
  if (crawlId != null) {
    currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
  }
  //job.setBoolean(ALL, updateAll);
  ScoringFilters scoringFilters = new ScoringFilters(getConf());
  HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
  fields.addAll(scoringFilters.getFields());
  // Partition by {url}, sort by {url,score} and group by {url}.
  // This ensures that the inlinks are sorted by score when they enter
  // the reducer.
  currentJob.setPartitionerClass(UrlOnlyPartitioner.class);
  currentJob.setSortComparatorClass(UrlScoreComparator.class);
  currentJob.setGroupingComparatorClass(UrlOnlyComparator.class);
  // the mapper reads each WebPage's outlinks field and computes a score
  // contribution for every outlink
  StorageUtils.initMapperJob(currentJob, fields, UrlWithScore.class,
      NutchWritable.class, DbUpdateMapper.class);
  // the reducer sets score, status, and other metadata on the newly
  // discovered links, then writes the new WebPages back to the datastore
  StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}
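
The map output key, UrlWithScore, is what makes that partition/sort/group trio work: it carries both a URL and a score, the partitioner and the grouping comparator look only at the URL, and the sort comparator compares the URL first and then the score. The stand-in below shows the three comparisons side by side; it is illustrative only, not the actual Nutch implementation.

// Sketch of how the {partition, sort, group} trio gives DbUpdateReducer its
// input order. The page row itself can be emitted with the maximum possible
// score so it arrives first in its URL group, followed by the inlink score
// contributions in descending order.
public class UrlScoreSecondarySortSketch {

  // key: url plus a score used only for ordering within a URL group
  public static class UrlScore {
    final String url;
    final float score;
    UrlScore(String url, float score) { this.url = url; this.score = score; }
  }

  // partition by URL only: all records for one URL go to the same reducer
  static int partition(UrlScore k, int numReducers) {
    return (k.url.hashCode() & Integer.MAX_VALUE) % numReducers;
  }

  // sort by URL, then by descending score
  static int sortCompare(UrlScore a, UrlScore b) {
    int c = a.url.compareTo(b.url);
    return c != 0 ? c : Float.compare(b.score, a.score);
  }

  // group by URL only: one reduce() call sees the page and all its inlinks
  static int groupCompare(UrlScore a, UrlScore b) {
    return a.url.compareTo(b.url);
  }
}
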
7. SolrIndexerJob Code Analysis
Below is the source of its run method:
@Override
public Map<String,Object> run(Map<String,Object> args) throws Exception {
  String solrUrl = (String)args.get(Nutch.ARG_SOLR);
  String batchId = (String)args.get(Nutch.ARG_BATCH);
  NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
  getConf().set(SolrConstants.SERVER_URL, solrUrl);
  // create the indexing job
  currentJob = createIndexJob(getConf(), "solr-index", batchId);
  Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
      + new Random().nextInt());
  // set a temporary output path; the job's output format is
  // IndexerOutputFormat, which by default pushes the documents to Solr
  // through the Solr API to build the index
  FileOutputFormat.setOutputPath(currentJob, tmp);
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  return results;
}
If you are curious, take a look at SolrWriter, which implements the NutchIndexWriter interface to write documents out to a search backend. Solr is the default, but you can implement the interface yourself to target another engine. Nutch also provides a plugin point for customizing the indexed fields: the IndexingFilter interface (IndexingFilter.java). A minimal sketch of such a filter is shown below.
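
The sketch copies the parsed page title into an extra field. The method signatures follow the Nutch 2.0 IndexingFilter interface as far as I recall it, so check IndexingFilter.java in your version before relying on them.

import java.util.Collection;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;

// A minimal custom indexing filter: copy the parsed page title into a
// "mytitle" field. It would be registered through the plugin system like
// any other IndexingFilter implementation.
public class TitleCopyIndexingFilter implements IndexingFilter {

  private Configuration conf;

  @Override
  public NutchDocument filter(NutchDocument doc, String url, WebPage page)
      throws IndexingException {
    if (page.getTitle() != null) {
      doc.add("mytitle", page.getTitle().toString()); // custom index field
    }
    return doc; // returning null would drop the document from the index
  }

  // declare which WebPage fields this filter needs loaded from the datastore
  @Override
  public Collection<WebPage.Field> getFields() {
    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>();
    fields.add(WebPage.Field.TITLE);
    return fields;
  }

  @Override
  public void setConf(Configuration conf) { this.conf = conf; }

  @Override
  public Configuration getConf() { return conf; }
}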

8. Summary
Personally I feel Nutch 2.0 is still immature, with quite a few features unfinished. The main change is in the storage layer: data storage has been abstracted so that the crawler runs better at large scale and users can plug in the concrete storage backend themselves. That change to the storage layer also changes parts of the workflow: some operations can now be done directly as database operations, which eliminates a good deal of code that previously required MapReduce jobs. All in all, Nutch 2.0 shows the direction Nutch is heading in. I hope it keeps getting better.


Comments (3)

anduo1989, 2014-12-13 12:25:59:
Commercial deployments generally use the 1.x versions.

tang, 2015-6-20 10:16:43

dead_lyn, 2015-9-12 11:11:02:
SOS, a question for the OP: after crawling with Nutch, what I end up with is the content of individual pages. What does the follow-up processing usually look like? For example, if I want to crawl a site's video resources, how do I download them? Thanks!
