Hadoop cluster migration

pig2, posted 2013-12-24 14:12:55 in 介绍解说 (introductions and explanations)
Last edited by pig2 on 2013-12-24 14:15

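First, a simple method:

Data sometimes has to be moved between Hadoop clusters, for example when archived, expired documents are parked on a small cluster for safekeeping.

This uses distcp, a tool provided by the community. Usage is very simple:

    hadoop distcp hdfs://nn1:8020/foo/bar  hdfs://nn2:8020/bar/foo

Add -i to ignore failures and -m to set the number of maps.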
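To make the two flags concrete, here is a minimal sketch that only prints the full distcp command line so it can be reviewed before it is run. The helper name `distcp_cmd` and the map count of 20 are illustrative assumptions, not values from the post.

```shell
#!/bin/sh
# Print (do not run) a distcp command line that uses both flags.
#   -i  ignore failures
#   -m  number of maps
distcp_cmd() {
    src="$1"
    dst="$2"
    maps="$3"
    echo "hadoop distcp -i -m $maps $src $dst"
}

# The map count 20 is an arbitrary example value.
distcp_cmd hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo 20
```

Once the printed line looks right, it can be pasted into a shell on a node that has the Hadoop client installed.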

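Next, let me walk through:

HBase data migration in a Hadoop cluster
In day-to-day use you will often need to migrate or copy HBase data from one cluster to another, and quite a few things can go wrong along the way.
Below is how I handled it.
Prerequisite: the two HBase versions must match; otherwise unpredictable problems may occur and the migration can fail.
When the two clusters cannot communicate with each other, first copy the HBase data files from the source cluster to local disk.
The procedure is as follows: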
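As a sketch only, the copy to local disk could be done with `hadoop fs -copyToLocal`. The paths below are assumptions borrowed from the later steps of this post (/hbase/tab_keywordflow as the table directory, /home/other/xiaochenbak as the local backup), and the `run` helper and `DRY_RUN` variable are hypothetical additions so the command is printed rather than executed by default.

```shell
#!/bin/sh
# Assumed paths, taken from the later steps of this post.
TABLE_DIR=/hbase/tab_keywordflow
LOCAL_BACKUP=/home/other/xiaochenbak

# With DRY_RUN=1 (the default here) the command is only printed.
# Set DRY_RUN=0 on the source cluster to actually execute it.
run() {
    if [ "${DRY_RUN:-1}" = 1 ]; then
        echo "$*"
    else
        "$@"
    fi
}

# Copy the table directory out of HDFS onto the local disk.
run hadoop fs -copyToLocal "$TABLE_DIR" "$LOCAL_BACKUP"
```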
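Then, as you would expect, copy the files to the cluster you are migrating to. The destination is the directory of your table.
If that cluster already has files for the same table, delete them first and then copy:

    /bin/hadoop fs -rmr /hbase/tab_keywordflow
    /bin/hadoop fs -copyFromLocal /home/other/xiaochenbak /hbase/tab_keywordflow

Here /home/other/xiaochenbak is the local directory holding the data to be migrated.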
Reset the table's region information in the .META. table:

    bin/hbase org.jruby.Main /home/other/hbase/bin/add_table.rb /hbase/tab_keywordflow

/home/other/hbase/bin/add_table.rb is an executable Ruby script. Its contents are below; save them as add_table.rb.
    #
    # Copyright 2009 The Apache Software Foundation
    #
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # Script adds a table back to a running hbase.
    # Currently only works if table data is in place.
    #
    # To see usage for this script, run:
    #
    # ${HBASE_HOME}/bin/hbase org.jruby.Main add_table.rb
    #
    include Java
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.HConstants
    import org.apache.hadoop.hbase.regionserver.HRegion
    import org.apache.hadoop.hbase.HRegionInfo
    import org.apache.hadoop.hbase.client.HTable
    import org.apache.hadoop.hbase.client.Delete
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.HTableDescriptor
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.util.FSUtils
    import org.apache.hadoop.hbase.util.Writables
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FileSystem
    import org.apache.commons.logging.LogFactory

    # Name of this script
    NAME = "add_table"

    # Print usage for this script
    def usage
      puts 'Usage: %s.rb TABLE_DIR [alternate_tablename]' % NAME
      exit!
    end

    # Get configuration to use.
    c = HBaseConfiguration.new()

    # Set hadoop filesystem configuration using the hbase.rootdir.
    # Otherwise, we'll always use localhost though the hbase.rootdir
    # might be pointing at hdfs location.
    c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
    fs = FileSystem.get(c)

    # Get a logger and a metautils instance.
    LOG = LogFactory.getLog(NAME)

    # Check arguments
    if ARGV.size < 1 || ARGV.size > 2
      usage
    end

    # Get cmdline args.
    srcdir = fs.makeQualified(Path.new(java.lang.String.new(ARGV[0])))
    if not fs.exists(srcdir)
      raise IOError.new("src dir " + srcdir.toString() + " doesn't exist!")
    end

    # Get table name
    tableName = nil
    if ARGV.size > 1
      tableName = ARGV[1]
      raise IOError.new("Not supported yet")
    else
      # If none provided use dirname
      tableName = srcdir.getName()
    end
    HTableDescriptor.isLegalTableName(tableName.to_java_bytes)

    # Figure locations under hbase.rootdir
    # Move directories into place; be careful not to overwrite.
    rootdir = FSUtils.getRootDir(c)
    tableDir = fs.makeQualified(Path.new(rootdir, tableName))

    # If a directory currently in place, move it aside.
    if srcdir.equals(tableDir)
      LOG.info("Source directory is in place under hbase.rootdir: " + srcdir.toString())
    elsif fs.exists(tableDir)
      movedTableName = tableName + "." + java.lang.System.currentTimeMillis().to_s
      movedTableDir = Path.new(rootdir, java.lang.String.new(movedTableName))
      LOG.warn("Moving " + tableDir.toString() + " aside as " + movedTableDir.toString())
      raise IOError.new("Failed move of " + tableDir.toString()) unless fs.rename(tableDir, movedTableDir)
      LOG.info("Moving " + srcdir.toString() + " to " + tableDir.toString())
      raise IOError.new("Failed move of " + srcdir.toString()) unless fs.rename(srcdir, tableDir)
    end

    # Clean mentions of table from .META.
    # Scan the .META. and remove all lines that begin with tablename
    LOG.info("Deleting mention of " + tableName + " from .META.")
    metaTable = HTable.new(c, HConstants::META_TABLE_NAME)
    tableNameMetaPrefix = tableName + HConstants::META_ROW_DELIMITER.chr
    scan = Scan.new((tableNameMetaPrefix + HConstants::META_ROW_DELIMITER.chr).to_java_bytes)
    scanner = metaTable.getScanner(scan)
    # Use java.lang.String doing compares. Ruby String is a bit odd.
    tableNameStr = java.lang.String.new(tableName)
    while (result = scanner.next())
      rowid = Bytes.toString(result.getRow())
      rowidStr = java.lang.String.new(rowid)
      if not rowidStr.startsWith(tableNameMetaPrefix)
        # Gone too far, break
        break
      end
      LOG.info("Deleting row from catalog: " + rowid)
      d = Delete.new(result.getRow())
      metaTable.delete(d)
    end
    scanner.close()

    # Now, walk the table and per region, add an entry
    LOG.info("Walking " + srcdir.toString() + " adding regions to catalog table")
    statuses = fs.listStatus(srcdir)
    for status in statuses
      next unless status.isDir()
      next if status.getPath().getName() == "compaction.dir"
      regioninfofile = Path.new(status.getPath(), HRegion::REGIONINFO_FILE)
      unless fs.exists(regioninfofile)
        LOG.warn("Missing .regioninfo: " + regioninfofile.toString())
        next
      end
      is = fs.open(regioninfofile)
      hri = HRegionInfo.new()
      hri.readFields(is)
      is.close()
      # TODO: Need to redo table descriptor with passed table name and then recalculate the region encoded names.
      p = Put.new(hri.getRegionName())
      p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))
      metaTable.put(p)
      LOG.info("Added to catalog: " + hri.toString())
    end
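The cleanup loop in the script relies on .META. row keys starting with the table name followed by a comma (HConstants::META_ROW_DELIMITER). A minimal sketch of that prefix test in plain shell shows why the delimiter matters: without it, a table whose name merely begins with the same characters would match as well. The function name `rows_for_table` and the row keys are made up for illustration. Unlike the script, which stops at the first non-matching row because .META. is sorted, this filter simply checks every line.

```shell
#!/bin/sh
# Keep only the row keys that belong to the given table.
# A key belongs to a table when it starts with "<table>,".
rows_for_table() {
    table="$1"
    while IFS= read -r row; do
        case "$row" in
            "$table",*) printf '%s\n' "$row" ;;
        esac
    done
}

# Hypothetical row keys, for illustration only.
printf '%s\n' \
    'tab_keywordflow,,1387865575000' \
    'tab_keywordflow,row500,1387865575002' \
    'tab_keywordflow2,,1387865575001' |
    rows_for_table tab_keywordflow
```

Only the first two keys are printed; the key for tab_keywordflow2 is skipped because the character after the shared prefix is not a comma.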
If the two clusters can communicate with each other, it is even easier: as you would expect, just use scp.
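A minimal sketch of such a copy, assuming a hypothetical account and host `other@target-host` in the target cluster and the local backup directory used earlier. The helper `scp_cmd` only prints the command so the paths can be checked before anything is transferred.

```shell
#!/bin/sh
# Build the scp command for pushing a local backup directory to a host
# in the target cluster. -r copies the directory recursively.
scp_cmd() {
    local_dir="$1"
    remote="$2"        # user@host; hypothetical in this example
    remote_dir="$3"
    echo "scp -r $local_dir $remote:$remote_dir"
}

scp_cmd /home/other/xiaochenbak other@target-host /home/other/
```

After the directory arrives on the target host, the copyFromLocal and add_table.rb steps apply there as before.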




