本帖最后由 xioaxu790 于 2014-7-4 17:33 编辑
问题导读:
1、将数据导入HBase中有哪些方式?
2、如何理解使用Put方法实现Hbase数据迁移的运行原理?
摘要:
要使用Hadoop,需要将现有的各种类型的数据库或数据文件中的数据导入HBase。一般而言,有三种常见方式:使用HBase的API中的Put方法,使用HBase 的bulk load工具和使用定制的MapReduce Job方式。本文均有详细描述。
本篇文章是对数据合并的系列文章之一,针对的情景模式就是将现有的各种类型的数据库或数据文件中的数据转入至HBase中。
概述
将数据导入HBase中有如下几种方式:
使用HBase的API中的Put方法
使用HBase 的bulk load 工具
使用定制的MapReduce Job方式
使用HBase的API中的Put是最直接的方法,用法也很容易学习。但针对大部分情况,它并非都是最高效的方式。当需要将海量数据在规定时间内载入HBase中时,效率问题体现得尤为明显。待处理的数据量一般都是巨大的,这也许是为何我们选择了HBase而不是其他数据库的原因。在项目开始之前,你就该思考如何将所有能够很好的将数据转移进HBase,否则之后可能面临严重的性能问题。
HBase有一个名为 bulk load的功能支持将海量数据高效地装载入HBase中。Bulk load是通过一个MapReduce Job来实现的,通过Job直接生成一个HBase的内部HFile格式文件来形成一个特殊的HBase数据表,然后直接将数据文件加载到运行的集群中。使用bulk load功能最简单的方式就是使用importtsv 工具。importtsv 是从TSV文件直接加载内容至HBase的一个内置工具。它通过运行一个MapReduce Job,将数据从TSV文件中直接写入HBase的表或者写入一个HBase的自有格式数据文件。
尽管importtsv 工具在需要将文本数据导入HBase的时候十分有用,但是有一些情况,比如导入其他格式的数据,你会希望使用编程来生成数据,而MapReduce是处理海量数据最有效的方式。这可能也是HBase中加载海量数据唯一最可行的方法了。当然我们可以使用MapReduce向HBase导入数据,但海量的数据集会使得MapReduce Job也变得很繁重。若处理不当,则可能使得MapReduce的job运行时的吞吐量很小。
在HBase中数据合并是一项频繁执行写操作任务,除非我们能够生成HBase的内部数据文件,并且直接加载。这样尽管HBase的写入速度一直很快,但是若合并过程没有合适的配置,也有可能造成写操作时常被阻塞。写操作很重的任务可能引起的另一个问题就是将数据写入了相同的族群服务器(region server),这种情况常出现在将海量数据导入到一个新建的HBase中。一旦数据集中在相同的服务器,整个集群就变得不平衡,并且写速度会显著的降低。我们将会在本文中致力于解决这些问题。我们将从一个简单的任务开始,使用API中的Put方法将MySQL中的数据导入HBase。接着我们会描述如何使用 importtsv 和 bulk load将TSV数据文件导入HBase。我们也会有一个MapReduce样例展示如何使用其他数据文件格式来导入数据。上述方式都包括将数据直接写入 HBase中,以及在HDFS中直接写入HFile类型文件。本文中最后一节解释在向HBase导入数据之前如何构建好集群。本文代码均是以Java编写,我们假设您具有基本Java知识,所以我们将略过如何编译与打包文中的Java示例代码,但我们会在示例源码中进行注释。
通过单个客户端导入MySQL数据
数据合并最常见的应用场景就是从已经存在的关系型数据库将数据导入到HBase中。对于此类型任务,最简单直接的方式就是从一个单独的客户端获取数据,然后通过HBase的API中Put方法将数据存入HBase中。这种方式适合处理数据不是太多的情况。
本节描述的是使用Put方法将MySQL数据导入HBase中的方式。所有的操作均是在一个单独的客户端执行,并且不会使用到MapReduce。本节将会带领你通过HBase Shell创建HBase表格,通过Java来连接集群,并将数据导入HBase。
准备
公共数据集合是个练习HBase数据合并的很好数据源。互联网上有很多公共数据集合。我们在本文中奖使用 “美国国家海洋和大气管理局 1981-2010气候平均值”的公共数据集合。访问http://www1.ncdc.noaa.gov/pub/data/normals /1981-2010/下载。
这些气候报表数据是由美国国家海洋和大气管理局(NOAA)生成的。在本文中,我们使用在目录 products | hourly 下的小时温度数据(可以在上述链接页面中找到)。下载hly-temp-normal.txt文件。
需要一个MySQL实例,在MySQL数据库中创建hly_temp_normal表格,使用如下的SQL命令:
- create table hly_temp_normal (
- id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
- stnid CHAR(11),
- month TINYINT,
- day TINYINT,
- value1 VARCHAR(5),
- value2 VARCHAR(5),
- value3 VARCHAR(5),
- value4 VARCHAR(5),
- value5 VARCHAR(5),
- value6 VARCHAR(5),
- value7 VARCHAR(5),
- value8 VARCHAR(5),
- value9 VARCHAR(5),
- value10 VARCHAR(5),
- value11 VARCHAR(5),
- value12 VARCHAR(5),
- value13 VARCHAR(5),
- value14 VARCHAR(5),
- value15 VARCHAR(5),
- value16 VARCHAR(5),
- value17 VARCHAR(5),
- value18 VARCHAR(5),
- value19 VARCHAR(5),
- value20 VARCHAR(5),
- value21 VARCHAR(5),
- value22 VARCHAR(5),
- value23 VARCHAR(5),
- value24 VARCHAR(5)
- );
复制代码
本文提供了一些脚本将txt中的数据导入到MySQL表中。你可以使用 insert_hly.py 来加载每小时的NOAA数据。只需要修改脚本中的主机(host),用户(user),密码(password)以及数据名称(database name)。完成修改后就能够将下载的hly-temp-normal.txt数据导入到mysql的hly_temp_normal 表中,使用命令如下:
- $ python insert_hly.py -f hly-temp-normal.txt -t hly_temp_normal
复制代码
译者注:此处给出python脚本下载地址(https://github.com/uprush/hac-book/blob/master/2-data-migration/script/insert_hly.py)
- (注:由于对于python的了解有限以及环境限制,所以单独另写了一段Java的代码,可以直接使用的:
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.Reader;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.List;
-
- public class InsertHly {
- static String user="root";
- static String pwd="root123";
- static String driver="com.mysql.jdbc.Driver";
- static String url="jdbc:mysql://127.0.0.1:3306/htom?useUnicode=true&characterEncoding=UTF-8";
-
- public static void main(String[] args) throws SQLException {
- Connection baseCon = null;
- String sqlStr="insert into hly_temp_normal (stnid,month,day,value1,value2,value3,value4,value5,value6,value7,value8,value9,value10,value11,value12,value13,value14,value15,value16,value17,value18,value19,value20,value21,value22,value23,value24) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
- List parasValues=new ArrayList();
- try {
- baseCon = DriverManager.getConnection(url, user, pwd);
- } catch (SQLException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // 替换为文件地址
- String allRowsStr=readFileByChars("d:\\TestZone\\hly-temp-normal.txt", "gbk");
- String[] rows=allRowsStr.split("\n");
- for(String row : rows){
- parasValues.add(row.split("\\s+"));
- }
-
- PreparedStatement basePsm = null;
- try {
- baseCon.setAutoCommit(false);
- basePsm = baseCon.prepareStatement(sqlStr);
- for (int i = 0; i < parasValues.size(); i++) {
- Object[] parasValue = parasValues.get(i);
- for (int j = 0; j < parasValue.length; j++) {
- basePsm.setObject(j + 1, parasValue[j]);
- }
- basePsm.addBatch();
- }
- basePsm.executeBatch();
- baseCon.commit();
- } catch (SQLException e) {
- baseCon.rollback();
- throw e;
- } finally {
- if (basePsm != null) {
- basePsm.close();
- basePsm = null;
- }
- if (baseCon != null) {
- baseCon.close();
- }
- }
-
- }
-
- public static String readFileByChars(String fileName, String enc) {
- StringBuffer content=new StringBuffer();
- Reader reader = null;
- try {
- // 一次读多个字符
- char[] tempchars = new char[30];
- int charread = 0;
- reader = new InputStreamReader(new FileInputStream(fileName),enc);
- // 读入多个字符到字符数组中,charread为一次读取字符数
- while ((charread = reader.read(tempchars)) != -1) {
- // 同样屏蔽掉\r不显示
- if ((charread == tempchars.length)
- && (tempchars[tempchars.length - 1] != '\r')) {
- content.append(tempchars);
- } else {
- for (int i = 0; i < charread; i++) {
- if (tempchars[i] == '\r') {
- continue;
- } else {
- content.append(tempchars[i]);
- }
- }
- }
- }
- return content.toString();
-
- } catch (Exception e1) {
- e1.printStackTrace();
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e1) {
- }
- }
- }
- return null;
- }
- }
-
- )
-
复制代码
为使得下一节中的Java源码能够编译,你需要下列库支持:
hadoop-core-1.0.2.jar
hbase-0.92.1.jar
mysql-connector-java-5.1.18.jar
你可以将他们手动加入classpath中,或者使用本文中的可用的示例代码。
在导入数据之前,确认HDFS, ZooKeeper,和HBase集群均正常运行。在HBase的客户端节点记录日志。
如何实施
通过单节点客户端将数据从MySQL导入HBase:
1.从HBase的客户端服务器从过HBase的Shell命令行,连接到HBase的集群。
- hadoop$ $HBASE_HOME/bin/hbase shell
复制代码
2.在HBase中创建 hly_temp 表
- hbase> create ‘hly_temp’, {NAME => ‘n’, VERSIONS => 1}
复制代码
3.写一个Java程序将数据从MySQL中导入HBase,并将其打包成jar。在Java中按照下列步骤导入数据:
i. 使用Java创建一个connectHBase() 方法来连接到指定的HBase表:
- $ vi Recipe1.java
- private static HTable connectHBase(String tablename) \
- throws IOException {
- HTable table = null;
- Configuration conf = HBaseConfiguration.create();
- table = new HTable(conf, tablename);
- return table;
- }
复制代码
ii. 使用Java创建一个 connectDB() 方法来 MySQL :
- $ vi Recipe1.java
- private static Connection connectDB() \
- throws Exception {
- String userName = "db_user";
- String password = "db_password";
- String url = "jdbc:mysql://db_host/database";
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- Connection conn = DriverManager.getConnection(url,
- userName, password);
- return conn;
- }
复制代码
- $ vi Recipe1.java
- public class Recipe1 {
- public static void main(String[] args) {
- Connection dbConn = null;
- HTable htable = null;
- Statement stmt = null;
- String query = "select * from hly_temp_normal";
- try {
- dbConn = connectDB();
- htable = connectHBase("hly_temp");
- byte[] family = Bytes.toBytes("n");
- stmt = dbConn.createStatement();
- ResultSet rs = stmt.executeQuery(query);
- // time stamp for all inserted rows
- // 所有插入数据的时间戳
- long ts = System.currentTimeMillis();
- while (rs.next()) {
- String stationid = rs.getString("stnid");
- int month = rs.getInt("month");
- int day = rs.getInt("day");
- String rowkey = stationid + Common.lpad(String.
- valueOf(month), 2,
- '0') + Common.lpad(String.valueOf(day), 2, '0');
- Put p = new Put(Bytes.toBytes(rowkey));
- // get hourly data from MySQL and put into hbase
- //从MySQL中获取小时数据并存入HBase
- for (int i = 5; i < 29; i++) {
- String columnI = "v" + Common.lpad
- (String.valueOf(i - 4), 2, '0');
- String valueI = rs.getString(i);
- p.add(family, Bytes.toBytes(columnI), ts,
- Bytes.toBytes(valueI));
- }
- htable.put(p);
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- if (dbConn != null) {
- dbConn.close();
- }
- if (htable != null) {
- htable.close();
- }
- } catch (Exception e) {
- // ignore
- }
- }
- }
- }
复制代码
4.运行导入任务,下面的脚本就是用于执行JAR文件:
- #/bin/bash
- bin=`dirname $0`
- bin=`cd $bin;pwd`
- cp=$HBASE_HOME/conf:$HBASE_HOME/hbase-0.92.1.jar:$bin/build/hac-
- chapter2.jar
- for jar in $bin/lib/*.jar
- do
- cp=$cp:$jar
- done
- for jar in $HBASE_HOME/lib/*.jar
- do
- cp=$cp:$jar
- done
-
-
- $JAVA_HOME/bin/java -classpath $cp “hac.chapter2.Recipe1″
-
复制代码
5.验证HBase中导入的数据,通过HBase的Shell连接至HBase:
- hadoop$ $HBASE_HOME/bin/hbase shell
复制代码
6.验证数据已经被导入了HBase的对应表中:
- hbase> count ‘hly_temp’
- 95630 row(s) in 8.9850 seconds
- hbase> scan ‘hly_temp’, {LIMIT => 10}
- …
- AQW000617050110 column=n:v23,
- timestamp=1322958813521, value=814S
- AQW000617050110 column=n:v24,
- timestamp=1322958813521, value=811C
- 10 row(s) in 0.6730 seconds
复制代码
运行原理
在步骤1和2中,我们在HBase中创建了目标表用于插入数据。目标表名称为hly_temp,且只有单个列族(column family) n。我们将列族名称设计为一个字母的原因,是因为列族名称会存储在HBase的每个键值对中。使用短名能够让数据的存储和缓存更有效率。我们只需要保留一个版本的数据,所以为列族指定VERSION属性。
在Java代码中,为了连接到HBase,我们首先创建一个配置(Configuration )对象,使用该对象创建一个HTable实例。这个HTable对象用于处理所有的客户端API调用。如你所见,我们在代码没有设置任何 ZooKeeper或HBase的连接配置。所以程序该如何连接到运行的HBase集群呢?这或许是因为我们在步骤4中将 $HBase/conf目录添加到classpath中了。通过上述设置,HBase的客户端API会classpath中的hbase- site.xml加载配置信息。连接配置信息在hbase-site.xml中设置。
在使用JDBC中MySQL中获取数据之后,我们循环读取结果集,将MySQL中的一行映射为HBase表中的一行。此处我们使用 stationid,月份和日期栏位来生成HBase数据的row key。我们在月份和日期左边也填充0,补足2位数。这样做很重要,因为HBase的row key是按照字典排序的,意味着12将排序在2之前,这样可能会导致一些意外的情况发生。
我们创建了Put对象,利用row key添加一行数据。每小时的数据的添加需要调用Put.add()方法,传入参数包括列族(column family), 限定符(qualifier),时间戳( timestamp), and 值(value)。再次声明,我们使用很短的列族名称能够让存储数据更高效。所有的数据都被添加之后,我们调用HTable.put() 方法会将数据保存进HBase的table中。
最后,所有打开的资源都需要手动关闭。我们在代码中的final块中结束了MySQL和HBase的连接,这样确保即时导入动作中抛出异常仍然会被调用到。
你能够通过对比MySQL和HBase的数据行数来验证导入是否正确。你可以在扫描(scan)结果集中发现数据都准确的导入了HBase。
|