xiaobaiyang 发表于 2019-4-26 17:08:45

生成HFile文件报错

今天执行生成HFile代码时报java.lang.ClassNotFoundException: Class com.it.hbase.IteblogBulkLoadDriver$HFileImportMapper2 not found 类找不到的问题。但是代码中有这个类。所以就搞不懂了。代码以前应该执行过,现在新搭建的环境不知道什么问题。我看后台进程服务都有启。
代码:

public class GenerateHFileMain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

      final String INPUT_PATH = "hdfs://xxxxx:9000/input";
      final String OUTPUT_PATH = "hdfs://xxxxx:9000/output";

      Configuration conf = HBaseConfiguration.create();
      conf.addResource("core-site.xml");
      conf.addResource("hbase-site.xml");
      conf.addResource("hdfs-site.xml");
      conf.addResource("hive-site.xml");
      conf.addResource("mapred-site.xml");
      conf.addResource("yarn-site.xml");
      conf.addResource("adtec-config.xml");

      Connection connection = ConnectionFactory.createConnection(conf);
      Table table = connection.getTable(TableName.valueOf("dws_tfc_state_inter_tp_index_d"));
      Job job = Job.getInstance(conf);
      job.setJarByClass(GenerateHFileMain.class);
      job.setMapperClass(GenerateHFile.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
      job.setOutputFormatClass(HFileOutputFormat2.class);
      HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("dws_tfc_state_inter_tp_index_d")));
      FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
      FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class GenerateHFile extends Mapper<LongWritable,
            Text, ImmutableBytesWritable, Put> {

      private static Logger log = Logger.getLogger(GenerateHFile.class);

      private long checkpoint = 10000L;//执行过程统计打印百分比进度
      private long count = 0L;//
      private String columns = null;//HBASE表字段配置信息
      private byte[] family = null;//列簇
      private int colLen = -1;//列长度
      private byte[][] cols = null;//列集合
      private boolean rowKeyReverse = false;//ROWKEY反转标识

      /**
         * 随机生成字母数字的混合
         *
         * @param length 生成长度
         * @return 字符串
         */
      public static String randomMix(int length) {
            StringBuilder sb = new StringBuilder();
            Random random = new Random();
            for (int i = 0; i < length; i++) {
                String charOrNum = random.nextInt(2) % 2 == 0 ? "char" : "num";
                if ("char".equalsIgnoreCase(charOrNum)) {
                  int temp = random.nextInt(2) % 2 == 0 ? 65 : 97;
                  sb.append((char) (random.nextInt(26) + temp));
                } else if ("num".equalsIgnoreCase(charOrNum)) {
                  sb.append(String.valueOf(random.nextInt(10)));
                }
            }
            return sb.toString();
      }

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
            columns = context.getConfiguration().get("ztesoft.day.columns");
            family = context.getConfiguration().get("ztesoft.day.family").getBytes();

            String colNames[] = columns.split(",", -1);
            colLen = colNames.length;
            cols = new byte[colLen][];

            for (int i = 0; i < colLen; i++) {
                cols = Bytes.toBytes(colNames);
            }

            log.info("@BulkLoadMapper.setup,loadInfo:" + columns);
            log.info("@BulkLoadMapper.setup,family:" + family);
            log.info("@BulkLoadMapper.setup,rowKeyReverse:" + rowKeyReverse);
            log.info("@BulkLoadMapper.setup,colLen:" + colLen);
      }

      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         /* String line = value.toString();
            String[] items = line.split("\t");

            String ROWKEY = items + items + items;
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes());
            Put put = new Put(ROWKEY.getBytes());   //ROWKEY
            put.addColumn("INFO".getBytes(), "URL".getBytes(), items.getBytes());
            put.addColumn("INFO".getBytes(), "SP".getBytes(), items.getBytes());//出发点
            put.addColumn("INFO".getBytes(), "EP".getBytes(), items.getBytes());//目的地
            put.addColumn("INFO".getBytes(), "ST".getBytes(), items.getBytes());   //出发时间
            put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items)));//价格
            put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items.getBytes());//交通方式
            put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items.getBytes()); //酒店

            context.write(rowkey, put);*/

            String[] vals = value.toString().split(",", -1);//读取一行数据转换成数组

            String suffixRowKey = new StringBuffer(vals[0].trim()).reverse().append(vals[2].trim()).append(vals[1].trim()).toString();
            String rowKey = randomMix(4) + suffixRowKey;
            System.out.println("rowKey : " + rowKey);
            Put row = new Put(Bytes.toBytes(rowKey));//新建PUT
            for (int i = 3; i < vals.length; i++) {
                row.addColumn(family, cols3], Bytes.toBytes(vals));
            }

            try {
                context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), row); //写文件到HBASE
            } catch (InterruptedException e) {
                log.error("写入HBASE失败!数据:" + value.toString() + e.getMessage(), e);
            }

            System.out.println("==========================");
            if (++count % checkpoint == 0) {
                //显示写入HBASE进度情况
                context.setStatus("Emitting Put " + this.count);
            }
      }
    }
}

有谁知道是什么问题吗

s060403072 发表于 2019-4-26 19:33:32

那就不是代码的问题。找找其它的原因。比如权限,能否访问到,打包是否有问题等
页: [1]
查看完整版本: 生成HFile文件报错