把后面的URLString 封装成 URL类型。代码如下
[mw_shl_code=java,true]import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.io.Writable;
/**
* @author liuyazhuang
*/
public class URLWritable implements Writable {
protected URL url;
public URLWritable() {
}
public URLWritable(URL url) {
this.url = url;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(url.toString());
}
@Override
public void readFields(DataInput in) throws IOException {
this.url = new URL(in.readUTF());
}
public void set(String string) {
try {
this.url = new URL(string);
} catch (MalformedURLException e) {
throw new RuntimeException("Should not have happened " + e.toString());
}
}
}
[/mw_shl_code]
[mw_shl_code=java,true]import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
* @author liuyazhuang
*/
public class TimeUrlLineRecordReader extends RecordReader<Text, URLWritable> {
public static final String Time_URL_SEPERATOR =
"mapreduce.input.keyvaluelinerecordreader.key.value.separator";
private final LineRecordReader lineRecordReader;
private byte separator = (byte) '\t';
private Text innerValue;
private Text key;
private URLWritable value;
public static int findSeparator(byte[] utf, int start, int length, byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf == sep) {
return i;
}
}
return -1;
}
public static void setKeyValue(Text key, URLWritable value, byte[] line,
int lineLen, int pos) {
if (pos == -1) {
key.set(line, 0, lineLen);
value.set(StringUtils.EMPTY);
} else {
key.set(line, 0, pos);
String url = null;
System.arraycopy(line, pos + 1,url , 0, lineLen - pos - 1);
value.set(url);
}
}
public TimeUrlLineRecordReader(Configuration conf) throws IOException {
lineRecordReader = new LineRecordReader();
String sepStr = conf.get(Time_URL_SEPERATOR, "\t");
this.separator = (byte) sepStr.charAt(0);
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
lineRecordReader.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.nextKeyValue()) {
innerValue = lineRecordReader.getCurrentValue();
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null) {
return false;
}
if (key == null) {
key = new Text();
}
if (value == null) {
value = new URLWritable();
}
int pos = findSeparator(line, 0, lineLen, this.separator);
setKeyValue(key, value, line, lineLen, pos);
return true;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public URLWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return lineRecordReader.getProgress();
}
@Override
public void close() throws IOException {
lineRecordReader.close();
}
}
[/mw_shl_code]
[mw_shl_code=java,true]import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* @author liuyazhuang
*/
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable>{
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec = new CompressionCodecFactory(
context.getConfiguration()).getCodec(file);
return codec == null;
}
@Override
public RecordReader<Text, URLWritable> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
context.setStatus(split.toString());
return new TimeUrlLineRecordReader(context.getConfiguration());
}
}
[/mw_shl_code]
[mw_shl_code=java,true]import java.io.IOException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author liuyazhuang
*/
public class CustomTimeUrl extends Configured implements Tool {
public static class CustomTimeUrlMapper extends Mapper<Text, URLWritable, Text, URLWritable> {
@Override
protected void map(Text key, URLWritable value, Context context)
throws IOException, InterruptedException {
context.write(key, value);
}
}
public static class CustomTimeUrlReducer extends Reducer<Text, URLWritable, Text, URLWritable> {
@Override
protected void reduce(Text key, Iterable<URLWritable> values,Context context)throws IOException, InterruptedException {
for (URLWritable value : values) {
context.write(key, value);
}
}
}
@Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf());
job.setJarByClass(getClass());
job.setJobName("CustomTimeUrl");
job.setInputFormatClass(TimeUrlTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(URLWritable.class);
job.setMapperClass(CustomTimeUrlMapper.class);
job.setReducerClass(CustomTimeUrlReducer.class);
FileInputFormat.setInputPaths(job, new Path("/timeurl/input/"));
FileOutputFormat.setOutputPath(job, new Path("/timeurl/output"));
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new TimeUrl(), args);
System.exit(result);
}
}
[/mw_shl_code]
|