public class StoreToHBaseAlgorithm extends BasicAlgorithm {
private static Logger logger = LoggerFactory.getLogger(StoreToHBaseAlgorithm.class);
public class Mapper01 extends Mapper<Object, Text, Text, Text>{
@Override
public void map(Object key,Text value, Context context) throws IOException, InterruptedException{
context.write(new Text(UUID.randomUUID().toString()), value);
}
}
public class Reducer01 extends TableReducer<Text, Text, NullWritable>{
private byte[] tableNameBytes = null;
private ArrayList<byte[]> columnNames = null;
@Override
public void setup(Context context){
tableNameBytes = Bytes.toBytes(context.getConfiguration().get("TABLE_NAME"));
String [] tempColumnNames = context.getConfiguration().getStrings("COLUMN_NAMES");
for (int i = 0; i < tempColumnNames.length; i++) {
columnNames.add(Bytes.toBytes(tempColumnNames[i]));
}
}
@Override
public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException{
Put put = new Put(Bytes.toBytes(key.toString()));
String[] values = pattern.split(key.toString());
for (int i = 0; i < columnNames.size(); i++) {
put.add(tableNameBytes, columnNames.get(i), Bytes.toBytes(values[i]));
}
context.write(NullWritable.get(), put);
}
}
@Override
public int run(Configuration conf, List<String> inputPaths, String outputPath, StandardTask task, String taskName) {
int result = Constants.ALGORITHM_RESULT_SUCCESS;
Path jarPath = null;
try {
Path input = null;
int i = 0;
for (String in : inputPaths) {
input = new Path(in);
Job job = Job.getInstance(conf, taskName + "store_to_hbase_job_" + i);
jarPath = storeTempJarToHDFS(conf, StoreToHBaseAlgorithm.class);
job.addArchiveToClassPath(jarPath);
job.setJarByClass(StoreToHBaseAlgorithm.class);
job.setMapperClass(Mapper01.class);
job.setReducerClass(Reducer01.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, input);
TableMapReduceUtil.initTableReducerJob(taskName, Reducer01.class, job);
if (!job.waitForCompletion(true)) {
throw new InterruptedException(job.getJobName());
} else {
if (jarPath != null) {
deleteTempJarFromHDFS(conf, jarPath);
jarPath = null;
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
result = Constants.ALGORITHM_RESULT_FAIL;
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
result = Constants.ALGORITHM_RESULT_FAIL;
} catch (InterruptedException e) {
logger.error("the job {} unexpected interrupted",e.getMessage());
result = Constants.ALGORITHM_RESULT_FAIL;
}
if (jarPath != null) {
deleteTempJarFromHDFS(conf, jarPath);
}
return result;
}
}
错误如下:
Error: java.lang.ClassNotFoundException: com.missionsky.scp.dataanalysis.algorithm.BasicAlgorithm
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2658)
at java.lang.Class.getConstructor0(Class.java:3062)
at java.lang.Class.getDeclaredConstructor(Class.java:2165)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:722)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
|
|