package org.apache.hadoop.examples;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class NoteTest {
public static class ValueTuple implements Writable{
int antrID=0;
double tangent=0;
public int getantrID(){
return antrID;
}
public void setantrID(int antrID){
this.antrID=antrID;
}
public double gettangent(){
return tangent;
}
public void settangent(double tangent){
this.tangent=tangent;
}
@Override
public void readFields(DataInput in) throws IOException {
antrID=in.readInt();
tangent=in.readDouble();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(antrID);
out.writeDouble(tangent);
}
public String toString() {
return antrID + "\t" + tangent ;
}
}
public static class Map extends Mapper<Object,Text,IntWritable,ValueTuple>{
ValueTuple outTuple=new ValueTuple();
int countTokens;
int i=0;
int Num=0;
double distance =0;
double angle=0;
int[] id=null;
double[] x=null;
double[] y=null;
double[] z=null;
@Override
protected void map(Object key, Text value, Context context)throws IOException, InterruptedException {
String total=value.toString();
StringTokenizer st = new StringTokenizer(total,"\n");
Num=Integer.valueOf(st.nextToken());
id = new int[Num];
x = new double[Num];
y = new double[Num];
z = new double[Num];
while(st.hasMoreTokens()){
StringTokenizer str = new StringTokenizer(st.nextToken());
countTokens = str.countTokens();
while(str.hasMoreTokens()){
String line=str.nextToken();
if (countTokens == 4) {
int ID=Integer.valueOf(line.trim());
id= ID;
double X=Double.valueOf(str.nextToken());
x = X;
double Y=Double.valueOf(str.nextToken());
y = Y;
double Z=Double.valueOf(str.nextToken());
z = Z;
}
}
}
for (int j=0;j<Num-1;j++){
for(int k=1;k<Num;k++){
distance =(double) Math.sqrt((x[k]-x[j])*(x[k]-x[j])+(y[k]-y[j])*(y[k]-y[j])+(z[k]-z[j])*(z[k]-z[j]));
if (distance<=1.8){
angle=Math.abs(Math.atan((y[k]-y[j])/(x[k]-x[j])));
IntWritable outID=new IntWritable(id[j]);
outTuple.setantrID(id[k]);
outTuple.settangent(angle);
context.write(outID, outTuple);
}
}
}
}
}
public static class Reduce extends Reducer<IntWritable,ValueTuple,IntWritable,ValueTuple>{
ValueTuple result = new ValueTuple();
@Override
protected void reduce(IntWritable key, Iterable<ValueTuple> values,Context context)throws IOException, InterruptedException {
context.write(key, result);
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
System.out.println("Begin");
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.out.println("please input at least 2 arguments");
System.exit(2);
}
Job job = new Job(conf,"NoteTest");
job.setJarByClass(NoteTest.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(ValueTuple.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
System.out.println("End");
}
}
|
|