fc013 发表于 2018-11-17 18:52:34

Scala实战之访问Kerberos环境的HDFS

本帖最后由 fc013 于 2018-11-17 19:16 编辑



问题导读:
1.如何使用Scala代码访问Kerberos环境的HDFS?2.怎样导出Kerberos集群中访问HDFS的keytab文件?3.怎样实现客户端访问HDFS工具类?
最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg

文章编写目的
随着开发语言的多样性,也有基于Scala语言进行开发,本篇文章主要介绍如何使用Scala代码访问Kerberos环境的HDFS。

[*]内容概述
1.环境准备
2.Kerberos环境连接示例

[*]测试环境
1.CDH版本为5.15.0
2.OS为Redhat7.2

[*]前置条件
1.CDH集群运行正常
2.集群已启用Kerberos

环境准备
使用IDE工具通过Maven创建一个Scala工程,这里就不详细介绍Scala的开发环境搭建了。
1.在工程的pom.xml文件中增加如下依赖
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.15.0</version>
</dependency>

2.为Kerberos集群,需要导出一个keytab文件用于访问HDFS,导出步骤如下
在CMD命令行执行如下命令导出AD中用户的keytab文件
ktpass -princ hdfs/admin@FAYSON.COM-mapuser hdfs/admin -pass 123!QAZ -out hdfsadmin.keytab -crypto RC4-HMAC-NT



导出的keytab文件会在当前命令执行目录。



3.获取集群krb5.conf文件,内容如下# more /etc/krb5.conf
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/

includedir /var/lib/sss/pubconf/krb5.include.d/

default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log


dns_lookup_realm = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
rdns = true
default_realm = FAYSON.COM
#default_ccache_name = KEYRING:persistent:%{uid}


FAYSON.COM = {
kdc = adserver.fayson.com
admin_server = adserver.fayson.com
}


.fayson.com = FAYSON.COM
fayson.com = FAYSON.COM



4.配置hosts文件,确保本地开发环境与集群所有节点通且端口均放通(如8020等)


由于Fayson这里使用的是公网环境所以hostname与外网的ip对应,这里会导致一个问题在向集群put数据文件时会失败,如果开发环境和HDFS都属于内网环境则不会有这个问题。

5.通过Cloudera Manager下载HDFS客户端配置

6.将上述准备的配置文件及keytab等信息拷贝至本地目录或工程中,Fayson的工程目录结构如下:

客户端访问HDFS工具类

1.ClientUtils类主要提供客户端初始化方法,内容如下:package com.cloudera.utils

import java.io.IOException
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

/**
* package: com.cloudera.utils
* describe: 客户端访问HDFS工具类
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/11/13
* creat_time: 下午9:16
* 公众号:Hadoop实操
*/
object ClientUtils {

/**
    * 初始化HDFS的Configuration
    * @return
    */
def initConfiguration(): Configuration = {
    val configuration = new Configuration
    configuration.addResource(this.getClass().getResourceAsStream("/hdfs-client-kb/core-site.xml"))
    configuration.addResource(this.getClass().getResourceAsStream("/hdfs-client-kb/hdfs-site.xml"))

    configuration
}

/**
    * 初始化访问Kerberos访问
    * @param configuration
    * @param debug 是否启用Kerberos的Debug模式
    * @param properties 客户端配置信息
    */
def initKerberosENV(configuration: Configuration, debug: Boolean, properties: Properties):Unit = {
    System.setProperty("java.security.krb5.conf", properties.getProperty("krb5.conf.path"))
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
    if (debug) System.setProperty("sun.security.krb5.debug", "true")
    try {
      UserGroupInformation.setConfiguration(configuration)
      UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.user"), properties.getProperty("kerberos.keytab.path"))
      System.out.println(UserGroupInformation.getCurrentUser)
    } catch {
      case e: IOException => {
      e.printStackTrace()
      }
    }
}
}



2.HDFSUtils用于操作HDFS的工具类
package com.cloudera.utils

import org.apache.hadoop.fs.permission._
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.JavaConversions._


