First, a simple method.

Sometimes data needs to be migrated between hadoop clusters, for example moving expired archived documents onto a small cluster for long-term storage. The community-provided tool distcp handles this, and its usage is very simple:

    hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

Add the -i flag to ignore failures, and -m to set the number of map tasks. Note that distcp runs as a MapReduce job, so it needs network connectivity between the two clusters.
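For example, a minimal sketch combining both flags (nn1, nn2, and the paths are just placeholders for your own namenodes and directories):

    hadoop distcp -i -m 20 hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

Here -i keeps the job going past individual copy failures, and -m 20 caps the job at 20 simultaneous map tasks.

Next, let's look at migrating hbase data between hadoop clusters.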
In day-to-day use you may often need to migrate or copy the hbase data in one cluster over to another cluster, and plenty of problems can come up along the way.
Below are some of the approaches I used and how I handled those problems.
Prerequisite: the two clusters must run the same hbase version, otherwise unpredictable problems may occur and the migration may fail.
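A quick way to verify, run from the hbase installation directory on each cluster:

    bin/hbase version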
When the two clusters cannot communicate with each other, you can first copy the hbase data files from the source cluster down to local disk.
The steps are as follows.
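First, on the source cluster, pull the table directory out of hdfs to local disk; presumably something like this, with the paths mirroring the import step below (/hbase/tab_keywordflow being the table directory and /home/other/xiaochenbak the local backup path):

    /bin/hadoop fs -copyToLocal /hbase/tab_keywordflow /home/other/xiaochenbak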
Then, as you'd expect, copy those files over to the cluster you are migrating to, into that table's directory. If the target cluster already has a table directory of the same name, delete it first, then copy:

    /bin/hadoop fs -rmr /hbase/tab_keywordflow

    /bin/hadoop fs -copyFromLocal /home/other/xiaochenbak /hbase/tab_keywordflow

Here /home/other/xiaochenbak is the local path, on the cluster you are migrating to, that holds the data being migrated.
Next, reset the table's region entries in the .META. table:

    bin/hbase org.jruby.Main /home/other/hbase/bin/add_table.rb /hbase/tab_keywordflow

/home/other/hbase/bin/add_table.rb is a ruby script that hbase can execute; save the following content as add_table.rb:
#
# Copyright 2009 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script adds a table back to a running hbase.
# Currently only works if table data is in place.
#
# To see usage for this script, run:
#
#   ${HBASE_HOME}/bin/hbase org.jruby.Main addtable.rb
#
include Java
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.regionserver.HRegion
import org.apache.hadoop.hbase.HRegionInfo
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.FSUtils
import org.apache.hadoop.hbase.util.Writables
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.commons.logging.LogFactory

# Name of this script
NAME = "add_table"

# Print usage for this script
def usage
  puts 'Usage: %s.rb TABLE_DIR [alternate_tablename]' % NAME
  exit!
end

# Get configuration to use.
c = HBaseConfiguration.new()

# Set hadoop filesystem configuration using the hbase.rootdir.
# Otherwise, we'll always use localhost though the hbase.rootdir
# might be pointing at hdfs location.
c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
fs = FileSystem.get(c)

# Get a logger and a metautils instance.
LOG = LogFactory.getLog(NAME)

# Check arguments
if ARGV.size < 1 || ARGV.size > 2
  usage
end

# Get cmdline args.
srcdir = fs.makeQualified(Path.new(java.lang.String.new(ARGV[0])))

if not fs.exists(srcdir)
  raise IOError.new("src dir " + srcdir.toString() + " doesn't exist!")
end

# Get table name
tableName = nil
if ARGV.size > 1
  tableName = ARGV[1]
  raise IOError.new("Not supported yet")
else
  # If none provided use dirname
  tableName = srcdir.getName()
end
HTableDescriptor.isLegalTableName(tableName.to_java_bytes)

# Figure locations under hbase.rootdir
# Move directories into place; be careful not to overwrite.
rootdir = FSUtils.getRootDir(c)
tableDir = fs.makeQualified(Path.new(rootdir, tableName))

# If a directory currently in place, move it aside.
if srcdir.equals(tableDir)
  LOG.info("Source directory is in place under hbase.rootdir: " + srcdir.toString());
elsif fs.exists(tableDir)
  movedTableName = tableName + "." + java.lang.System.currentTimeMillis().to_s
  movedTableDir = Path.new(rootdir, java.lang.String.new(movedTableName))
  LOG.warn("Moving " + tableDir.toString() + " aside as " + movedTableDir.toString());
  raise IOError.new("Failed move of " + tableDir.toString()) unless fs.rename(tableDir, movedTableDir)
  LOG.info("Moving " + srcdir.toString() + " to " + tableDir.toString());
  raise IOError.new("Failed move of " + srcdir.toString()) unless fs.rename(srcdir, tableDir)
end

# Clean mentions of table from .META.
# Scan the .META. and remove all lines that begin with tablename
LOG.info("Deleting mention of " + tableName + " from .META.")
metaTable = HTable.new(c, HConstants::META_TABLE_NAME)
tableNameMetaPrefix = tableName + HConstants::META_ROW_DELIMITER.chr
scan = Scan.new((tableNameMetaPrefix + HConstants::META_ROW_DELIMITER.chr).to_java_bytes)
scanner = metaTable.getScanner(scan)
# Use java.lang.String doing compares. Ruby String is a bit odd.
tableNameStr = java.lang.String.new(tableName)
while (result = scanner.next())
  rowid = Bytes.toString(result.getRow())
  rowidStr = java.lang.String.new(rowid)
  if not rowidStr.startsWith(tableNameMetaPrefix)
    # Gone too far, break
    break
  end
  LOG.info("Deleting row from catalog: " + rowid);
  d = Delete.new(result.getRow())
  metaTable.delete(d)
end
scanner.close()

# Now, walk the table and per region, add an entry
LOG.info("Walking " + srcdir.toString() + " adding regions to catalog table")
statuses = fs.listStatus(srcdir)
for status in statuses
  next unless status.isDir()
  next if status.getPath().getName() == "compaction.dir"
  regioninfofile = Path.new(status.getPath(), HRegion::REGIONINFO_FILE)
  unless fs.exists(regioninfofile)
    LOG.warn("Missing .regioninfo: " + regioninfofile.toString())
    next
  end
  is = fs.open(regioninfofile)
  hri = HRegionInfo.new()
  hri.readFields(is)
  is.close()
  # TODO: Need to redo table descriptor with passed table name and then recalculate the region encoded names.
  p = Put.new(hri.getRegionName())
  p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))
  metaTable.put(p)
  LOG.info("Added to catalog: " + hri.toString())
end
If the two clusters can communicate with each other, it's even easier; as you might guess, just scp the backup across (or use distcp as described at the top).
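A minimal sketch of that path, assuming the backup sits at /home/other/xiaochenbak on a source-cluster node, and with user and target-node as placeholders for an account and machine on the target cluster:

    scp -r /home/other/xiaochenbak user@target-node:/home/other/

Then run the copyFromLocal and add_table.rb steps above on the target cluster. And when the two HDFS instances can reach each other directly, distcp on the table directory skips the local copies entirely:

    hadoop distcp hdfs://nn1:8020/hbase/tab_keywordflow hdfs://nn2:8020/hbase/tab_keywordflow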