about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下hive以及hbase是如何入库以及代码实现。
首先我们将hbase与hive整合,详细参考
about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的
about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的
整合完毕,我们就可以通过mapreduce把数据导入hbase,当然在导入hbase的同时,hive数据同时也可以查询出结果。
那么我们是如何导入hbase的,思路前面已经介绍,这里采用的是hbase put。以后的版本中,我们将采用多种方法来实现此功能包括hive分区、hbase后面如果遇到问题,我们可能还会重构。
开发环境介绍:
1.Eclipse
2.Hadoop2.2
3.hbase-0.98.3-hadoop2
思路:
在导入hbase的过程中,我们直接使用了mapreduce中的map函数,reduce在这里对我们没有太大的用处,我们这里借助的是mapreduce的分布式,提高查询效率。
mapreduce中map函数主要实现了哪些功能
1.清洗数据
通过
- public static void StringResolves(String line, Context context)
复制代码
函数实现
2.数据的导入
通过public static void addData(String rowKey, String tableName, String[] column1, String[] value1, Context context)
函数实现
下面贴上代码:
HbaseMain.java代码
- package www.aboutyun.com;
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- public class HbaseMain {
-
- static final String INPUT_PATH = "hdfs://master:8020/test.txt";
- static final String OUT_PATH = "hdfs://master:8020/Output";
-
- public static void main(String[] args) throws IOException,
- InterruptedException, ClassNotFoundException {
-
- // 主类
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf, HbaseMain.class.getSimpleName());
- job.setJarByClass(HbaseMain.class);
- // 寻找输入
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- // 1.2对输入数据进行格式化处理的类
- job.setInputFormatClass(TextInputFormat.class);
- job.setMapperClass(HbaseMap.class);
- // 1.2指定map输出类型<key,value>类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(0);
- // 指定输出路径
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
-
- job.waitForCompletion(true);
-
- }
- }
复制代码
HbaseMap.java代码
- package www.aboutyun.com;
-
- import java.io.IOException;
- import java.text.DateFormat;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Locale;
- import java.util.Random;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.HColumnDescriptor;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Mapper.Context;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
-
- public class HbaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {
- private static Configuration conf = null;
- /**
- * 初始化配置
- */
-
- static {
- conf = HBaseConfiguration.create();
- conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位
- conf.set("hbase.zookeeper.property.clientPort", "2181");
- }
-
- /**************************************************************************/
- public void map(LongWritable key, Text line, Context context)
- throws IOException, InterruptedException {
-
- try {
- StringResolves(line.toString(), context);
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- /**************************************************************************/
- // 字符串解析
-
- public static void StringResolves(String line, Context context)
- throws ParseException {
- String ipField, dateField, urlField, browserField;
-
- // 获取ip地址
- ipField = line.split("- -")[0].trim();
-
- // 获取时间,并转换格式
- int getTimeFirst = line.indexOf("[");
- int getTimeLast = line.indexOf("]");
- String time = line.substring(getTimeFirst + 1, getTimeLast).trim();
- Date dt = null;
- DateFormat df1 = DateFormat.getDateTimeInstance(DateFormat.LONG,
- DateFormat.LONG);
- dt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)
- .parse(time);
- dateField = df1.format(dt);
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHMM");
- String dateField1 = sdf.format(dt);
- // 获取url
- String[] getUrl = line.split(""");
-
- String firtGeturl = getUrl[1].substring(3).trim();
-
- String secondGeturl = getUrl[3].trim();
- urlField = firtGeturl + "分隔符" + secondGeturl;
-
- // 获取浏览器
- String[] getBrowse = line.split(""");
- String strBrowse = getBrowse[5].toString();
- String str = "(KHTML, like Gecko)";
- int i = strBrowse.indexOf(str);
- strBrowse = strBrowse.substring(i);
- String strBrowse1[] = strBrowse.split("\\/");
- strBrowse = strBrowse1[0].toString();
- String strBrowse2[] = strBrowse.split("\\)");
- browserField = strBrowse2[1].trim();
-
- // 添加到数据库
-
- String rowKey = ipField + dateField1 + urlField
- + new Random().nextInt();
- String[] cols = new String[] { "IpAddress", "AccressTime", "Url",
- "UserBrowser", };
- String[] colsValue = new String[] { ipField, dateField, urlField,
- browserField };
-
- try {
- addData(rowKey, "LogTable", cols, colsValue, context);
- context.write(new Text("1"), new IntWritable(1));
-
- } catch (IOException | InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- /*
- * 为表添加数据(适合知道有多少列族的固定表)
- *
- * @rowKey rowKey
- *
- * @tableName 表名
- *
- * @column1 第一个列族列表
- *
- * @value1 第一个列的值的列表
- */
- public static void addData(String rowKey, String tableName,
- String[] column1, String[] value1, Context context)
- throws IOException {
-
- Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey
- HTable table = new HTable(conf, Bytes.toBytes(tableName));// HTabel负责跟记录相关的操作如增删改查等//
- // 获取表
- HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // 获取所有的列族
- .getColumnFamilies();
-
- for (int i = 0; i < columnFamilies.length; i++) {
- String familyName = columnFamilies[i].getNameAsString(); // 获取列族名
- if (familyName.equals("Info")) { // info列族put数据
- for (int j = 0; j < column1.length; j++) {
- put.add(Bytes.toBytes(familyName),
- Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
- }
- }
-
- }
- table.put(put);
- // context.write(new Text(rowKey), null);
- System.out.println("add data Success!");
- }
-
- }
复制代码
后面我们将会不断完善此功能。
上面的一些准备工作,就不要说了,这里展现一下运行后的效果:
hive效果图
Hbase效果图
这样就达到了效果。后面我们使用hive统计,然后通过将统计结果展示,项目基本完成,后面就不断完善即可。
上文中test.txt数据
static final String INPUT_PATH = "hdfs://master:8020/test.txt";
test.zip
(1.04 KB, 下载次数: 77, 售价: 1 云币)
|