/**
* package: com.cloudera.utils
* describe: 用户操作HDFS工具类
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/11/13
* creat_time: 下午10:05
* 公众号:Hadoop实操
*/
object HDFSUtils {

/**
    * 使用HDFS API向HDFS创建目录
    * 在创建目录指定目录权限为777时,该权限需要与HDFS默认的umask权限相减,最终得出目录权限为755
    * umask默认为022,0表示对owner没有限制,2表示对group不允许有写权限,2表示对other不允许有写权限
    * 因此在创建目录指定777,但创建出来的目录为755的原因
    * @param fileSystem
    * @param dirName
    */
def mkdir(fileSystem: FileSystem, dirName: String):Unit = {
    val path = new Path(dirName)
    if(fileSystem.exists(path)) {
      System.out.println("目录已存在")
    } else {
      val isok = fileSystem.mkdirs(path)
      if(isok) {
      System.out.println("HDFS目录创建成功:" + dirName)
      } else {
      System.out.println("HDFS目录创建失败:" + dirName)
      }
    }
}

/**
    * 设置HDFS指定目录及文件权限
    * @param fileSystem
    * @param path文件或目录路径
    * @param mode权限模式,如:777、755、644,数字对应的R=4,W=2,X=1
    */
def setPermission(fileSystem: FileSystem, path: String, mode: String): Unit = {
    val fspath = new Path(path)
    fileSystem.setPermission(fspath, new FsPermission(mode))
}

/**
    * 设置HDFS指定目录或文件的属主及属组
    * @param fileSystem
    * @param path
    * @param username
    * @param groupname
    */
def setowner(fileSystem: FileSystem, path: String, username: String, groupname: String): Unit = {
    val fspath = new Path(path)
    fileSystem.setOwner(fspath, username, groupname)
}

/**
    * 设置HDFS指定目录的ACL权限
    * 在指定ACL时AclEntryScope.ACCESS表示当前目录所拥有的访问权限
    * AclEntryScope.DEFAULT,表示该目录下所有子目录及文件集成父目录的Default ACL权限
    * @param fileSystem
    * @param path
    */
def setAcl(fileSystem: FileSystem,path: String): Unit = {
    val fspath = new Path(path)
    val listAcl = List(
      new AclEntry.Builder().setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setName("testa").setPermission(FsAction.ALL).build()
    )

    fileSystem.modifyAclEntries(fspath, listAcl)
}

/**
    * 递归指定路径下所有目录及文件
    * @param path
    * @param fileSystem
    * @return
    */
def recursiveDir(path: String, fileSystem: FileSystem): List = {
    var listPath = List()
    val fspath = new Path(path)
    val listfiles = fileSystem.listStatus(fspath)
    listfiles.foreach(f => {
      System.out.println(f.getPath.toString)
      if(f.isDirectory) {
      recursiveDir(f.getPath.toString, fileSystem)
      }
    })
    listPath
}

}



示例代码及运行

1.OperatorHDFSByAPI为测试类包含API的调用
package com.cloudera.hdfs

import java.util.Properties

import com.cloudera.utils.{ClientUtils, HDFSUtils}
import org.apache.hadoop.fs.FileSystem

/**
* package: com.cloudera.hdfs
* describe: Scala访问Kerberos环境下的HDFS示例
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/11/13
* creat_time: 下午9:02
* 公众号:Hadoop实操
*/
object OperatorHDFSByAPI {

def main(args: Array): Unit = {
    //加载客户端配置参数
    val properties = new Properties()
    properties.load(this.getClass.getResourceAsStream("/client.properties"))

    //初始化HDFS Configuration 配置
    val configuration = ClientUtils.initConfiguration()

    //集群启用Kerberos,代码中加入Kerberos环境
    ClientUtils.initKerberosENV(configuration, false, properties)

    val fileSystem = FileSystem.get(configuration)

    val testPath = "/fayson/test"
    //创建HDFS目录
//    HDFSUtils.mkdir(fileSystem, testPath)
    //设置目录属主及组
    HDFSUtils.setowner(fileSystem, testPath, "hive", "hive")
    //设置指定HDFS路径的权限
    HDFSUtils.setPermission(fileSystem, testPath, "771")
    //设置指定HDFS目录的ACL
    HDFSUtils.setAcl(fileSystem, testPath)
    //递归指定路径下所有目录及文件
    HDFSUtils.recursiveDir("/user/hive/warehouse/test.db/", fileSystem)

    fileSystem.close()
}

}



2.示例运行


3.查看HDFS上创建的目录、权限及ACL等


未设置ACL权限的userc用户无权限访问该目录



总结

1.在进行本地开发时,必须将集群的hostname及IP配置在本地的hosts文件中(如果使用DNS服务则可以不配置hosts文件),否则无法与集群互通,确保本地客户端与集群的端口是放通的。
2.在创建目录指定目录权限为777时,创建目录的权限只能到755,是由于HDFS的umask导致,默认的umask为022(0表示对owner没有限制,2表示对group不允许有写权限,2表示对other不允许有写权限),指定的777权限减去022即为创建目录的权限
3.设置HDFS目录或文件的ACL时,需要区分AclEntryScope.ACCESS和DEFAULT。ACCESS表示为当前目录或文件指定ACL访问权限,DEFAULT表示在该目录下创建子目录或文件会继承该ACL权限。

GitHub源码地址:
https://github.com/fayson/cdhproject/tree/master/scalademo



来源: weixin作者: Fayson Hadoop实操
原文链接:如何使用Scala代码访问Kerberos环境的HDFShttps://mp.weixin.qq.com/s/Raprxc6G_YgnDF4m8qNpPA

jiangzi 发表于 2018-11-17 23:37:21

学习学习~~~

jiewuzhe02 发表于 2018-11-18 09:53:46

分享分享
页: [1]
查看完整版本: Scala实战之访问Kerberos环境的HDFS