pig2 发表于 2014-3-14 22:00:38

认识hadoop缓存机制DistributedCache

传统开发中有缓存机制,那么hadoop中的缓存机制了解的不是很多,我们会产生的问题如:

DistributedCache是什么?
DistributedCache将job指定的文件,在job执行前,做如何处理?
DistributedCache常见的应用场景是什么?
DistributedCache的有几种使用方式?

howtodown 发表于 2014-3-14 22:12:54

DistributedCache


DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。


DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。


应用程序在JobConf中通过url(hdfs://)指定需要被缓存的文件。 DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。


Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。


DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。


distributedCache可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置执行权限。


用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。也可以利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。


用户可以通过 DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。 或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前工作目录会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1。


DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。

pig2 发表于 2014-3-14 22:05:50

本帖最后由 pig2 于 2014-3-14 22:07 编辑

一.DistributedCache介绍:

DistributedCache是hadoop框架提供的一种机制,可以将job指定的文件,在job执行前,先行分发到task执行的机器上,并有相关机制对cache文件进行管理.

二.常见的应用场景有:

分发第三方库(jar,so等);分发算法需要的词典文件;分发程序运行需要的配置;分发多表数据join时小表数据简便处理等

主要的注意事项有:

1.DistributedCache只能应用于分布式的情况,包括伪分布式,完全分布式.有些api在这2种情况下有移植性问题.


2.需要分发的文件,必须提前放到hdfs上.默认的路径前缀是hdfs://的,不是file://


3.需要分发的文件,最好在运行期间是只读的.


4.不建议分发较大的文件,比如压缩文件,可能会影响task的启动速度.


三、DistributedCache的相关配置
MRv1
属性名默认值备注
mapred.local.dir${hadoop.tmp.dir}/mapred/localThe local directory where MapReduce stores intermediate data files. May be a comma-separated list of directories on different devices in order to spread disk i/o. Directories that do not exist are ignored.
local.cache.size10737418240(10G)The number of bytes to allocate in each local TaskTracker directory for holding Distributed Cache data.
mapreduce.tasktracker.cache.local.numberdirectories10000The maximum number of subdirectories that should be created in any particular distributed cache store. After this many directories have been created, cache items will be expunged regardless of whether the total size threshold has been exceeded.
mapreduce.tasktracker.cache.local.keep.pct0.95(作用于上面2个参数)It is the target percentage of the local distributed cache that should be kept in between garbage collection runs. In practice it will delete unused distributed cache entries in LRU order until the size of the cache is less than mapreduce.tasktracker.cache.local.keep.pct of the maximum cache size. This is a floating point value between 0.0 and 1.0. The default is 0.95.

MRv2
新的yarn架构的代码还没有看过,不过从配置里可以看出相关的如下配置,本文主要基于MRv1.
yarn.nodemanager.local-dirs
yarn.nodemanager.delete.debug-delay-sec
yarn.nodemanager.local-cache.max-files-per-directory
yarn.nodemanager.localizer.cache.cleanup.interval-ms
yarn.nodemanager.localizer.cache.target-size-mb

四、DistributedCache的使用方式1.通过配置
可以配置这三个属性值:
mapred.cache.files,
mapred.cache.archives,
mapred.create.symlink (值设为yes 如果要建link的话)
如果要分发的文件有多个的话,要以逗号分隔(貌似在建link的时候,逗号分隔前后还不能有空格,否则会报错)

2.使用命令行
在pipes和streaming里面可能会用到
-filesSpecify comma-separated files to be copied to the Map/Reduce cluster
-libjarsSpecify comma-separated jar files to include in the classpath
-archivesSpecify comma-separated archives to be unarchived on the compute machines

例如:
-files hdfs://host:fs_port/user/testfile.txt
-files hdfs://host:fs_port/user/testfile.txt#testfile
-files hdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt
-archives hdfs://host:fs_port/user/testfile.jar
-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir
3.代码调用
DistributedCache.addCacheFile(URI,conf) / DistributedCache.addCacheArchive(URI,conf)
DistributedCache.setCacheFiles(URIs,conf) / DistributedCache.setCacheArchives(URIs,conf)
如果要建link,需要增加DistributedCache.createSymlink(Configuration)

获取cache文件可以使用
getLocalCacheFiles(Configuration conf)getLocalCacheArchives(Configuration conf)

代码调用常常会有各样的问题,一般我比较倾向于通过createSymlink的方式来使用,就把cache当做当前目录的文件来操作,简单很多.
常见的通过代码来读取cache文件的问题如下:
a.getLocalCacheFiles在伪分布式情况下,常常返回null.
b.getLocalCacheFiles其实是把DistributedCache中的所有文件都返回.需要自己筛选出所需的文件.archives也有类似的问题.
c.getLocalCacheFiles返回的是tt机器本地文件系统的路径,使用的时候要注意,因为很多地方默认的都是hdfs://,可以自己加上file://来避免这个问题

4.symlink
给分发的文件,在task运行的当前工作目录建立软连接,在使用起来的时候会更方便.没有上面的各种麻烦
mapred.create.symlink 需要设置为yes,不是true或Y之类哦

5.实际文件存放情况
下图显示的为tt机器上实际文件的状况 (只有yarn集群的截图)



五、DistributedCache的内部基本流程
1.每个tasktracker启动时,都会产生一个TrackerDistributedCacheManager对象,用来管理该tt机器上所有的task的cache文件.
2.在客户端提交job时,在JobClient内,对即将cache的文件,进行校验
   以确定文件是否存在,文件的大小,文件的修改时间,以及文件的权限是否是private or public.
3.当task在tt初始化job时,会由TrackerDistributedCacheManager产生一个TaskDistributedCacheManager对象,来管理本task的cache文件.
4.和本task相关联的TaskDistributedCacheManager,获取并解压相关cache文件到本地相应目录
   如果本tt机器上已经有了本job的其他task,并已经完成了相应cache文件的获取和解压工作,则不会重复进行.
   如果本地已经有了cache文件,则比较修改时间和hdfs上的文件是否一致,如果一致则可以使用.
5.当task结束时,会对该cache进行ref减一操作.
6.TrackerDistributedCacheManager有一个clearup线程,每隔1min会去处理那些无人使用的,目录大小大于local.cache.size或者子目录个数大于mapreduce.tasktracker.cache.local.numberdirectories的cache目录.

hyj 发表于 2014-3-14 22:10:44

DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存文件(text, archives, jars and so on)文件的默认访问协议为(hdfs://).

DistributedCache将拷贝缓存的文件到Slave节点在任何Job在节点上执行之前。
文件在每个Job中只会被拷贝一次,缓存的归档文件会被在Slave节点中解压缩。


每个存储在HDFS中的文件被放到缓存中后都可以通过一个符号链接使用。
URI hdfs://namenode/test/input/file1#myfile 你可以在程序中直接使用myfile来访问 file1这个文件。 myfile是一个符号链接文件。

471505881qq 发表于 2014-4-1 11:39:00


谢谢楼主的分享!

Joker 发表于 2015-1-7 17:16:47

博主你好,我在缓存Jar文件时候,在提交Job时候,却说找不到此类,但是我进行缓存了,加入classpath中
                DistributedCache.addFileToClassPath(new Path("hdfs://xx.xx.xx.x:9000/lib/mysql-connector-java-5.1.13-bin.jar"), job.getConfiguration());


请问,有什么解决方法吗?

howtodown 发表于 2015-1-7 18:45:38

Joker 发表于 2015-1-7 17:16
博主你好,我在缓存Jar文件时候,在提交Job时候,却说找不到此类,但是我进行缓存了,加入classpath中


...

记得需要写在Job类初始化之前,否则在运行会中找不到文件,因为Job初始化时将传入Configuration对象克隆一份给了JobContext。

rocky2015 发表于 2015-12-13 18:19:50

学习了·

qw409237780 发表于 2017-10-13 15:46:32

楼主你好,不管我用新版的还是旧版的api,都报出错误,我用的是伪分布式的,想实现的功能是map端join连接,报出了如下错误java.lang.Exception: java.io.FileNotFoundException: hdfs:/localhost:9000/user/raoc/peopleandgood/input/People (没有那个文件或目录)

URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());这个方法是取到了我们在JobDriver中所传进去的文件url,但是在map的setup方法中,加载这个文件的url时,报出了上面这个错误。
以下是map端的代码

public class JoinMapper extends Mapper<Text, Text, Text, Text> {
        private Hashtable<String, String> joinData = new Hashtable<>();
       
        protected void setup(Context context) throws IOException, InterruptedException {
                URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
                System.out.println(cacheFiles==null);
                if(cacheFiles != null && cacheFiles.length>0){
                        String line;
                        String[] tokens;
                        System.out.println(cacheFiles.toString());
                        BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles.toString()));
                        try{
                                while((line=joinReader.readLine())!=null){
                                        tokens=line.split(",");
                                        joinData.put(tokens, new String(tokens+","+tokens));
                                }
                        }finally {
                                joinReader.close();
                        }
                }
        }
       
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
                String joinValue = joinData.get(key.toString());
