今天执行生成HFile代码时报java.lang.ClassNotFoundException: Class com.it.hbase.IteblogBulkLoadDriver$HFileImportMapper2 not found 类找不到的问题。但是代码中有这个类。所以就搞不懂了。代码以前应该执行过,现在新搭建的环境不知道什么问题。我看后台进程服务都有启。
代码:
public class GenerateHFileMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
final String INPUT_PATH = "hdfs://xxxxx:9000/input";
final String OUTPUT_PATH = "hdfs://xxxxx:9000/output";
Configuration conf = HBaseConfiguration.create();
conf.addResource("core-site.xml");
conf.addResource("hbase-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("hive-site.xml");
conf.addResource("mapred-site.xml");
conf.addResource("yarn-site.xml");
conf.addResource("adtec-config.xml");
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("dws_tfc_state_inter_tp_index_d"));
Job job = Job.getInstance(conf);
job.setJarByClass(GenerateHFileMain.class);
job.setMapperClass(GenerateHFile.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("dws_tfc_state_inter_tp_index_d")));
FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
static class GenerateHFile extends Mapper<LongWritable,
Text, ImmutableBytesWritable, Put> {
private static Logger log = Logger.getLogger(GenerateHFile.class);
private long checkpoint = 10000L;//执行过程统计打印百分比进度
private long count = 0L;//
private String columns = null;//HBASE表字段配置信息
private byte[] family = null;//列簇
private int colLen = -1;//列长度
private byte[][] cols = null;//列集合
private boolean rowKeyReverse = false;//ROWKEY反转标识
/**
* 随机生成字母数字的混合
*
* @param length 生成长度
* @return 字符串
*/
public static String randomMix(int length) {
StringBuilder sb = new StringBuilder();
Random random = new Random();
for (int i = 0; i < length; i++) {
String charOrNum = random.nextInt(2) % 2 == 0 ? "char" : "num";
if ("char".equalsIgnoreCase(charOrNum)) {
int temp = random.nextInt(2) % 2 == 0 ? 65 : 97;
sb.append((char) (random.nextInt(26) + temp));
} else if ("num".equalsIgnoreCase(charOrNum)) {
sb.append(String.valueOf(random.nextInt(10)));
}
}
return sb.toString();
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
columns = context.getConfiguration().get("ztesoft.day.columns");
family = context.getConfiguration().get("ztesoft.day.family").getBytes();
String colNames[] = columns.split(",", -1);
colLen = colNames.length;
cols = new byte[colLen][];
for (int i = 0; i < colLen; i++) {
cols = Bytes.toBytes(colNames);
}
log.info("@BulkLoadMapper.setup,loadInfo:" + columns);
log.info("@BulkLoadMapper.setup,family:" + family);
log.info("@BulkLoadMapper.setup,rowKeyReverse:" + rowKeyReverse);
log.info("@BulkLoadMapper.setup,colLen:" + colLen);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/* String line = value.toString();
String[] items = line.split("\t");
String ROWKEY = items[1] + items[2] + items[3];
ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes());
Put put = new Put(ROWKEY.getBytes()); //ROWKEY
put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes());
put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes()); //出发点
put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes()); //目的地
put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes()); //出发时间
put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4]))); //价格
put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式
put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店
context.write(rowkey, put);*/
String[] vals = value.toString().split(",", -1);//读取一行数据转换成数组
String suffixRowKey = new StringBuffer(vals[0].trim()).reverse().append(vals[2].trim()).append(vals[1].trim()).toString();
String rowKey = randomMix(4) + suffixRowKey;
System.out.println("rowKey : " + rowKey);
Put row = new Put(Bytes.toBytes(rowKey));//新建PUT
for (int i = 3; i < vals.length; i++) {
row.addColumn(family, cols[i - 3], Bytes.toBytes(vals));
}
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), row); //写文件到HBASE
} catch (InterruptedException e) {
log.error("写入HBASE失败!数据:" + value.toString() + e.getMessage(), e);
}
System.out.println("==========================");
if (++count % checkpoint == 0) {
//显示写入HBASE进度情况
context.setStatus("Emitting Put " + this.count);
}
}
}
}
有谁知道是什么问题吗
|
-
后台进程
-
|