问题导读
1.为何HDFS 中的块如此之大?
2.对分布式文件系统中的块进行抽象会带来哪些好处?
3.hadoop集群通过什么工具可以导入本地文件或则关系数据库数据?
4.hadoop是否支持随机读写?
当数据集的大小超过一台独立的物理计算机的存储能力时,就有必要对它进行分区(partition)并存储到若干台单独的计算机上。管理网络中跨多台计算机存储的文件系统称为分布式文件系统(distributed filesystem)。该系统架构于网络之上,势必会引入网络编程的复杂性,因此分布式文件系统比普通磁盘文件系统更为复杂。例如,使文件系统能够容忍节点故障且不丢失任何数据,就是一个极大的挑战。
Hadoop有一个称为HDFS的分布式系统,即Hadoop Distributed Filesystem。在非正式文档或旧文档以及配置文件中,有时也简称为DFS,它们是一回事儿。HDFS是Hadoop的旗舰级文件系统,同时也是本章的重点,但实际上Hadoop是一个综合性的文件系统抽象,因此下面我们也将看到Hadoop集成其他文件系统的方法(如本地文件系统和Amazon S3系统)。
1.1 HDFS的设计
HDFS以流式数据访问模式来存储超大文件,运行于商用硬件集群上。让我们仔细看看下面的描述。
超大文件 “超大文件”在这里指具有几百MB、几百GB甚至几百TB大小的文件。目前已经有存储PB级数据的Hadoop 集群了。 流式数据访问 HDFS的构建思路是这样的:一次写入、多次读取是最高效的访问模式。数据集通常由数据源生成或从数据源复制而来,接着长时间在此数据集上进行各种分析。每次分析都将涉及该数据集的大部分数据甚至全部,因此读取整个数据集的时间延迟比读取第一条记录的时间延迟更重要。 商用硬件 Hadoop并不需要运行在昂贵且高可靠的硬件上。它是设计运行在商用硬件(在各种零售店都能买到的普通硬件)的集群上的,因此至少对于庞大的集群来说,节点故障的几率还是非常高的。HDFS遇到上述故障时,被设计成能够继续运行且不让用户察觉到明显的中断。同样,那些不适合在HDFS上运行的应用也值得研究。目前某些应用领域并不适合在HDFS上运行,不过以后可能会有所改进。 低时间延迟的数据访问要求低时间延迟数据访问的应用,例如几十毫秒范围,不适合在HDFS上运行。记住,HDFS是为高数据吞吐量应用优化的,这可能会以提高时间延迟为代价。目前,对于低延迟的访问需求,HBase(参见第12 章)是更好的选择。 大量的小文件 由于namenode将文件系统的元数据存储在内存中,因此该文件系统所能存储的文件总数受限于namenode的内存容量。根据经验,每个文件、目录和数据块的存储信息大约占150字节。因此,举例来说,如果有一百万个文件,且每个文件占一个数据块,那至少需要300 MB 的内存。尽管存储上百万个文件是可行的,但是存储数十亿个文件就超出了当前硬件的能力。 多用户写入,任意修改文件 HDFS中的文件可能只有一个writer,而且写操作总是将数据添加在文件的末尾。它不支持具有多个写入者的操作,也不支持在文件的任意位置进行修改。可能以后会支持这些操作,但它们相对比较低效。
1.2 HDFS的概念
1.2.1 数据块
每个磁盘都有默认的数据块大小,这是磁盘进行数据读/写的最小单位。构建于单个磁盘之上的文件系统通过磁盘块来管理该文件系统中的块,该文件系统块的大小可以是磁盘块的整数倍。文件系统块一般为几千字节,而磁盘块一般为512字节。这些信息——文件系统块大小——对于需要读/写文件的文件系统用户来说是透明的。尽管如此,系统仍然提供了一些工具(如df和fsck)来维护文件系统,由它们对文件系统中的块进行操作。
HDFS同样也有块(block)的概念,但是大得多,默认为64 MB。与单一磁盘上的文件系统相似,HDFS上的文件也被划分为块大小的多个分块(chunk),作为独立的存储单元。但与其他文件系统不同的是,HDFS中小于一个块大小的文件不会占据整个块的空间。如果没有特殊指出,本书中提到的“块”特指HDFS中的块。
为何HDFS 中的块如此之大?
HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。
我们来做一个速算,如果寻址时间约为10ms,而传输速率为100 MB/s,为了使寻址时间仅占传输时间的1%,我们要将块大小设置约为100 MB。默认的块大小实际为64 MB,但是很多情况下HDFS使用128 MB的块设置。以后随着新一代磁盘驱动器传输速率的提升,块的大小将被设置得更大。
但是这个数也不会设置得过大。MapReduce中的map任务通常一次只处理一个块中的数据,因此如果任务数太少(少于集群中的节点数量),作业的运行速度就会比较慢。
对分布式文件系统中的块进行抽象会带来很多好处。
第一个最明显的好处是,一个文件的大小可以大于网络中任意一个磁盘的容量。文件的所有块并不需要存储在同一个磁盘上,因此它们可以利用集群上的任意一个磁盘进行存储。事实上,尽管不常见,但对于整个HDFS集群而言,也可以仅存储一个文件,该文件的块占满集群中所有的磁盘。
第二个好处是,使用抽象块而非整个文件作为存储单元,大大简化了存储子系统的设计。简化是所有系统的目标,但是这对于故障种类繁多的分布式系统来说尤为重要。将存储子系统控制单元设置为块,可简化存储管理(由于块的大小是固定的,因此计算单个磁盘能存储多少个块就相对容易)。同时也消除了对元数据的顾虑(块只是存储数据的一部分——而文件的元数据,如权限信息,并不需要与块一同存储,这样一来,其他系统就可以单独管理这些元数据)。
不仅如此,块还非常适合用于数据备份进而提供数据容错能力和提高可用性。将每个块复制到少数几个独立的机器上(默认为3个),可以确保在块、磁盘或机器发生故障后数据不会丢失。如果发现一个块不可用,系统会从其他地方读取另一个复本,而这个过程对用户是透明的。一个因损坏或机器故障而丢失的块可以从其他候选地点复制到另一台可以正常运行的机器上,以保证复本的数量回到正常水平。参见4.1节对数据完整性的讨论,进一步了解如何应对数据损坏。同样,有些应用程序可能选择为一些常用的文件块设置更高的复本数量进而分散集群中的读取负载。
与磁盘文件系统相似,HDFS中fsck指令可以显示块信息。例如,执行以下命令将列出文件系统中各个文件由哪些块构成(参见10.1.4.2节):
% hadoop fsck / -files -blocks 复制代码
1.2.2 namenode和datanode
和多个datanode(工作者)。namenode管理文件系统的命名空间。它维护着文件系统树及整棵树内所有的文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。namenode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信息,因为这些信息会在系统启动时由数据节点重建。
客户端(client)代表用户通过与namenode和datanode交互来访问整个文件系统。客户端提供一个类似于POSIX(可移植操作系统界面)的文件系统接口,因此用户在编程时无需知道namenode和datanode也可实现其功能。
datanode是文件系统的工作节点。它们根据需要存储并检索数据块(受客户端或namenode调度),并且定期向namenode发送它们所存储的块的列表。
没有namenode,文件系统将无法使用。事实上,如果运行namenode服务的机器毁坏,文件系统上所有的文件将会丢失,因为我们不知道如何根据datanode的块重建文件。因此,对namenode实现容错非常重要,Hadoop为此提供两种机制。
第一种机制是备份那些组成文件系统元数据持久状态的文件。Hadoop可以通过配置使namenode在多个文件系统上保存元数据的持久状态。这些写操作是实时同步的,是原子操作。一般的配置是,将持久状态写入本地磁盘的同时,写入一个远程挂载的网络文件系统(NFS)。
另一种可行的方法是运行一个辅助namenode,但它不能被用作namenode。这个辅助namenode的重要作用是定期通过编辑日志合并命名空间镜像,以防止编辑日志过大。这个辅助namenode一般在另一台单独的物理计算机上运行,因为它需要占用大量CPU时间与namenode相同容量的内存来执行合并操作。它会保存合并后的命名空间镜像的副本,并在namenode发生故障时启用。但是,辅助namenode保存的状态总是滞后于主节点,所以在主节点全部失效时,难免会丢失部分数据。在这种情况下,一般把存储在NFS上的namenode元数据复制到辅助namenode并作为新的主namenode运行。
详情参见10.1.1节对文件系统镜像与编辑日志的讨论。
1.2.3 联邦HDFS
namenode在内存中保存文件系统中每个文件和每个数据块的引用关系,这意味着对于一个拥有大量文件的超大集群来说,内存将成为限制系统横向扩展的瓶颈(参见9.4.2节)。在2.x发行版本系列中引入的联邦HDFS允许系统通过添加namenode实现扩展,其中每个namenode管理文件系统命名空间中的一部分。例如,一个namenode可能管理/user目录下的所有文件,而另一个namenode可能管理/share目录下的所有文件。
在联邦环境下,每个namenode维护一个命名空间卷(namespace volume),包括命名空间的源数据和在该命名空间下的文件的所有数据块的数据块池。命名空间卷之间是相互独立的,两两之间并不相互通信,甚至其中一个namenode的失效也不会影响由其他namenode维护的命名空间的可用性。数据块池不再进行切分,因此集群中的datanode需要注册到每个namenode,并且存储着来自多个数据块池中的数据块。
要想访问联邦HDFS集群,客户端需要使用客户端挂载数据表将文件路径映射到namenode。该功能可以通过ViewFileSystem和viewfs://URI进行配置和管理。
1.2.4 HDFS的高可用性
通过联合使用在多个文件系统中备份namenode的元数据和通过备用namenode创建监测点能防止数据丢失,但是依旧无法实现文件系统的高可用性。Namenode依旧存在单点失效(SPOF)的问题。如果namenode失效了,那么所有的客户端——包括MapReduce作业——均无法读、写或列 (list)文件,因为namenode是唯一存储元数据与文件到数据块映射的地方。在这一情况下,Hadoop系统无法提供服务直到有新的namenode上线。
在这样的情况下,要想从一个失效的namenode恢复,系统管理员得启动一个拥有文件系统元数据副本的新的namenode,并配置datanode和客户端以便使用这个新的namenode。新的namenode直到满足以下情形才能响应服务:1)将命名空间的映像导入内存中;2)重做编辑日志;3)接收到足够多的来自datanode的数据块报告并退出安全模式。对于一个大型并拥有大量文件和数据块的集群,namenode的冷启动需要30分钟,甚至更长时间。
系统恢复时间太长,也会影响到日常维护。事实上,namenode失效的可能性非常低,所以在实际应用中计划系统失效时间就显得尤为重要。
Hadoop的2.x发行版本系列针对上述问题在HDFS中增加了对高可用性(HA)的支持。在这一实现中,配置了一对活动-备用(active-standby) namenode。当活动namenode失效,备用namenode就会接管它的任务并开始服务于来自客户端的请求,不会有任何明显中断。实现这一目标需要在架构上做如下修改。
namenode之间需要通过高可用的共享存储实现编辑日志的共享。(在早期的高可用实现版本中,需要一个NFS过滤器来辅助实现,但是在后期版本中将提供更多的选择,比如构建于ZooKeeper之上的BookKeeper这样的系统。)当备用namenode接管工作之后,它将通读共享编辑日志直至末尾,以实现与活动namenode的状态同步,并继续读取由活动namenode写入的新条目。 datanode需要同时向两个namenode发送数据块处理报告,因为数据块的映射信息存储在namenode的内存中,而非磁盘。 客户端需要使用特定的机制来处理namenode的失效问题,这一机制对用户是透明的。
在活动namenode失效之后,备用namenode能够快速(几十秒的时间)实现任务接管,因为最新的状态存储在内存中:包括最新的编辑日志条目和最新的数据块映射信息。实际观察到的失效时间略长一点(需要1分钟左右),这是因为系统需要保守确定活动namenode是否真的失效了。
在活动namenode失效且备用namenode也失效的情况下,当然这类情况发生的概率非常低,管理员依旧可以申明一个备用namenode并实现冷启动。这类情况并不会比非高可用(no-HA)的情况更差,并且从操作的角度讲这是一个进步,因为上述处理已是一个标准的处理过程并植入Hadoop中。
故障切换与规避
一个称为故障转移控制器(failover_controller)的系统中有一个新实体管理着将活动namenode转移为备用namenode的转换过程。故障转移控制器是可插拔的,但其最初的实现是基于ZooKeeper的并由此确保有且仅有一个活动namenode。每一个namenode运行着一个轻量级的故障转移控制器,其工作就是监视宿主namenode是否失效(通过一个简单的心跳机制实现)并在namenode失效时进行故障切换。
管理员也可以手动发起故障转移,例如在进行日常维护时。这称为“平稳的故障转移”,因为故障转移控制器可以组织两个namenode有序切换角色。
但在非平稳故障转移的情况下,无法确切知道失效namenode是否已经停止运行。例如,在网速非常慢或者网络被分割的情况下,同样也可能激发故障转移,但是先前的活动namenode依然运行着并且依旧是活动namenode。高可用实现做了更进一步的优化,以确保先前活动的namenode不会执行危害系统并导致系统崩溃的操作——该方法称为“规避”(fencing)。系统引入了一系列的规避机制,包括杀死namenode进程,收回访问共享存储目录的权限(通常使用供应商指定的NFS命令),通过远程管理命令以屏蔽相应网络端口。诉诸的最后手段是,先前活动namenode可以通过一个相当形象的称为STONITH(shootthe other node in the head)的技术进行规避,该方法主要通过一个特定的供电单元对相应主机进行断电操作。
客户端的故障切换通过客户端类库实现透明处理。最简单的实现是通过客户端的配置文件实现故障切换的控制。HDFS URI使用一个逻辑主机名,该主机名映射到一对namenode地址(在配置文件中设置),客户端类库会访问每一个namenode地址直至处理完成。
1.3 命令行接口
现在我们通过命令行交互来进一步认识HDFS。HDFS还有很多其他接口,但命令行是最简单的,同时也是许多开发者最熟悉的。
参照附录A中伪分布模式下设置Hadoop的说明,我们先在一台机器上运行HDFS。稍后介绍如何在集群上运行HDFS,以提供伸缩性与容错性。
在我们设置伪分布配置时,有两个属性项需要进一步解释。第一项是fs.default.name,设置为hdfs://localhost/,用于设置Hadoop的默认文件系统。文件系统是由URI指定的,这里我们已使用hdfs URI来配置HDFS为Hadoop的默认文件系统。HDFS的守护程序通过该属性项来确定HDFS namenode的主机及端口。我们将在localhost默认端口8020上运行namenode。这样一来,HDFS客户端可以通过该属性得知namenode在哪里运行进而连接到它。
第二个属性dfs.replication,我们设为1,这样一来,HDFS就不会按默认设置将文件系统块复本设为3。在单独一个datanode上运行时,HDFS无法将块复制到3个datanode上,所以会持续给出块复本不足的警告。设置这个属性之后,就不会再有问题了。
文件系统的基本操作
至此,文件系统已经可以使用了,我们可以执行所有常用的文件系统操作,例如,读取文件,新建目录,移动文件,删除数据,列出目录,等等。可以输入hadoop fs -help命令获取每个命令的详细帮助文件。
首先从本地文件系统将一个文件复制到HDFS:
% hadoop fs -copyFromLocal input/docs/quangle.txt
hdfs://localhost/user/tom/quangle.txt 复制代码
该命令调用Hadoop文件系统的shell命令fs,后者提供了一系列子命令,在这个例子中,我们执行的是-copyFromLocal。本地文件quangle.txt被复制到运行在localhost上的 HDFS实例中,路径为/user/tom/quangle.txt。事实上,我们可以简化命令格式以省略主机的URI并使用默认设置,即省略hdfs://localhost,因为该项已在core-site.xml中指定。
% hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt 复制代码
我们也可以使用相对路径,并将文件复制到HDFS的home目录中,本例中为/user/tom:
% hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt 复制代码
我们把文件复制回本地文件系统,并检查是否一致:
% hadoop fs -copyToLocal quangle.txt quangle.copy.txt
% md5 input/docs/quangle.txt quangle.copy.txt
MD5 (input/docs/quangle.txt) = a16f231da6b05e2ba7a339320e7dacd9
MD5 (quangle.copy.txt) = a16f231da6b05e2ba7a339320e7dacd9 复制代码
MD5键值相同,表明这个文件在HDFS之旅中得以幸存并保存完整。 最后,我们看一下HDFS文件列表。我们新建一个目录看它在列表中是怎么显示的:
% hadoop fs -mkdir books
% hadoop fs -ls .
Found 2 items
drwxr-xr-x - tom supergroup 0 2009-04-02 22:41 /user/tom/books
-rw-r--r-- 1 tom supergroup 118 2009-04-02 22:29 /user/tom/quangle.txt 复制代码
返回的结果信息与Unix命令ls -l的输出结果非常相似,仅有细微差别。第1列显示的是文件模式。第2列是这个文件的备份数(这在传统Unix文件系统是没有的)。由于我们在整个文件系统范围内设置的默认复本数为1,所以这里显示的也都是1。这一列的开头目录为空,因为本例中没有使用复本的概念——目录作为元数据保存在namenode中,而非datanode中。第3列和第4列显示文件的所属用户和组别。第5列是文件的大小,以字节为单位,目录为0。第6列和第7列是文件的最后修改日期与时间。最后,第8列是文件或目录的绝对路径。
HDFS中的文件访问权限
针对文件和目录,HDFS的权限模式与POSIX 非常相似。
一共提供三类权限模式:只读权限(r)、写入权限(w)和可执行权限(x)。读取文件或列出目录内容时需要只读权限。写入一个文件或是在一个目录上新建及删除文件或目录,需要写入权限。对于文件而言,可执行权限可以忽略,因为你不能在HDFS中执行文件(与POSIX不同),但在访问一个目录的子项时需要该权限。
每个文件和目录都有所属用户(owner)、所属组别(group)及模式(mode)。这个模式是由所属用户的权限、组内成员的权限及其他用户的权限组成的。
在默认情况下,可以通过正在运行进程的用户名和组名来唯一确定客户端的标识。但由于客户端是远程的,任何用户都可以简单地在远程系统上以其名义新建一个账户来进行访问。因此,作为共享文件系统资源和防止数据意外损失的一种机制,权限只能供合作团体中的用户使用,而不能用于在一个不友好的环境中保护资源。注意,最新版的Hadoop已经支持Kerberos用户认证,该认证去除了这些限制,详见第325页的“安全”小节。但是,除了上述限制之外,为防止用户或自动工具及程序意外修改或删除文件系统的重要部分,启用权限控制还是很重要的(这也是默认的配置,参见dfs.permissions属性)。
如果启用权限检查,就会检查所属用户权限,以确认客户端的用户名与所属用户是否匹配,另外也将检查所属组别权限,以确认该客户端是否是该用户组的成员;若不符,则检查其他权限。
这里有一个超级用户(super-user)的概念,超级用户是namenode进程的标识。对于超级用户,系统不会执行任何权限检查。 1.4 Hadoop文件系统
Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。Java抽象类 org.apache.hadoop.fs.FileSystem定义了Hadoop 中的一个文件系统接口,并且该抽象类有几个具体实现,如表3-1所示。
表3-1. Hadoop文件系统
文件系统 URI方案 Java实现(均包含在org.apache.hadoop包中) 描述 Local file fs.LocalFileSystem 使用了客户端校验和的本地磁盘文件系统。没有使用校验和的本地磁盘文件系统RawLocalFileSystem。详情参见4.1.2节 HDFS hdfs hdfs.DistributedFileSystem Hadoop 的分布式文件系统。将HDFS设计成与MapReduce结合使用,可以实现高性能 HFTP Hftp hdfs.hftpFileSystem 一个在HTTP 上提供对HDFS 只读访问的文件系统(尽管名称为HFTP,但与FTP无关)。通常与distcp结合使用(参见3.8节),以实现在运行不同版本的HDFS的集群之间复制数据 HSFTP hsftp hdfs.HsftpFileSyste 在HTTPS 上提供对HDFS只读访问的文件系统(同上,与FTP 无关) WebHDFS Webhdfs Hdfs.web.WebHdfsFileSystem 基于HTTP,对HDFS提供安全读写访问的文件系统。WebHDFS是为了替代HFTP和HSFTP而构建的 HAR har fs.HarFileSystem 一个构建在其他文件系统之上用于文件存档的文件系统。Hadoop存档文件系统通常用于需要将HDFS 中的文件进行存档时,以减少namenode内存的使用。参见3.9节
hfs(云存储) kfs fs.kfs.kosmosFileSystem CloudStore(其前身为Kosmos文件系统)是类似于HDFS或是谷歌的GFS的文件系统,用C++写。详情参见http://kosmosfs.sourceforge.net/ FTP ftp fs.ftp.FTPFileSystem 由FTP 服务器支持的文件系统 S3(原生) S3n fs.s3native.NativeS3FileSystem 由Amazon S3 支持的文件系统。参见http://wiki.apache.org/hadoop/AmazonS3 S3(基于块) S3 fs.sa.S3FileSystem 由Amazon S3 支持的文件系统,以块格式存储文件(与HDFS 很相似)以解决S3 的5 GB文件大小限制 分布式RAID hdfs hdfs.DistributedRaidFileSystem RAID版本的HDFS是为了存档而设计的。针对HDFS中的每个文件,创建一个(更小的)校验文件,并允许HDFS中的数据副本由3降为2,由此可以减少25%~30%的存储空间,但是数据丢失的概率保持不变。分布式RAID模式需要在集群中运行一个RaidNode后台进程 View viewfs viewfs.ViewFileSystem 针对其他Hadoop文件系统挂载的客户端表。通常用于联邦namenode创建挂载点。详情参见3.2.3节。
Hadoop 对文件系统提供了许多接口,它一般使用URI 方案来选取合适的文件系统实例进行交互。举例来说,我们在前一小节中遇到的文件系统命令行解释器可以操作所有的Hadoop 文件系统命令。要想列出本地文件系统根目录下的文件,可以输入以下命令:
复制代码
尽管运行的MapReduce程序可以访问任何文件系统(有时也很方便),但在处理大数据集时,建议你还是选择一个有数据本地优化的分布式文件系统,如HDFS(参见2.4节)。
接口
Hadoop是用Java写的,通过Java API可以调用所有Hadoop文件系统的交互操作。例如,文件系统的命令解释器就是一个Java 应用,它使用Java 的FileSystem类来提供文件系统操作。其他一些文件系统接口也将在本小节中做简单介绍。这些接口通常与HDFS一同使用,因为Hadoop中的其他文件系统一般都有访问基本文件系统的工具(对于FTP,有FTP客户端;对于S3,有S3工具,等等),但它们大多数都能用于任何Hadoop 文件系统。
1. HTTP
通过HTTP来访问HDFS有两种方法:直接访问,HDFS后台进程直接服务于来自客户端的请求;通过代理(一个对多个)访问,客户端通常使用DistributedFileSystem API访问HDFS。这两种方法如图3-1所示。
图3-1. 通过HTTP直接访问HDFS或者通过多个HDFS代理访问HDFS
在第一种情况中,由namenode内嵌的web服务器(运行在端口50070上)提供目录服务,目录列表以XML或者JSON格式存储,并且文件数据由datanode的web服务器(运行在端口50075上)以数据流的形式传输。
原来那个的HTTP接口(HFTP和HSFTP)是只读的,但是新的WebHDFS实现支持所有的文件系统操作,包括Kerberos认证。WebHDFS必须通过将dfs.webhdfs.enalbe选项设置为真后才能启用,并且只有启用它之后,你才可以使用webhdfs URI。
第二种方法依靠一个或者多个独立代理服务器通过HTTP访问HDFS。(由于代理服务是无状态的,因此可以运行在标准的负载均衡器之后。)所有到集群的网络通信都需要经过代理。使用代理服务器后可以使用更严格的防火墙策略和带宽限制策略。通常情况下通过代理服务器,实现在不同数据中心中部署的Hadoop集群之间的数据传输。
原来那个的HDFS代理服务器(在src/contrib/hdfsproxy)是只读的并且客户端使用HSFTP Filesystem实现(hsftp URI)进行访问。从1.0.0版本开始,实现了一个称为HttpFS的新代理服务器(具备读和写的能力),并且提供了和WebHDFS的一样的HTTP接口,因此客户端可以通过webhdfsURI访问这两类接口。
在规范正式定义了WebHDFS中使用的HTTP REST API,以期望以后使用非Java语言编写的客户端有望直接使用这个API。
2. C语言
Hadoop提供了一个名为libhdfs的C语言库,该语言库是Java,FileSystem接口类的一个镜像(它被写成访问HDFS的C语言库,但其实它可以访问全部Hadoop文件系统)。它使用Java原生接口(Java Native Interface,JNI)调用Java 文件系统客户端。
这个C语言API 与Java的API非常相似,但它的开发一般滞后于Java API,因此目前一些新的特性可能还不支持。可以在Hapdoop发行包的Libhdfs/docs/api目录找到CAPI的相关文档。
3. FUSE
用户空间文件系统(Filesystem in Userspace,FUSE)允许把按照用户空间实现的文件系统整合成一个Unix文件系统。通过使用Hadoop的Fuse-DFS功能模块,任何一个Hadoop 文件系统(不过一般为HDFS)均可以作为一个标准文件系统进行挂载。随后便可以使用Unix工具(如ls和cat)与该文件系统交互,还可以通过任意一种编程语言调用POSIX 库来访问文件系统。
Fuse-DFS是用C语言实现的,调用libhdfs并作为访问HDFS的接口。关于如何编译和运行Fuse-DFS的文档,可以在Hadoop发行版的src/contrib./fuse-dfs目录中找到。
1.5 Java接口
在本小节中,我们要深入探索Hadoop的Filesystem类:它是与Hadoop的某一文件系统进行交互的API。虽然我们主要聚焦于HDFS实例,即DistributedFileSystem,但总体来说,还是应该集成FileSystem抽象类,并编写代码,使其在不同文件系统中可移植。这对测试你编写的程序非常重要,例如,你可以使用本地文件系统中的存储数据快速进行测试。
1.5.1 从Hadoop URL读取数据
要从Hadoop 文件系统读取文件,最简单的方法是使用java.net.URL对象打开数据流,从中读取数据。具体格式如下:
InputStream in = null;
try {
in = new URL("hdfs://host/path").openStream();
// process in
} finally {
IOUtils.closeStream(in);
} 复制代码
让Java程序能够识别Hadoop的hdfs URL方案还需要一些额外的工作。这里采用的方法是通过FsUrlStreamHandlerFactory实例调用java.net.URL对象的setURLStreamHandlerFactory方法。每个Java虚拟机只能调用一次这个方法,因此通常在静态方法中调用。这个限制意味着如果程序的其他组件——如不受你控制的第三方组件——已经声明一个URLStreamHandlerFactory实例,你将无法使用这种方法从Hadoop中读取数据。下一节将讨论另一种备选方法。
范例3-1展示的程序以标准输出方式显示Hadoop文件系统中的文件,类似于Unix中的cat命令。
范例3-1. 通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统的文件
public class URLCat {
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception {
InputStream in = null;
try {
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
} 复制代码
我们可以调用Hadoop中简洁的IOUtils类,并在finally子句中关闭数据流,同时也可以在输入流和输出流之间复制数据(本例中为System.out)。copyBytes方法的最后两个参数,第一个设置用于复制的缓冲区大小,第二个设置复制结束后是否关闭数据流。这里我们选择自行关闭输入流,因而System.out不必关闭输入流。
下面是一个运行示例:
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat. 复制代码
1.5.2 通过FileSystem API读取数据
正如前一小节所解释的,有时根本不可能在应用中设置URLStreamHandlerFactory实例。在这种情况下,需要使用FileSystem API来打开一个文件的输入流。
Hadoop文件系统中通过Hadoop Path对象(而非java.io.File对象,因为它的语义与本地文件系统联系太紧密)来代表文件。可以将路径视为一个Hadoop文件系统URI,如hdfs://localhost/user/tom/quangle.txt。
FileSystem是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有下面这几个静态工厂方法:
public static FileSystem get(Configuration conf) throws IOException
Public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String user)
throws IOException 复制代码
Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-site.xml)。第一个方法返回的是默认文件系统(在conf/core-site.xml中指定的,如果没有指定,则使用默认的本地文件系统)。第二个方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。第三,作为给定用户来访问文件系统,对安全来说是至关重要。(参见9.6节)。
在某些情况下,你可能希望获取本地文件系统的运行实例,此时你可以使用的getLocal()方法很方便地获取。
public static LocalFileSystem getLocal(Configuration conf) throws IOException 复制代码
有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:
Public FSDataInputStream open(Path f) throws IOException
Public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
复制代码
第一个方法使用默认的缓冲区大小4 KB。
最后,我们重写范例3-1,得到范例3-2。
范例3-2. 直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
public class FileSystemCat {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
InputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
} 复制代码
程序运行结果如下:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat. 复制代码
FSDataInputStream对象
实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream接口的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。
package org.apache.hadoop.fs;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable {
// implementation elided
} 复制代码
Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量(getPos())的查询方法:
public interface Seekable {
void seek(long pos) throws IOException;
long getPos() throws IOException;
boolean seekToNewSource(long targetPos) throws IOException;
} 复制代码
调用seek()来定位大于文件长度的位置会引发IOException异常。与java.io.InputStream的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。
范例3-3为范例3-2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后,定位到文件的起始位置再次以流方式读取该文件。
范例3-3. 使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次
public class FileSystemDoubleCat {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(0); // go back to the start of the file
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}
} 复制代码
在一个小文件上运行的结果如下:
% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat. 复制代码
FSDataInputStream类也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分:
public interface PositionedReadable {
public int read(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
public void readFully(long position, byte[] buffer) throws IOException;
}
复制代码
read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度。readFully()方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出EOFException异常。
所有这些方法会保留文件当前偏移量,并且是线程安全的(FSDataInputStrean并不是为并发访问设计的,因此最好为此新建多个实例),因此它们提供了在读取文件——可能是元数据——的主体时,访问文件其他部分的便利方法。事实上,这只是按照以下模式实现的Seekable 接口。
最后务必牢记,seek()方法是一个相对高开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式 (如使用MapReduce),而非执行大量seek()方法。
1.5.3 写入数据
FileSystem类有一系列新建文件的方法。最简单的方法是给准备建的文件指定一个Path对象,然后返回一个用于写入数据的输出流:
public FSDataOutputStream create(Path f) throws IOException 复制代码
此方法有多个重载版本,允许我们指定是否需要强制覆盖现有的文件、文件备份数量、写入文件时所用缓冲区大小、文件块大小以及文件权限。
create()方法能够为需要写入且当前不存在的文件创建父目录。尽管这样很方便,但有时并不希望这样。如果希望父目录不存在就导致文件写入失败,则应该先调用exists()方法检查父目录是否存在。
还有一个重载方法Progressable用于传递回调接口,如此一来,可以把数据写入datanode的进度通知给应用:
package org.apache.hadoop.util;
public interface Progressable {
public void progress();
} 复制代码
另一种新建文件的方法是使用append()方法在一个已有文件末尾追加数据(还有其他一些重载版本):
public FSDataOutputStream append(Path f) throws IOException 复制代码
这样的追加操作允许一个writer打开文件后在访问该文件的最后偏移量处追加数据。有了这个API,某些应用可以创建无边界文件,例如,应用可以在关闭日志文件之后继续追加日志。该追加操作是可选的,并非所有Hadoop文件系统都实现了该操作。例如,HDFS支持追加[ 在Hadoop 1.x版本中的追加实现存在可靠性问题,因此通常在1.x之后的版本中推荐使用追加,因为这些版本中包含有一个新的、重写的实现。],但S3 文件系统就不支持。
范例3-4 显示了如何将本地文件复制到Hadoop文件系统。每次Hadoop调用progress()方法时——也就是每次将64 KB数据包写入datanode管线后——打印一个时间点来显示整个运行过程。注意,这个操作并不是通过API实现的,因此Hadoop后续版本能否执行该操作,取决于该版本是否修改过上述操作。API只让你知道到“正在发生什么事情”。
范例3-4. 将本地文件复制到Hadoop文件系统
public class FileCopyWithProgress {
public static void main(String[] args) throws Exception {
String localSrc = args[0];
String dst = args[1];
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
}
} 复制代码
典型应用如下:
% hadoop FileCopyWithProgress input/docs/1400-8.txt hdfs://localhost/user/tom/1400-8.txt
............... 复制代码
目前,其他Hadoop文件系统写入文件时均不调用progress()方法。后面几章将展示进度对MapReduce应用的重要性。
FSDataOutputStream对象
FileSystem实例的create()方法返回FSDataOutputStream对象,与FSDataInputStream类相似,它也有一个查询文件当前位置的方法:
package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException {
// implementation elided
}
// implementation elided
} 复制代码
但与FSDataInputStream类不同的是,FSDataOutputStream类不允许在文件中定位。这是因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件的末尾追加数据。换句话说,它不支持在除文件末尾之外的其他位置进行写入,因此,写入时定位就没有什么意义。
1.5.4 目录
Filesystem实例提供了创建目录的方法:
public boolean mkdirs(Path f) throws IOException 复制代码
这个方法可以一次性新建所有必要但还没有的父目录,就像java.io.File类的mkdirs()方法。如果目录(以及所有父目录)都已经创建成功,则返回true。
通常,你不需要显式创建一个目录,因为调用create()方法写入文件时会自动创建父目录。
1.5.5 查询文件系统
1. 文件元数据:FileStatus
任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件和目录相关信息的功能。FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、复本、修改时间、所有者以及权限信息。
FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。范例3-5显示了它的用法。
范例3-5. 展示文件状态信息
public class ShowFileStatusTest {
private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
private FileSystem fs;
@Before
public void setUp() throws IOException {
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
}
cluster = new MiniDFSCluster(conf, 1, true, null);
fs = cluster.getFileSystem();
OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
}
@After
public void tearDown() throws IOException {
if (fs != null) { fs.close(); }
if (cluster != null) { cluster.shutdown(); }
}
@Test(expected = FileNotFoundException.class)
public void throwsFileNotFoundForNonExistentFile() throws IOException {
fs.getFileStatus(new Path("no-such-file"));
}
@Test
public void fileStatusForFile() throws IOException {
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
assertThat(stat.isDir(), is(false));
assertThat(stat.getLen(), is(7L));
assertThat(stat.getModificationTime(),
is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 1));
assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
assertThat(stat.getOwner(), is("tom"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rw-r--r--"));
}
@Test
public void fileStatusForDirectory() throws IOException {
Path dir = new Path("/dir");
FileStatus stat = fs.getFileStatus(dir);
assertThat(stat.getPath().toUri().getPath(), is("/dir"));
assertThat(stat.isDir(), is(true));
assertThat(stat.getLen(), is(0L));
assertThat(stat.getModificationTime(),
is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 0));
assertThat(stat.getBlockSize(), is(0L));
assertThat(stat.getOwner(), is("tom"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
}
} 复制代码
如果文件或目录均不存在,会抛出一个FileNotFoundException异常。但如果只是想检查文件或目录是否存在,那么调用exists()方法会更方便:
public boolean exists(Path f) throws IOException 复制代码
2. 列出文件
查找一个文件或目录相关的信息很实用,但通常还需要能够列出目录中的内容。这就是FileSystem的listStatus()方法的功能:
public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException 复制代码
当传入的参数是一个文件时,它会简单转变成以数组方式返回长度为1的FileStatus对象。当传入参数是一个目录时,则返回0或多个FileStatus对象,表示此目录中包含的文件和目录。
它的重载方法允许使用PathFilter来限制匹配的文件和目录——可以参见3.5.5节提供的例子。最后,如果指定一组路径,其执行结果相当于依次轮流传递每条路径并对其调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中,但该方法更为方便。这从文件系统树的不同分支构建输入文件列表时,这是很有用的。范例3-6简单显示了这个方法。注意FileUtil中stat2Paths()方法的使用,它将一个FileStatus对象数组转换为一个Path对象数组。
范例3-6. 显示Hadoop 文件系统中一组路径的文件信息
public class ListStatus {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
}
FileStatus[] status = fs.listStatus(paths);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
} 复制代码
我们可以用这个程序来显示一组路径集目录列表的并集:
% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt 复制代码
3. 文件模式
在单个操作中处理一批文件是一个很常见的需求。例如,一个用于处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件。在一个表达式中使用通配符来匹配多个文件是比较方便的,无需列举每个文件和目录来指定输入,该操作称为“通配”(globbing)。Hadoop为执行通配提供了两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException 复制代码
globStatus()方法返回与其路径匹配于指定模式的所有文件的FileStatus对象数组,并按路径排序。PathFilter命令作为可选项可以进一步对匹配结果进行限制。
Hadoop支持的通配符与Unix bash的相同(参见表3-2)。
表3-2. 通配符及其含义
通配符
名称
匹配
*
星号
匹配0 或多个字符
?
问号
匹配单一字符
[ab]
字符类
匹配{a,b}集合中的一个字符
[^ab]
非字符类
匹配非{a,b}集合中的一个字符
[a-b]
字符范围
匹配一个在{a,b}范围内的字符(包括ab),a在字典顺序上要小于或等于b
[^a-b]
非字符范围
匹配一个不在{a,b}范围内的字符(包括ab),a在字典顺序上要小于或等于b
{a,b}
或选择
匹配包含a 或b 中的一个的表达式
\c
转义字符
匹配元字符c
假设有日志文件存储在按日期分层组织的目录结构中。如此一来,2007年最后一天的日志文件就会保存在名为/2007/12/31的目录中。假设整个文件列表如下所示:
/
|—— 2007/
| ┗——12/
| |—— 30/
| ┗——31/
┗——2008/
┗—— 01/
|—— 01/
┗—— 02/ 复制代码
一些文件通配符及其扩展如下所示。
通配符
扩展
/*
/2007/2008
/*/*
/2007/12/2008/01
/*/12/*
/2007/12/30/2007/12/31
/200?
/2007/2008
/200[78]
/2007/2008
/200[7-8]
/2007/2008
/200[^01234569]
/2007/2008
/*/*/{31,01}
/2007/12/31/2008/01/01
/*/*/3{0,1}
/2007/12/30/2007/12/31
/*/{12/31,01/01}
/2007/12/31/2008/01/01
4. PathFilter对象
通配符模式并不总能够精确地描述我们想要访问的文件集。比如,使用通配格式排除一个特定的文件就不太可能。FileSystem中的listStatus()和globStatus()方法提供了可选的PathFilter对象,以编程方式控制通配符:
package org.apache.hadoop.fs;
public interface PathFilter {
boolean accept(Path path);
} 复制代码
PathFilter与java.io.FileFilter一样,是Path对象而不是File对象。
范例3-7显示了PathFilter用于排除匹配正则表达式的路径。
范例3-7. PathFilter,用于排除匹配正则表达式的路径
public class RegexExcludePathFilter implements PathFilter {
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
} 复制代码
这个过滤器只传递不匹配于正则表达式的文件。在通配符选出一组需要包含的初始文件之后,过滤器可优化其结果。如下示例将扩展到/2007/12/30:
fs.globStatus(new Path("/2007/*/*"), new RegexExcludeFilter("^.*/2007/12/31fs.globStatus(new Path("/2007/*/*"), new RegexExcludeFilter("^.*/2007/12/31$")))) 复制代码
过滤器由Path表示,只能作用于文件名。不能针对文件的属性(例如创建时间)来构建过滤器。但是,通配符模式和正则表达式同样无法对文件属性进行匹配。例如,如果将文件存储在按照日期排列的目录结构中(如前一节中讲述的那样),则可以根据Pathfilter在给定时间范围内选出文件。
过滤器由Path表示,只能作用于文件名。不能针对文件的属性(例如创建时间)来构建过滤器。但是,通配符模式和正则表达式同样无法对文件属性进行匹配。例如,如果将文件存储在按照日期排列的目录结构中(如前一节中讲述的那样),则可以根据Pathfilter在给定时间范围内选出文件。
1.5.6 删除数据
使用FileSystem的delete()方法可以永久性删除文件或目录。
public boolean delete(Path f, boolean recursive) throws IOException
如果f是一个文件或空目录,那么recursive的值就会被忽略。只有在recrusive值为true时,非空目录及其内容才会被删除(否则会抛出IOException异常)。
3.6 数据流
3.6.1 剖析文件读取
为了了解客户端及与之交互的HDFS、namenode和datanode之间的数据流是什么样的,我们可参考图3-2,该图显示了在读取文件时事件的发生顺序。
客户端通过调用FileSyste对象的open()方法来打开希望读取的文件,对于HDFS 来说,这个对象是分布式文件系统(图3-2 中的步骤1)的一个实例。DistributedFileSystem通过使用RPC来调用namenode,以确定文件起始块的位置(步骤2)。对于每一个块,namenode返回存有该块副本的datanode地址。此外,这些datanode根据它们与客户端的距离来排序(根据集群的网络拓扑;参见3.6.1节的的补充材料“网络拓扑与Hadoop”)。如果该客户端本身就是一个datanode (比如,在一个MapReduce任务中),并保存有相应数据块的一个副本本时,该节点就会从本地datanode读取数据(参见图3-2)。
图3-2. 客户端读取HDFS中的数据
DistributedFileSystem类返回一个FSDataInputStream对象(一个支持文件定位的输入流)给客户端并读取数据。FSDataInputStream类转而封装DFSInputStream对象,该对象管理着datanode和namenode的I/O。
接着,客户端对这个输入流调用read()方法(步骤3)。存储着文件起始几个块的datanode地址的DFSInputStream随即连接距离最近的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端(步骤4)。到达块的末端时,DFSInputStream关闭与该datanode的连接,然后寻找下一个块的最佳datanode(步骤5)。客户端只需要读取连续的流,并且对于客户端都是透明的。
客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它也会根据需要询问namenode来检索下一批数据块的datanode的位置。一旦客户端完成读取,就对FSDataInputStream调用close()方法(步骤6)。
在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,会尝试从这个块的另外一个最邻近datanode读取数据。它也记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInputStream也会通过校验和确认从datanode发来的数据是否完整。如果发现有损坏的块,就在DFSInputStream试图从其他datanode读取其复本之前通知namenode。
这个设计的一个重点是,namenode告知客户端每个块中最佳的datanode,并让客户端直接连接到该datanode检索数据。由于数据流分散在集群中的所有datanode,所以这种设计能使HDFS可扩展到大量的并发客户端。同时,namenode只需要响应块位置的请求(这些信息存储在内存中,因而非常高效),无需响应数据请求,否则随着客户端数量的增长,namenode会很快成为瓶颈。
网络拓扑与Hadoop
在本地网络中,两个节点被称为“彼此近邻”是什么意思?在海量数据处理中,其主要限制因素是节点之间数据的传输速率——带宽很稀缺。这里的想法是将两个节点间的带宽作为距离的衡量标准。
不用衡量节点之间的带宽——实际上很难实现(它需要一个稳定的集群,并且在集群中两两节点对数量是节点数量的平方)——Hadoop为此采用一个简单的方法:把网络看作一棵树,两个节点间的距离是它们到最近共同祖先的距离总和。该树中的层次是没有预先设定的,但是相对于数据中心、机架和正在运行的节点,通常可以设定等级。具体想法是针对以下每个场景,可用带宽依次递减:
同一节点上的进程 同一机架上的不同节点 同一数据中心中不同机架上的节点 不同数据中心中的节点
例如,假设有数据中心d1机架r1中的节点n1。该节点可以表示为/d1/r1/n1。利用这种标记,这里给出四种距离描述:
distance(/d1/r1/n1, /d1/r1/n1)=0(同一节点上的进程) distance(/d1/r1/n1, /d1/r1/n2)=2(同一机架上的不同节点) distance(/d1/r1/n1, /d1/r2/n3)=4(同一数据中心中不同机架上的节点) distance(/d1/r1/n1, /d2/r3/n4)=6(不同数据中心中的节点)
示意图参见图3-3(数学爱好者会注意到,这是一个测量距离的例子)。最后,我们必须意识到Hadoop无法自行定义网络拓扑结构。它需要我们能够理解并辅助定义,我们将在9.1.1节的“网络拓扑”中讨论如何配置网络拓扑。不过在默认情况下,假设网络是扁平化的只有一层——或换句话说,所有节点都在同一数据中心的同一机架上。规模小的集群可能如此,不需要进一步配置。
图3-3. Hadoop中的网络距离
3.6.2 剖析文件写入
接下来我们看看文件是如何写入HDFS 的。尽管比较详细,但对于理解数据流还是很有用的,因为它清楚地说明了HDFS 的一致模型。
我们要考虑的情况是如何新建一个文件,把数据写入该文件,最后关闭该文件。参见图3-4。
图3-4. 客户端将数据写入HDFS
客户端通过对DistributedFileSystem对象调用create()函数来新建文件(图3-4中的步骤1)。
DistributedFileSystem对namenode创建一个RPC调用,在文件系统的命名空间中新建一个文件,此时该文件中还没有相应的数据块(步骤2)。namenode执行各种不同的检查以确保这个文件不存在以及客户端有新建该文件的权限。如果这些检查均通过,namenode就会为创建新文件记录一条记录;否则,文件创建失败并向客户端抛出一个IOException异常。DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。就像读取事件一样,FSDataOutputStream封装一个DFSoutPutstream对象,该对象负责处理datanode和namenode之间的通信。
在客户端写入数据时(步骤3),DFSOutputStream将它分成一个个的数据包,并写入内部队列,称为“数据队列”(data queue)。DataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据复本。这一组datanode构成一个管线——我们假设复本数为3,所以管线中有3个节点。DataStreamer将数据包流式传输到管线中第1个datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。同样,第2个datanode存储该数据包并且发送给管线中的第3个(也是最后一个)datanode (步骤4)。
DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为“确认队列”(ack queue)。收到管道中所有datanode确认信息后,该数据包才会从确认队列删除(步骤5)。
如果在数据写入期间datanode发生故障,则执行以下操作(对写入数据的客户端是透明的)。首先关闭管线,确认把队列中的所有数据包都添加回数据队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据包。为存储在另一正常datanode的当前数据块指定一个新的标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节点并且把余下的数据块写入管线中另外两个正常的的datanode。namenode注意到块复本量不足时,会在另一个节点上创建一个新的复本。后续的数据块继续正常接受处理。
在一个块被写入期间可能会有多个datanode同时发生故障,但非常少见。只要写入了dfs.replication.min的复本数(默认为1),写操作就会成功,并且这个块可以在集群中异步复制,直到达到其目标复本数(dfs.replication的默认值为3)。
客户端完成数据的写入后,对数据流调用close()方法(步骤6)。该操作将剩余的所有数据包写入datanode管线,并在联系到namenode且发送文件写入完成信号之前,等待确认(步骤7)。namenode已经知道文件由哪些块组成(通过Datastreamer请求分配数据块),所以它在返回成功前只需要等待数据块进行最小量的复制。
复本怎么放
namenode如何选择在哪个datanode存储复本(replica)?这里需要对可靠性、写入带宽和读取带宽进行权衡。例如,把所有复本都存储在一个节点损失的写入带宽最小,因为复制管线都是在同一节点上运行,但这并不提供真实的冗余(如果节点发生故障,那么该块中的数据会丢失)。同时,同一机架上服务器间的读取带宽是很高的。另一个极端,把复本放在不同的数据中心可以最大限度地提高冗余,但带宽的损耗非常大。即使在同一数据中心(到目前为止,所有Hadoop 集群均运行在同一数据中心内),也有许多不同的数据布局策略。其实,在发布的Hadoop 0.17.0版中改变了数据布局策略来辅助保持数据块在集群内分布相对均匀(第350页的“均衡器”详细说明了如何保持集群的均衡)。在1.x之后的发行版本,可即时选择数据块的布局策略。
Hadoop 的默认布局策略是在运行客户端的节点上放第1个复本 (如果客户端运行在集群之外,就随机选择一个节点,不过系统会避免挑选那些存储太满或太忙的节点)。第2个复本放在与第一个不同且随机另外选择的机架中节点上(离架)。第3个复本与第2个复本放在同一个机架上,且随机选择另一个节点。其他复本放在集群中随机选择的节点上,不过系统会尽量避免在同一个的机架上放太多复本。
一旦选定复本的放置位置,就根据网络拓扑创建一个管线。如果复本数为3,则有图3-5所示的管线。
总的来说,这一方法不仅提供很好的稳定性(数据块存储在两个机架中)并实现很好的负载均衡,包括写入带宽(写入操作只需要遍历一个交换机)、读取性能(可以从两个机架中选择读取)和集群中块的均匀分布(客户端只在本地机架上写入一个块)。
图3-5. 一个典型的复本管线
1.6.3 一致模型
文件系统的一致模型(coherency model)描述了文件读/写的数据可见性。HDFS为性能牺牲了一些POSIX 要求,因此一些操作与你期望的可能不同。
新建一个文件之后,它能在文件系统的命名空间中立即可见,如下所示:
Path p = new Path("p");
Fs.create(p);
assertThat(fs.exists(p),is(true)); 复制代码
但是,写入文件的内容并不保证能立即可见,即使数据流已经刷新并存储。所以文件长度显示为0:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(),is(0L)); 复制代码
当写入的数据超过一个块后,第一个数据块对新的reader就是可见的。之后的块也不例外。总之,当前正在写入的块对其他reader不可见。
HDFS提供一个方法来使所有缓存与数据节点强行同步,即对FSDataOutputStream调用sync()方法。当sync()方法返回成功后,对所有新的reader而言,HDFS能保证文件中到目前为止写入的数据均到达所有datanode的写入管道并且对所有新的reader均可见:[ 在Hadoop 1.x之后的版本,sync()方法被丢弃了,进而采用等价的hflush()方法。另外还增加了一个hsync()方法,确保操作系统刷新数据到磁盘(类似于POSIX的fsync方法)。但在本书写作期间,此方法还没有实现,只有hflush()方法。]
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length()))); 复制代码
该操作类似于POSIX中的fsync系统调用,该调用提交的是一个文件描述符的缓冲数据。例如,利用标准Java API将数据写入本地文件,我们能够在刷新数据流且同步之后看到文件内容:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk
assertThat(localFile.length(), is(((long) "content".length()))); 复制代码
在HDFS中关闭文件其实还隐含执行sync()方法:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close(); 复制代码
对应用设计的重要性
这个一致模型和设计应用程序的具体方法息息相关。如果不调用sync()方法,就要准备好在客户端或系统发生故障时可能会丢失数据块。对很多应用来说,这是不可接受的,所以需要在适当的地方调用sync()方法,例如在写入一定的记录或字节之后。尽管sync()操作被设计成尽量减少HDFS负载,但它有许多额外的开销,所以在数据鲁棒性和吞吐量之间就会有所取舍。怎样权衡与具体的应用相关,通过设置不同调用sync()方法的频率来衡量应用程序的性能,最终找到一个合适的频率。
3.7 通过Flume和Sqoop导入数据
不需要重新写一个应用程序来将数据导入HDFS中,更值得考虑的是使用一些现成的工具将数据导入,因为这些工具已经涵盖了很多常用的需求。
Apache Flume(
http://incubator.apache.org/flume/ )是一个将大规模流数据导入HDFS的工具。最典型的应用是从另外一个系统中收集日志数据——例如,银行的网络服务器——并实现在HDFS中的聚集操作以便用于后期的分析操作。Flume能够支持大量的数据源,其中一些通常用于包含tail(如同Unix的tail一样,通过管道的方式将本地文件写入Flume中),syslog和apache log4j(允许Java应用通过Flume将事件写入HDFS中的文件)的系统。
Flume节点允许以任何拓扑方式进行组织。典型配置是在每个源机器上(例如每个Web服务器)运行一个Flume节点,通过多个层级的聚合节点,最后将数据存入HDFS中。
Flume提供了不同级别的数据投递可靠性,例如:最大努力投递,该级别不能容忍任何Flume节点失效;端到端投递,该级别能确保当源节点和HDFS节点之间有多个Flume节点失效的情况下数据成功投递。
另一方面,Apache Sqoop(
http://sqoop.apache.org/ )是为了将数据从结构化存储设备批量导入HDFS中设计的,例如关系数据库。Sqoop的应用场景,是组织将白天生产的数据库中的数据在晚间导入Hive数据仓库中进行分析。第15章将详细介绍Sqoop。
3.8 通过distcp并行复制
前面着重介绍单线程访问的HDFS访问模型。例如,通过指定文件通配符,可以对一组文件进行处理,但是为了提高性能,需要写一个程序来并行处理这些文件。Hadoop有一个有用的distcp分布式复制程序,该程序可以从Hadoop 文件系统间复制大量数据,也可以将大量的数据复制到Hadoop中。
distcp的典型应用场景是在两个HDFS集群之间传输数据。如果两个集群运行相同版本的Hadoop,就非常适合使用hdfs方案:
% hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar 复制代码
这行指令把第一个集群/foo目录(及其内容)复制到第二个集群的/bar目录下,所以第二个集群最后的目录结构是/bar/foo。如果/bar不存在,则新建一个。也可以指定多个源路径,并把所有路径都复制到目标路径下。注意,源路径必须是绝对路径。
在默认情况下,distcp会跳过目标路径下已经存在的文件,但可以通过-overwrite选项覆盖现有的文件。也可以通过-update选项来选择有改动的文件。
使用-overwrite和-update选项中任意一个(或两个)需要改变源路径和目标路径的解释方式。这里最好用一个例子来说明。如果改变先前例子中第一个集群/foo子树下的一个文件,就会进行下面的命令将修改同步到第二个集群上:
% hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo 复制代码
因为源目录下的内容已被复制到目标目录下,所以需要在目标路径中添加额外的子目录/foo。(如果对rsync命令比较熟悉,可以认为-overwrite或-update选项就是在源路径末尾添加一个斜杠。)
如果不确定discp 操作的效果,最好先在一个小的测试目录树下试着运行一次。
有很多选项可以用来控制distcp的复制方式,包括保留文件属性,忽略节点故障和限制文件或总数据的复制量。不带任何选项运行时,将显示使用说明。
distcp是作为一个MapReduce作业来实现的,该复制作业是通过集群中并行运行的map来完成。这里没有reducer。每个文件通过一个map进行复制,并且distcp试图为每一个map分配大致相等的数据来执行,即把文件划分为大致相等的块。
map的数量是这样确定的。让每一个map复制合理的数据量来尽量减少构建任务时所涉及的开销,这是一个很好的想法,所以每个map 至少复制256 MB数据(除非输入的总数据量较少,否则一个map就可以完成所有的复制)。例如,将1 GB大小的文件分给4个map任务。如果数据非常大,则有必要限制map的数量进而限制带宽和集群的使用。默认情况下,每个集群节点(或tasktracker)最多分配20个map任务。例如,将1000 GB的文件复制到一个由100个节点组成的集群,一共分配2000个map任务(每个节点20个map任务),所以每个map任务平均复制512 MB数据。通过对distcp指定-m参数,可以减少分配的map任务数。例如,-m 1000将分配1000个map任务,每个平均复制1 GB数据。
如果试图在两个运行着不同HDFS版本的集群上使用distcp复制数据并使用hdfs 协议,会导致复制作业失败,因为两个系统版本的RPC是不兼容的。想要弥补这种情况,可以使用基于只读HTTP协议的HFTP文件系统并从源文件系统中读取数据。这个作业必须运行在目标集群上,进而实现HDFS RPC版本的兼容。使用HFTP协议重复前面的例子:
% hadoop distcp hftp://namenode1:50070/foo hdfs://namenode2/bar 复制代码
注意,需要在URI源中指定namenode的Web 端口。这是由dfs.http.address属性决定的,其默认值为50070。
使用新出的webhdfs协议(替代hftp)后,对源集群和目标集群均可以使用HTTP协议进行通信,且不会造成任何不兼容的问题。
% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/bar 复制代码
另外一个变种是使用HDFS HTTP代理服务作为源distcp或者目标distcp,进而具备了设置防火墙和控制带宽的优点,详情参见3.4.1节对HTTP的讨论。
保持 HDFS 集群的均衡
向HDFS复制数据时,考虑集群的均衡性是相当重要的。当文件块在集群中均匀分布时,HDFS 能达到最佳工作状态。回到前面1000 GB数据的例子,将-m选项指定为1,即由一个map来执行复制作业,它的意思是——不考虑速度变慢和未充分利用集群资源——每个块的第一个复本将存储到运行map 的节点上(直到磁盘被填满)。第二和第三个复本将分散在集群中,但这一个节点是不均衡的。将map的数量设定为多于集群中节点的数量,可以避免这个问题。鉴于此,最好首先使用默认的每个节点20个map来运行distcp命令。
然而,这也并不总能阻止集群的不均衡。也许想限制map的数量以便另外一些节点可以运行其他作业。若是这样,可以使用均衡器(balancer)工具(参见10.1.4节对均衡器的讨论)进而改善集群中块分布的均匀程度。
3.9 Hadoop存档
每个文件均按块方式存储,每个块的元数据存储在namenode的内存中,因此Hadoop存储小文件会非常低效。因为大量的小文件会耗尽namenode中的大部分内存。但注意,存储小文件所需的磁盘容量和存储这些文件原始内容所需要的磁盘空间相比也不会增多。例如,一个1 MB的文件以大小为128 MB 的块存储,使用的是1 MB 的磁盘空间,而不是128 MB。
Hadoop存档文件或HAR 文件,是一个更高效的文件存档工具,它将文件存入HDFS 块,在减少namenode内存使用的同时,允许对文件进行透明地访问。具体说来,Hadoop存档文件可以用作MapReduce的输入。
3.9.1 使用Hadoop存档工具
Hadoop存档是通过archive工具根据一组文件创建而来的。该存档工具运行一个MapReduce作业来并行处理所有的输入文件,因此你需要一个MapReduce集群来运行和使用它。这里,HDFS中有一些文档我们希望对它们进行存档:
% hadoop fs -lsr /my/files
-rw-r--r-- 1 tom supergroup 1 2009-04-09 19:13 /my/files/a
drwxr-xr-x - tom supergroup 0 2009-04-09 19:13 /my/files/dir
-rw-r--r-- 1 tom supergroup 1 2009-04-09 19:13 /my/files/dir/b 复制代码
现在我们可以运行archive指令:
% hadoop archive -archiveName files.har /my/files /my 复制代码
第一个选项是存档文件的名称,这里是file.har。HAR文件总是一个以.har为扩展名的文件,这是必需的,具体理由见后文描述。接下来的参数是需要存档的文件。这里我们只存档一棵源文件树下的文件,即HDFS下/my/files中的文件,但事实上该工具可以接受多棵源文件树。最后一个参数是HAR文件的输出目录。让我们看看这个存档文件是怎么创建的:
% hadoop fs -ls /my
Found 2 items
drwxr-xr-x - tom supergroup 0 2009-04-09 19:13 /my/files
drwxr-xr-x - tom supergroup 0 2009-04-09 19:13 /my/files.har
% hadoop fs -ls /my/files.har
Found 3 items
-rw-r--r-- 10 tom supergroup 165 2009-04-09 19:13 /my/files.har/_index
-rw-r--r-- 10 tom supergroup 23 2009-04-09 19:13 /my/files.har/_masterindex
-rw-r--r-- 1 tom supergroup 2 2009-04-09 19:13 /my/files.har/part-0 复制代码
这个目录列表显示了HAR文件的组成部分:两个索引文件以及部分文件的集合——本例中只有一个。这些部分文件中包含已经链接在一起的大量原始文件的内容,并且我们通过索引可以找到包含在存档文件中的部分文件,它的起始点和长度。但所有这些细节对于使用har URI方案与HAR文件交互的应用都是隐式的,并且HAR文件系统是建立在基础文件系统上的(本例中是HDFS)。以下命令以递归方式列出了存档文件中的部分文件:
% hadoop fs -lsr har:///my/files.har
drw-r--r-- - tom supergroup 0 2009-04-09 19:13 /my/files.har/my
drw-r--r-- - tom supergroup 0 2009-04-09 19:13 /my/files.har/my/files
-rw-r--r-- 10 tom supergroup 1 2009-04-09 19:13 /my/files.har/my/files/a
drw-r--r-- - tom supergroup 0 2009-04-09 19:13 /my/files.har/my/files/dir
-rw-r--r-- 10 tom supergroup 1 2009-04-09 19:13 /my/files.har/my/files/dir/b 复制代码
如果HAR文件所在的文件系统是默认的文件系统,这就非常直观易懂。另一方面,如果想在其他文件系统中引用HAR文件,则需要使用一个不同于正常情况的URI路径格式。以下两个指令作用相同,示例如下:
% hadoop fs -lsr har:///my/files.har/my/files/dir
% hadoop fs -lsr har://hdfs-localhost:8020/my/files.har/my/files/dir 复制代码
注意第二个格式,仍以har方案表示一个HAR文件系统,但是由hdfs指定基础文件系统方案的权限,后面加上一个斜杠和HDFS 主机(localhost)及端口(8020)。我们现在知道为什么HAR文件必须要有.har扩展名。通过查看权限和路径及.har扩展名的组成,HAR文件系统将har URI转换成为一个基础文件系统的URI。在本例中是hdfs://localhost:8020/user/tom/files.har。路径的剩余部分是文件在存档文件系统中的路径:/user/tom/files/dir。
要想删除HAR文件,需要使用递归格式进行删除,因为对于基础文件系统来说,HAR 文件是一个目录:
% hadoop fs -rmr /my/files.har 复制代码
3.9.2 不足
对于HAR文件,还需要了解它的一些不足。新建一个存档文件会创建原始文件的一个副本,因此至少需要与要存档(尽管创建了存档文件后可以删除原始文件)的文件容量相同大小的磁盘空间。虽然存档文件中源文件能被压缩(HAR文件在这方面更接近于tar文件),但目前还不支持存档文件的压缩。
一旦创建,存档文件便不能再修改。要想从中增加或删除文件,必须重新创建存档文件。事实上,一般不会再对存档后的文件进行修改,因为它们是定期成批存档的,比如每日或每周。
如前所述,HAR文件可以作为MapReduce的输入。然而,InputFormat类并不知道文件已经存档,尽管该类可以将多个文件打包成一个MapReduce分片,所以即使在HAR文件中处理许多小文件,也仍然低效的。7.2.1节对小文件和CombineFileInputFormat的讨论中,将提到该问题的另一种解决方案。
最后,如果已经尽量减少系统中小文件的数量,但仍然受制于namenode的内存容量,可以考虑使用联邦HDFS来提高命名空间的可扩展性(请参见3.2.3节)。