//                System.out.println("***key"+key.toString()+"****value"+value.toString());
//                System.out.println(joinValue);
                if(joinValue!=null){
                        context.write(key, new Text(joinValue+","+value.toString()));
                }
        }

}



JobDriver端的代码:

public class JoinDriver {

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
                URI url =new URI(args);
                DistributedCache.addCacheFile(url, conf);
                Job job = Job.getInstance(conf, "JobName");
                job.setJarByClass(mapper_Package.JoinDriver.class);
                // TODO: specify a mapper
                job.setMapperClass(JoinMapper.class);
                // TODO: specify a reducer
                job.setNumReduceTasks(0);       
                // TODO: specify output types
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                job.setInputFormatClass(KeyValueTextInputFormat.class);
                // TODO: specify input and output DIRECTORIES (not files)
                FileInputFormat.setInputPaths(job, new Path(args));
                FileOutputFormat.setOutputPath(job, new Path(args));
               

                if (!job.waitForCompletion(true))
                        return;
        }

}


请楼主指点,谢谢啦

qw409237780 发表于 2017-10-13 17:55:06

Mapper代码:
public class JoinMapper extends Mapper<Text, Text, Text, Text> {
        private Hashtable<String, String> joinData = new Hashtable<>();
       
        protected void setup(Context context) throws IOException, InterruptedException {
                Path[] cacheFiles = context.getLocalCacheFiles();
                System.out.println(cacheFiles==null);
                if(cacheFiles != null && cacheFiles.length>0){
                        String line;
                        String[] tokens;
                        System.out.println(cacheFiles.toString());
                        BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles.toString()));
                        try{
                                while((line=joinReader.readLine())!=null){
                                        tokens=line.split(",");
                                        joinData.put(tokens, new String(tokens+","+tokens));
                                }
                        }finally {
                                joinReader.close();
                        }
                }
        }
       
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
                String joinValue = joinData.get(key.toString());
//                System.out.println("***key"+key.toString()+"****value"+value.toString());
//                System.out.println(joinValue);
                if(joinValue!=null){
                        context.write(key, new Text(joinValue+","+value.toString()));
                }
        }

}




JobDriver端代码:
public class JoinDriver {

        public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
                URI url =new URI(args);
                Job job = Job.getInstance(conf, "JobName");
                job.addCacheFile(url);
//                job.createSymlink();
                job.setJarByClass(mapper_Package.JoinDriver.class);
                // TODO: specify a mapper
                job.setMapperClass(JoinMapper.class);
                // TODO: specify a reducer
                job.setNumReduceTasks(0);       
                // TODO: specify output types
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                job.setInputFormatClass(KeyValueTextInputFormat.class);
                // TODO: specify input and output DIRECTORIES (not files)
                FileInputFormat.setInputPaths(job, new Path(args));
                FileOutputFormat.setOutputPath(job, new Path(args));
               

                if (!job.waitForCompletion(true))
                        return;
        }

}

使用了新的api,同时context.getocalCache(),在完全分布式的环境下,bug被解决,但是在伪分布的模式下,问题还是存在
页: [1]
查看完整版本: 认识hadoop缓存机制DistributedCache