分享

hcatalog简介和使用

sstutu 2014-7-8 19:25:50 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 36271
问题导读:
1.Hcatalog是什么?
2.HCatalog底层依赖于Hive Metastore,如何获取表结构?






Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。



HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance



由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.java

  1. public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws  
  2.     MetaException, TException {  
  3.   if (localMetaStore) {  
  4.     throw new UnsupportedOperationException("getDelegationToken() can be " +  
  5.         "called only in thrift (non local) mode");  
  6.   }  
  7.   return client.get_delegation_token(owner, renewerKerberosPrincipalName);  
  8. }  
复制代码


HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:

  1. public static void setInput(Job job,  
  2.     InputJobInfo inputJobInfo) throws IOException;  
复制代码
先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据
  1. public static HCatSchema getTableSchema(JobContext context)   
  2.     throws IOException;  
复制代码

在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

HCatOutputFormat API:

  1. public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;  
复制代码


OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写
  1. Map<String, String> partitionValues = new HashMap<String, String>();  
  2. partitionValues.put("dt", "2013-06-13");  
  3. partitionValues.put("country", "china");  
  4. HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);  
  5. HCatOutputFormat.setOutput(job, info);  
复制代码
  1. public static HCatSchema getTableSchema(JobContext context) throws IOException;  
复制代码
获取之前HCatOutputFormat.setOutput指定的table schema信息


  1. public static void setSchema(final Job job, final HCatSchema schema) throws IOException;  
复制代码

设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中
  1. import java.io.IOException;  
  2. import java.util.Iterator;  
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.conf.Configured;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.WritableComparable;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.Reducer;  
  12. import org.apache.hadoop.util.Tool;  
  13. import org.apache.hadoop.util.ToolRunner;  
  14. import org.apache.hcatalog.data.DefaultHCatRecord;  
  15. import org.apache.hcatalog.data.HCatRecord;  
  16. import org.apache.hcatalog.data.schema.HCatSchema;  
  17. import org.apache.hcatalog.mapreduce.HCatInputFormat;  
  18. import org.apache.hcatalog.mapreduce.HCatOutputFormat;  
  19. import org.apache.hcatalog.mapreduce.InputJobInfo;  
  20. import org.apache.hcatalog.mapreduce.OutputJobInfo;  
  21.   
  22. public class GroupByGuid extends Configured implements Tool {  
  23.   
  24.     @SuppressWarnings("rawtypes")  
  25.     public static class Map extends  
  26.             Mapper<WritableComparable, HCatRecord, Text, IntWritable> {  
  27.         HCatSchema schema;  
  28.         Text guid;  
  29.         IntWritable one;  
  30.   
  31.         @Override  
  32.         protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)  
  33.                 throws IOException, InterruptedException {  
  34.             guid = new Text();  
  35.             one = new IntWritable(1);  
  36.             schema = HCatInputFormat.getTableSchema(context);  
  37.         }  
  38.   
  39.         @Override  
  40.         protected void map(WritableComparable key, HCatRecord value,  
  41.                 Context context) throws IOException, InterruptedException {  
  42.             guid.set(value.getString("guid", schema));  
  43.             context.write(guid, one);  
  44.         }  
  45.     }  
  46.   
  47.     @SuppressWarnings("rawtypes")  
  48.     public static class Reduce extends  
  49.             Reducer<Text, IntWritable, WritableComparable, HCatRecord> {  
  50.         HCatSchema schema;  
  51.   
  52.         @Override  
  53.         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)  
  54.                 throws IOException, InterruptedException {  
  55.             schema = HCatOutputFormat.getTableSchema(context);  
  56.         }  
  57.   
  58.         @Override  
  59.         protected void reduce(Text key, Iterable<IntWritable> values,  
  60.                 Context context) throws IOException, InterruptedException {  
  61.             int sum = 0;  
  62.             Iterator<IntWritable> iter = values.iterator();  
  63.             while (iter.hasNext()) {  
  64.                 sum++;  
  65.                 iter.next();  
  66.             }  
  67.             HCatRecord record = new DefaultHCatRecord(2);  
  68.             record.setString("guid", schema, key.toString());  
  69.             record.setInteger("count", schema, sum);  
  70.             context.write(null, record);  
  71.         }  
  72.     }  
  73.   
  74.     @Override  
  75.     public int run(String[] args) throws Exception {  
  76.         Configuration conf = getConf();  
  77.   
  78.         String dbname = args[0];  
  79.         String inputTable = args[1];  
  80.         String filter = args[2];  
  81.         String outputTable = args[3];  
  82.         int reduceNum = Integer.parseInt(args[4]);  
  83.   
  84.         Job job = new Job(conf,  
  85.                 "GroupByGuid, Calculating every guid's pageview");  
  86.         HCatInputFormat.setInput(job,  
  87.                 InputJobInfo.create(dbname, inputTable, filter));  
  88.   
  89.         job.setJarByClass(GroupByGuid.class);  
  90.         job.setInputFormatClass(HCatInputFormat.class);  
  91.         job.setMapperClass(Map.class);  
  92.         job.setReducerClass(Reduce.class);  
  93.         job.setMapOutputKeyClass(Text.class);  
  94.         job.setMapOutputValueClass(IntWritable.class);  
  95.         job.setOutputKeyClass(WritableComparable.class);  
  96.         job.setOutputValueClass(DefaultHCatRecord.class);  
  97.         job.setNumReduceTasks(reduceNum);  
  98.   
  99.         HCatOutputFormat.setOutput(job,  
  100.                 OutputJobInfo.create(dbname, outputTable, null));  
  101.         HCatSchema s = HCatOutputFormat.getTableSchema(job);  
  102.         HCatOutputFormat.setSchema(job, s);  
  103.   
  104.         job.setOutputFormatClass(HCatOutputFormat.class);  
  105.   
  106.         return (job.waitForCompletion(true) ? 0 : 1);  
  107.     }  
  108.   
  109.     public static void main(String[] args) throws Exception {  
  110.         int exitCode = ToolRunner.run(new GroupByGuid(), args);  
  111.         System.exit(exitCode);  
  112.     }  
  113. }  
复制代码


其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了



出处:http://blog.csdn.net/lalaguozhe/article/details/908390

欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条