package hadoop.mapjoin;
import hadoop.other.Stock;
import hadoop.other.StockPrices;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
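/**
 * Demonstrates a map-side (replicated) join: the small dividends file is
 * shipped to every mapper through the distributed cache and loaded into an
 * in-memory map in setup(), so each matching stock price record can be
 * joined with its dividend without a shuffle or reduce phase.
 */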
public class MapSideJoin extends Configured implements Tool {

    // Counters for sanity-checking the join from the job's counter output:
    // HELLO counts input records whose symbol matches the requested one,
    // MISSING is incremented by the size of the cached dividend map on each match.
    enum TESTW {
        MISSING,
        HELLO;
    }
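    /*
     * The column indices used below assume the NYSE sample data set layout
     * (an assumption inferred from the code, not stated elsewhere):
     *   dividends file:  exchange,symbol,date,dividend                  -> splits[3] = dividend
     *   daily prices:    exchange,symbol,date,open,high,low,close,...   -> splits[6] = close
     * Adjust the indices if the input files are laid out differently.
     */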
    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Stock, StockPrices> {

        private String stockSymbol;
        // Dividend amount per (symbol, date), loaded from the distributed cache.
        private HashMap<Stock, Double> stocks = new HashMap<Stock, Double>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            stockSymbol = conf.get("stockSymbol");
            // Alternative: Path[] files = context.getLocalCacheFiles();
            Path[] files = DistributedCache.getLocalCacheFiles(conf);
            FileSystem fs = FileSystem.getLocal(conf);
            // Read every line of the cached dividends file and keep the rows
            // for the requested symbol.
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files[0])));
            try {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] splits = line.split(",");
                    if (splits[1].equals(stockSymbol)) {
                        stocks.put(new Stock(splits[1], splits[2]), Double.valueOf(splits[3]));
                    }
                }
            } finally {
                br.close();
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            if (splits[1].equals(stockSymbol)) {
                context.getCounter(TESTW.HELLO).increment(1);
                context.getCounter(TESTW.MISSING).increment(stocks.size());
                Stock stock = new Stock(splits[1], splits[2]);
                // Emit the joined record only when a dividend was cached for
                // this symbol and date; splits[6] is the closing price column.
                if (stocks.containsKey(stock)) {
                    context.write(stock, new StockPrices(stocks.get(stock), Double.valueOf(splits[6])));
                }
            }
        }
    }
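    /*
     * Driver: configures a map-only job that reads the "stocks" directory,
     * registers the dividends file in the distributed cache, and writes
     * comma-separated results to "joinoutput". A sketch of how it might be
     * launched (jar name and data staging are assumptions, not part of this code):
     *
     *   hadoop fs -put NYSE_daily stocks
     *   hadoop fs -put NYSE_dividends_A.csv dividends/NYSE_dividends_A.csv
     *   hadoop jar mapjoin.jar hadoop.mapjoin.MapSideJoin <stockSymbol>
     */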
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "MapSideJoinJob");
        job.setJarByClass(getClass());
        Configuration conf = job.getConfiguration();
        // The symbol to join on is passed as the first command-line argument.
        conf.set("stockSymbol", args[0]);
        // SEPERATOR is the constant's spelling in the Hadoop 2 API.
        conf.set(TextOutputFormat.SEPERATOR, ",");

        // Input, output and cache paths are hard-coded relative to the user's
        // home directory on the default file system.
        Path out = new Path("joinoutput");
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.setInputPaths(job, new Path("stocks"));
        FileOutputFormat.setOutputPath(job, out);

        // Ship the small dividends file to every mapper via the distributed cache.
        DistributedCache.addCacheFile(new Path("dividends/NYSE_dividends_A.csv").toUri(), conf);
        // Alternative: job.addCacheFile(new Path("dividends/NYSE_dividends_A.csv").toUri());

        job.setMapperClass(MapSideJoinMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Map-only job: the mapper's output types are also the job's output types.
        job.setOutputKeyClass(Stock.class);
        job.setOutputValueClass(StockPrices.class);
        job.setMapOutputKeyClass(Stock.class);
        job.setMapOutputValueClass(StockPrices.class);
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) {
        int result = 1;
        try {
            result = ToolRunner.run(new Configuration(), new MapSideJoin(), args);
        } catch (Exception e) {
            // Report the failure and fall through to a non-zero exit code.
            e.printStackTrace();
        }
        System.exit(result);
    }
}