分享

Hadoop DistributedCache使用及原理

desehawk 2014-6-28 12:45:21 发表于 原理型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 24050
本帖最后由 desehawk 于 2014-6-28 12:45 编辑
问题导读:
1.DistributedCache是什么?
2.每个存储在HDFS中的文件被放到缓存中后有什么特征?
3.distributedCache可以分发什么类型的文件?
4.distributedCache通过什么设置来分发文件,如何设置分发多个文件?





概览
DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存文件(text, archives, jars and so on)文件的默认访问协议为(hdfs://).

DistributedCache将拷贝缓存的文件到Slave节点在任何Job在节点上执行之前。
文件在每个Job中只会被拷贝一次,缓存的归档文件会被在Slave节点中解压缩。



符号链接
每个存储在HDFS中的文件被放到缓存中后都可以通过一个符号链接使用。
URI hdfs://namenode/test/input/file1#myfile 你可以在程序中直接使用myfile来访问 file1这个文件。 myfile是一个符号链接文件。



缓存在本地的存储目录

  1. <property>
  2.   <name>mapred.local.dir</name>
  3.   <value>${hadoop.tmp.dir}/mapred/local</value>
  4.   <description>The local directory where MapReduce stores intermediate
  5.   data files.  May be a comma-separated list of
  6.   directories on different devices in order to spread disk i/o.
  7.   Directories that do not exist are ignored.
  8.   </description>
  9. </property>
  10. <property>
  11.   <name>local.cache.size</name>
  12.   <value>10737418240</value> (默认大小:10GB)
  13.   <description>The limit on the size of cache you want to keep, set by default
  14.   to 10GB. This will act as a soft limit on the cache directory for out of band data.
  15.   </description>
  16. </property>
复制代码



实际在DataNode节点中的存储目录:
/netqin/hadoop/tmp{${hadoop.tmp.dir}}/mapred/local/taskTracker/archive/hadoop-server01{NameNode主机名称}


Archive文件会被解压缩

例子

  1. package com.netqin.examples;
  2. import java.io.BufferedReader;
  3. import java.io.FileReader;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.util.StringTokenizer;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.filecache.DistributedCache;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.Reducer;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.util.GenericOptionsParser;
  18. public class CacheDemo {
  19.     public static void UseDistributedCacheBySymbolicLink() throws Exception {
  20.         FileReader reader = new FileReader("hdfs://mail.py");
  21.         BufferedReader br = new BufferedReader(reader);
  22.         String s = null;
  23.         while ((s = br.readLine()) != null) {
  24.             System.out.println(s);
  25.         }
  26.         br.close();
  27.         reader.close();
  28.     }
  29.     public static class TokenizerMapper extends
  30.             Mapper<Object, Text, Text, IntWritable> {
  31.         private final static IntWritable one = new IntWritable(1);
  32.         private Text word = new Text();
  33.         protected void setup(Context context) throws IOException,
  34.                 InterruptedException {
  35.             System.out.println("Now, use the distributed cache and syslink");
  36.             try {
  37.                 UseDistributedCacheBySymbolicLink();
  38.             } catch (Exception e) {
  39.                 e.printStackTrace();
  40.             }
  41.         }
  42.         public void map(Object key, Text value, Context context)
  43.                 throws IOException, InterruptedException {
  44.             StringTokenizer itr = new StringTokenizer(value.toString());
  45.             while (itr.hasMoreTokens()) {
  46.                 word.set(itr.nextToken());
  47.                 context.write(word, one);
  48.             }
  49.         }
  50.     }
  51.     public static class IntSumReducer extends
  52.             Reducer<Text, IntWritable, Text, IntWritable> {
  53.         private IntWritable result = new IntWritable();
  54.         public void reduce(Text key, Iterable<IntWritable> values,
  55.                 Context context) throws IOException, InterruptedException {
  56.             int sum = 0;
  57.             for (IntWritable val : values) {
  58.                 sum += val.get();
  59.             }
  60.             result.set(sum);
  61.             context.write(key, result);
  62.         }
  63.     }
  64.     public static void main(String[] args) throws Exception {
  65.         Configuration conf = new Configuration();
  66.         String[] otherArgs = new GenericOptionsParser(conf, args)
  67.                 .getRemainingArgs();
  68.         if (otherArgs.length != 2) {
  69.             System.err.println("Usage: wordcount <in> <out>");
  70.             System.exit(2);
  71.         }
  72.         DistributedCache.createSymlink(conf);
  73.         String path = "/tmp/test/mail.py";
  74.         Path filePath = new Path(path);
  75.         String uriWithLink = filePath.toUri().toString() + "#" + "mail.py";
  76.         DistributedCache.addCacheFile(new URI(uriWithLink), conf);
  77.       
  78.         // Path p = new Path("/tmp/hadoop-0.20.2-capacity-scheduler.jar#hadoop-0.20.2-capacity-scheduler.jar");
  79.         // DistributedCache.addArchiveToClassPath(p, conf);
  80.       
  81.       
  82.         Job job = new Job(conf, "CacheDemo");
  83.         job.setJarByClass(CacheDemo.class);
  84.         job.setMapperClass(TokenizerMapper.class);
  85.         job.setCombinerClass(IntSumReducer.class);
  86.         job.setReducerClass(IntSumReducer.class);
  87.         job.setOutputKeyClass(Text.class);
  88.         job.setOutputValueClass(IntWritable.class);
  89.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  90.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  91.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  92.     }
  93. }
复制代码






DistributedCache
DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。

DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。

应用程序在JobConf中通过url(hdfs://)指定需要被缓存的文件。 DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。
Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。

DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。

distributedCache可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置执行权限。

用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。也可以利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf)

其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。


用户可以通过DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。 或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前工作目录会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1。

DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。
http://www.open-open.com/lib/view/open1337349822015.html

Hadoop有一个叫做分布式缓存(distributed cache)的机制来将数据分发到集群上的所有节点上。为了节约网络带宽,在每一个作业中,各个文件通常只需要复制到一个节点一次。
缓存文件复制位置:mapred-site.xml中
  1. <property>
  2. <name>mapred.local.dir</name>
  3. <value>/home/hadoop/tmp</value>
  4. </property>
复制代码



操作步骤:
1.将数据的分发到每个节点上:
DistributedCache.addCacheFile(new URI("hdfs://cloud01:9000/user/hadoop/mrinput/ST.txt"), conf);
注意,此操作一定要在创建Job,将conf传递给Job之前进行,否则数据文件的路径不会被Mapper中取到。
2.在每个Mapper中获取文件URI,再进行相关操作:
            URI[] uris=DistributedCache.getCacheFiles(context.getConfiguration());

比如读取该文件:
       FileSystem fs = FileSystem.get(URI.create("hdfs://cloud01:9000"), context.getConfiguration());
  FSDataInputStream in = null;
  in = fs.open(new Path(uris[0].getPath()));
  BufferedReader br=new BufferedReader(new InputStreamReader(in));

hadoop中的DistributedCache 2
WordCount.javaHadoop的分布式缓存机制使得一个job的所有map或reduce可以访问同一份文件。在任务提交后,hadoop将由-files和-archive选项指定的文件复制到HDFS上(JobTracker的文件系统)。在任务运行前,TaskTracker从JobTracker文件系统复制文件到本地磁盘作为缓存,这样任务就可以访问这些文件。对于job来说,它并不关心文件是从哪儿来的。在使用DistributedCache时,对于本地化文件的访问,通常使用Symbolic Link来访问,这样更方便。通过 URI hdfs://namenode/test/input/file1#myfile 指定的文件在当前工作目录中被符号链接为myfile。这样job里面可直接通过myfile来访问文件,而不用关心该文件在本地的具体路径。
示例如下:


  1. package org.myorg;
  2. import java.io.BufferedReader;
  3. import java.io.FileReader;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.util.StringTokenizer;
  7. import java.io.IOException;
  8. import java.util.*;
  9. import org.apache.hadoop.filecache.DistributedCache;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.conf.*;
  12. import org.apache.hadoop.io.*;
  13. import org.apache.hadoop.mapred.*;
  14. import org.apache.hadoop.util.*;
  15. public class WordCount
  16. {
  17. public static void UseDistributedCacheBySymbolicLink() throws Exception
  18. {
  19. FileReader reader = new FileReader("god.txt");
  20. BufferedReader br = new BufferedReader(reader);
  21. String s1 = null;
  22. while ((s1 = br.readLine()) != null)
  23. {
  24. System.out.println(s1);
  25. }
  26. br.close();
  27. reader.close();
  28. }
  29. public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
  30. {
  31. public void configure(JobConf job)
  32. {
  33. System.out.println("Now, use the distributed cache and syslink");
  34. try {
  35. UseDistributedCacheBySymbolicLink();
  36. }
  37. catch (Exception e)
  38. {
  39. e.printStackTrace();
  40. }
  41. }
  42. private final static IntWritable one = new IntWritable(1);
  43. private Text word = new Text();
  44. public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
  45. {
  46. String line = value.toString();
  47. StringTokenizer tokenizer = new StringTokenizer(line);
  48. while (tokenizer.hasMoreTokens())
  49. {
  50. word.set(tokenizer.nextToken());
  51. output.collect(word, one);
  52. }
  53. }
  54. }
  55. public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
  56. {
  57. public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
  58. {
  59. int sum = 0;
  60. while (values.hasNext())
  61. {
  62. sum += values.next().get();
  63. }
  64. output.collect(key, new IntWritable(sum));
  65. }
  66. }
  67. public static void main(String[] args) throws Exception
  68. {
  69. JobConf conf = new JobConf(WordCount.class);
  70. conf.setJobName("wordcount");
  71. conf.setOutputKeyClass(Text.class);
  72. conf.setOutputValueClass(IntWritable.class);
  73. conf.setMapperClass(Map.class);
  74. conf.setCombinerClass(Reduce.class);
  75. conf.setReducerClass(Reduce.class);
  76. conf.setInputFormat(TextInputFormat.class);
  77. conf.setOutputFormat(TextOutputFormat.class);
  78. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  79. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  80. DistributedCache.createSymlink(conf);
  81. String path = "/xuxm_dev_test_61_pic/in/WordCount.java";
  82. Path filePath = new Path(path);
  83. String uriWithLink = filePath.toUri().toString() + "#" + "god.txt";
  84. DistributedCache.addCacheFile(new URI(uriWithLink), conf);
  85. JobClient.runJob(conf);
  86. }
  87. }
复制代码


执行方法参考http://hadoop.apache.org/common/ ... BC%9AWordCount+v1.0

  程序运行的结果是在jobtracker中的task的log可以看到打印后的/xuxm_dev_test_61_pic/in/WordCount.java文件的内容。

  如果程序中要用到很多小文件,那么使用Symbolic Link将非常方便。
     请在执行前先将WordCount.java文件放到指定位置,否则就会找不到文件

概念:
reduce-side join技术是灵活的,但是有时候它仍然会变得效率极低。由于join直到reduce()阶段才会开始,我们将会在网络中传递shuffle所有数据,而在大多数情况下,我们会在join阶段丢掉大多数传递的数据。因此我们期望能够在map阶段完成整个join操作。

主要技术难点:

在map阶段完成join的主要困难就是mapper可能需要与一个它自己不能获得的数据进行join操作,如果我们能够保证这样子的数据可被mapper获得,那我们这个技术就可用。举个例子,如果我们知道两个源数据被分为同样大小的partition,而且每个partition都以适合作为join key的key值排序的话,那每个mapper()就可以获取所有join操作需要的数据。事实上,Hadoop的org.apache.hadoop.mared.join包中包含了这样的帮助类来实现mapside join,但不幸的是,这样的情况太少了。而且使用这样的类会造成额外的开销。因此,我们不会继续讨论这个包。

什么情况下使用?

情况1:如果我们知道两个源数据被分为同样大小的partition,而且每个partition都以适合作为join key的key值排序

情况2:当join大型数据时,通常只有一个源数据十分巨大,另一个数据可能就会呈数量级的减小。例如,一个电话公司的用户数据可能只有千万条用户数据,但他的交易记录数据可能会有十亿条数量级以上的具体电话记录。当小的数据源可以被分配到mapper的内存中时,我们可以获得效果明显的性能提高,只要通过将小的数据源拷贝到每一台mapper机器上,使mapper在map阶段就进行join操作。这个操作就叫做replicate join。

解决方案:

Hadoop有一个叫做分布式缓存(distributed cache)的机制来将数据分发到集群上的所有节点上。它通常用来分发所有mapper需要的包含“background”数据的文件。例如你使用Hadoop来分类文档,你可能会有一个关键字的列表,你将使用distributed cache来保证所有mapper能够获得这些keywords("background data")。
操作步骤:

1.将数据分发到每个节点上:


  1. DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);
复制代码

2.在每个mapper上使用DistributedCache.getLocalCacheFiles()来获取文件,之后再进行相应的操作:


  1. DistributedCache.getLocalCacheFiles();
复制代码


新出现的问题:

我们的又一个限制是我们其中一个join的表必须足够小以至于能保存到内存中。尽管在不对称大小的输入数据中,较小的那个数据可能仍然不够小(不够小到可以放入内存中。)
1.我们可以通过重新安排数据加工步骤来使它们有效。例如:如果你需要一个所有用户在415区的排序数据时,在滤除一定记录前就将Orders以及Customers表连接起来虽然正确,但是效率却不高。Customers和Orders表都可能大到不能放入内存中。此时我们可以预处理数据使Customers或者Orders表变小。
2.有时候我们不论怎样预处理数据都不能使数据足够小,那我们应该在map时过滤掉不属于415 area的用户。详见《Hadoop in Action》 Chapter5.2.3 semijoin



synthetic_control.data.zip

118.04 KB, 下载次数: 1

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条